Down from Eight Miles High… and Landing!

I’ve landed at Neurensic!  First I’d like to thank all the people who entertained my request for a job – there were lots and lots.  Competition was stiff, and as the month went on, the offers got wilder and wilder and closer and closer to my dreams.  Ultimately the choice came down to taking VC funding and starting something of my own (still Out There, will probably happen yet, and could be Flying Cars), or venturing into the financial tech sector, something I’ve long thought about.  Fin Tech won the day, but I’d like to give a special shout-out to Google (at least 4 different groups courted me, and I really like the feel of Google).

The job hunt experience gave me an unparalleled view into the Valley’s outstanding tech problems.  Several problems really stood out as things a lot of people wanted help solving, so I thought I’d mention them here as a sort of “State of the Valley’s Better Mousetrap Hunters”.

NodeJS – 3 different companies made clear offers to help make a server-side version of JavaScript targeting NodeJS.  This is right up my alley: adding threading (but keeping the single-threaded coding style), adding a low-latency high-performance GC, and adding A Better JIT.  Looking at the state of JS JIT’ing I can’t help but think there’s a lot of performance being left on the table – especially for “long” running server apps.  Maybe some easy tweaks and V8 cleans up here… but so far the NodeJS audience ain’t feeling the love.

Python – At least 6 stalled attempts to speed up Python are Out There (PyPy appears to be “less stalled”) – and I got clear offers from 2 companies, and sorta-clear offers from 2 more, to make “Python Go Fast”.  I also reached out to the Python core community – and got a resounding Silence.  I’m not fond of pushing rocks uphill; if the core community isn’t interested then I’m not (yet) willing to tilt at this windmill.  Nonetheless, given the response from 4 companies (some quite large) I’m willing to bet Somebody’s interested.  It sorta needs the Kickstarter-from-Hell to pay my bills, but then it could be a fun gig.

Both the NodeJS and Python speed problems appear unlikely to get solved anytime soon.  I might come back and revisit both of these areas down the road – perhaps as a quiet, easy-going slam-dunk job.

After that, mix in 4 different ML-related offers, 8 different startups (literally at least 8, more than one network-security-meets-ML), and several We’ll-Fund-Whatever-Cliff-Does startups.  And flying cars, no lie; that was the last-ditch almost winner.  But I need to be in a better state financially before I can bite into that apple.  All told I sorted through over 20 serious job offers. It’s good to have First World Problems.

Why Neurensic?  Why Financial Tech?

This story goes waaaay back – I’ll skip past the “I was born a poor black child…” part of the story and start when I was about 26 or 27.  I was just finishing up my PhD and starting to look at the job market – and Everybody was saying all the money was in being a stock market trader.  It looked too much like giving up on helping the world – just living for myself, so mercenary and narcissistic.  I still wanted to Save the World, and I liked my mix of theory and practice… so I ran off to do compiler research at HP Labs, then beat IBM on IBM hardware with my mad compiler skilz, and a few years later headed out to California to make this new-fangled Java language Go Fast.

Fast forward 20 years, and I got fame… but not really fortune.  I am all for changing that now, at least a bit, while still trying to Save the World.  I have been courted repeatedly by the traders over the years, and have certainly talked with dozens of high-frequency-trading groups about Java performance and low-latency GC.  Basically I’ve had the Fin Tech itch for more than 20 years, and I’m finally going to scratch it – not in trading directly, but in making trading safe.

Neurensic has some lucky timing and some serious financial-market Quant talent.  They put together a Machine Learning-based product to find financial market cheaters – just in time to match the new Dodd-Frank compliance rules.  Their timing is impeccable; suddenly everybody who is responsible for traders’ good behavior (i.e. the Banks) wants their stuff.  Their first-generation product is just coming out, and I plan on bringing my own skills and H2O’s Big Data and ML abilities to dramatically up Neurensic’s game.  The Goal: keeping the Stock Market (well, all financial markets) safe for everybody.

Neurensic Round Two is also right up my alley: a Rosetta Stone for the 40 or 50 back-end systems found in any large Bank.  Many of these systems date from the 80’s and 90’s, and use a level of tech I’m very comfortable with.  The Goal here is to simply bring together all of this disparate data – it’s Big Data, so my Big Data experience applies here also – and give decision makers a grand overview of their entire enterprise, updated in seconds instead of days.  The term that springs to mind is a colonoscopy: a bird’s-eye view of a Bank’s backside.  In the longer term, we aim to change the fabric of the financial system as a whole, at the transactional layer.

I will remain in Silicon Valley, and will shortly be recruiting engineers looking to blend machine learning, financial tech, and some “catch the bad guys” mentality.

But Wait, There’s More!

I have another offer I am actively pursuing – to keep extending H2O in the Open Source.  This will be a funded H2O Center of Excellence with room for 5 to 10 salaried engineers solely focused on moving H2O forward.  Funding hasn’t landed yet, so I can’t say more – but I’m hopeful that I can start offering positions here in the next month or so.  Note that Neurensic is fully on-board with this effort, but is neither funding it nor controlling it in any way.  Basically, it will be my second day job, and it happens to align well with my other day job.

Related-but-not-related, I’ve really enjoyed my engagement with the Skymind guys (hi Chris and Adam!) and DeepLearning4J – and am also angling to be in a position to get DL4J and H2O working together.

Hey!  This means I’m running at least 2 teams of 5 to 10 engineers each!  Isn’t Cliff known for both his code-hacking skilz and lack-of-people skilz?  Well, I’ve been running the engineering team at H2O for several years – and I really like it, and by all accounts people like how I run a team.  So yeah, I’ll be running some teams.  I’ll also be coding – I am a Builder after all, I have to Make Stuff to save my soul – so yeah, I’ll be coding still.  Just blending in some people-time also.

Oh yeah, AND I’m teaching a course at Stanford on Machine Learning: Stanford CME250A.

If any of this interests you, come knock on my door!  For anything open source, or H2O, or heck Java Performance or compilers or flying cars, my email of 30 years still works:

For anything related to Neurensic or Fin Tech try me at:

Looking forward to hearing from y’all!!!



Words of Parting, a Fond Farewell

I resigned from H2O last week.  It was a fair parting, and I fully support H2O’s future success.  Srisatish and I have worked together since 2004, and I learned a tremendous amount from him.  He has an amazing passion and vision.

I’d like to take this opportunity to celebrate our work together, in building something from nothing.  As Srisatish recalls, we did a million little things together; I also fondly remember the U-Haul trip retrieving our earliest furniture (where we forgot to close the rear door and got a lot of other drivers’ wide-eyed panic looks before figuring it out!), cranking out code furiously, and Taco Bell as the “company cafeteria”.  And of course the many, many team-building lunches in the warm California sun at Fiesta Del Mar.

A lot of how we built the company was playing off each other’s strengths.  Srisatish had a precognition of a Machine Learning wave sweeping the industry, and with his passion he sold the vision to me, the investors, and then the industry.  For my part, I built the backbone of that vision: not just the foundation, but the powerhouse engineering team which built up from that foundation to produce the milestone Machine Learning product that is H2O.  That partnership produced amazing stuff, and I’m sorry it had to come to an end.  I only wish him the very best in the future.

It’s all of you, the H2O team, I’ll miss the most.  Over the past four years, you have built an amazing technology – whose strength is only just being tapped.  I have witnessed huge personal growth in all of you, as coders, as engineers and as good people, and am very happy to have had a chance to be a part of that growth.  Your creativity, hard work and dedication have built something wondrous.  Your passion has become your outreach; you are leading the world into an era of ML and AI – a better way to do things, build things, change things: faster, more intelligent, and yet more human as well.  I applaud all that you have done, and continue to do – and I wish you the very best in your personal journey to make the world a better place.



Winds of Change

I’ve left H2O.  I wish them all the best.  I’ve left a longer farewell here.

I’m a loose cannon, at least for a few months, and am looking for (the right kind of) trouble.

So I’m here, posting on my blog again, to see if I can get some suggestions on what to do with my life.  🙂

Here are a few of my crazy Cliff ideas I’m sorta chasing around:

  • Python Go Fast: Do Unto Python as Thou Hast Done Unto Java.  I’d hack the guts of Python: add a high-power JIT, a full-fledged low-pause GC, and true multi-threading support – i.e. make Python as fast and as parallel as Java (about the same speed as C).  This blog is really a request for an open discussion on this topic.  Is the Python community interested?  How does this get funded?  (uber Kickstarter?)  I’ll only go here with the full support of the core Python committers, and general “feel goods” from the general Python community – and I’m hoping to start a discussion.  At this point I’m a premier language implementer, and making Python Go Fast is well within my abilities and past experiences.  It would take about 2 years & $2M for this effort to become self-sustaining (build all the core new tech and hand it off to other contributors).
  • H2O2: I left a lot of unfinished technical work at H2O – and H2O has plenty of technical growing room.  I could continue to contribute to the Open Source side of H2O, with some Big Company footing the dev bill.  Big Company gets kudos for supporting Open Source, H2O gets the next generation of cool new features.
    • Plan B, Big Company funds some new core closed-source innovations to H2O and monetizes that.  H2O still gets some Open Source improvements but not all core tech work is open.
  • Teach: I bow out of the Great Rat Race for a year and go teach Virtual Machines at Stanford or Berkeley.  Fun, and makes for a nice sabbatical.  (As a bonus, I’ll probably have 3 kids in college next year, and the whole Stanford-Pays-Faculty-Kids’-College thing sounds really good.)  I might be able to do this while hacking Python guts at the same time.
  • Jetsons: I know how to Do the Flying Car Thing Right.  A million people, a million flying cars in the air, all at once, and nobody can crash anything.  Feels like you’re flying, but the Autopilot-o-Doom holds it all together.  I’ve figured out how to handle bad weather, ground-infrastructure lossage (e.g. the Big Quake wipes out ground support in 10 sec – how do you land 1 million cars all at once?), integration into the existing transportation fabric, your driveway as a runway, playing nice with the big jets, etc.  Bold statement, needs bold proof.  Lots more stuff here; I’ve been thinking on this one for a decade.  Needs 3 or 4 years to put together the eye-popping this-might-actually-work prototype.  Thus needs bigger funding; probably in the $10M range to get serious.
  • Something Random: by which I mean pretty much anything else that’ll pay the bills and is fun to do.  I got some pretty darned broad skillz and interests…

Gonna be a long-needed Summer-o-Crazy-Fun for me.  Heck, maybe I’ll post my 2nd tweet (shudder).  🙂



How does Java Both Optimize Hot Loops and Allow Debugging

This blog came about because an old friend is trying to figure out how Java can do aggressive loop and inlining optimizations, while allowing the loading of new code and setting breakpoints… in that optimized code.

On 2/21/2015 11:04 AM, IG wrote:

Safepoint. I’m still confused because I don’t understand what is the required state at a safepoint that can’t be ensured at any random point of execution.  What is in-flight that must be permitted to complete? Given a cycle-by-cycle map you can find all the pointers at *any* cycle – is safepointing just a way to reduce the volume of pointer-locating info?

Reducing OOP-map volume happens, and is useful – but it is not the main thing being achieved.

It’s flipping backwards from the machine state to the language-architected state – unwinding from super-aggressive optimization to reach the theoretical Java (or C/C++) language state at some points.  Toooo expensive to preserve that mapping line-by-line, call-by-call, execution-point-by-execution-point.  So you keep the mapping at a few places, and optimize the bejeebers out of things in-between.  If somebody wants to:

  • set a breakpoint in optimized code (yes, works for Java all the time)
  • inject new classes, new targets for some virtual-call

then you need to stop the physical machine (actual CPU/Mill) at some place where you can find all the language-architected pieces.  This mapping is far more complex than the oop/no-oop mapping for GC… but that isn’t why you make it rare.  You make it rare because you have to hang onto all the language state at these safepoints, but don’t need it between safepoints.

An example might help:

  for( int i=0; i<N; i++ )
    sum += A[i]  // set a breakpoint here, for "i>1e6" or maybe "i>1e7"

So the intent is to run awhile… then somebody sets a breakpoint… and wants to inspect “sum”, or change “i” in the debugger, then continue execution.  What does the machine code look like, with no safepoints?  Something like this (bogus X86-like assembly code, pardon the non-perfection):

 ... some setup, zero-trip test, range-check on "N" vs "A.length", etc
 ld  RA=&A        // Classic strength-reduced pointer into array A
 add RC,RA,#N*8   // Stopping value of A
 ld  RB=0         // sum
 ld  RD=[RA+off]  // Load from A
 add RB=RB,RD     // Sum into RB
 add RA=RA,8      // Advance our pointer into A
 cmp RA,RC        // Stop when at the end
 blt loop
 // RB contains sum


But actually, with some loop unrolling:

... some setup, zero-trip test, range-check on "N" vs "A.length", etc
 ld  RA=&A        // Classic strength-reduced pointer into array A
 add RC,RA,#N*8 & (-1-3)  // mask of low 2 bits of iteration - unroll by 4
 ld  RB=0 // sum
 prefetch [RA+off+32]  // Fetch next cache line... overlap memory & compute
 ld  R0=[RA+off+ 0]    // Get 4 values array A
 ld  R1=[RA+off+ 8]
 ld  R2=[RA+off+16]
 ld  R3=[RA+off+24]
 add R4=R0,R1  // add-tree to allow more parallel adders
 add R5=R2,R3  // add-tree to allow more parallel adders
 add RB=RB,R4  // partial sum into RB
 add RB=RB,R5  // full sum into RB
 add RA=RA,8*4 // Skip over 4 elements of array A
 cmp RA,RC     // Stop when at the end
 blt loop
 ... cleanup code for 0-3 more iterations
 // RB contains sum


Now I want to break on the “sum += A[i]” line… where is that, exactly, in that unrolled loop?

And which register contains “i”?  (Hint: none of them).

Do I even want to keep a shadow copy of “i” in parallel with my “A” pointer I’m rolling through the loop?  It’s pure overhead, unless I breakpoint and want to have some place to read “i”…. but even then I cannot *change* this shadow “i” because it won’t change my “A” pointer.

What about changing N?  Do I have enough info in the register mapping to describe all possible kinds of updates I might want to do to register RC when I tweak N?  This example is simple, but optimizing compilers can do hella complex code changes.

How does a Safepoint help here?  Well, for starters it points out that I do indeed need some way to find “i” – and, if I change it, a way to propagate those changes back into the loop.  Let’s look at this code with a Safepoint in it.

... some setup, zero-trip test, range-check on "N" vs "A.length", etc
 ld  RA=&A        // Classic strength-reduced pointer into array A
 add RC,RA,#N*8 & (-1-3)  // mask of low 2 bits of iteration - unroll by 4
 ld  RB=0         // sum
 ld  RI=0         // Concession to Safepoint: keep "i" around
 prefetch [RA+off+32]  // Fetch next cache line... overlap memory & compute
 ld  R0=[RA+off+ 0]    // Get 4 values array A
 ld  R1=[RA+off+ 8]
 ld  R2=[RA+off+16]
 ld  R3=[RA+off+24]
 add R4=R0,R1  // add-tree to allow more parallel adders
 add R5=R2,R3  // add-tree to allow more parallel adders
 add RB=RB,R4  // partial sum into RB
 add RB=RB,R5  // full sum into RB
 add RA=RA,8*4 // Skip over 4 elements of array A
 add RI=RI,4   // Concession to Safepoint: keep "i" around
 SAFEPOINT: RI contains "i", RB contains "sum", "some place in the stack" contains "N"
 cmp RA,RC     // Stop when at the end
 blt loop
 ... cleanup code for 0-3 more iterations
 // RB contains sum


So I added 1 op in my hot loop to compute “i” on the fly.  Not too expensive; 1 clock per cache line of data.  In general we end up keeping alive what is otherwise programmatically dead, “just in case” the user stops the running code and wants to inspect (dead) variables – but this is cheap.  Just spill ’em off to stack somewhere and flag ’em in the Safepoint’s variable map.

How do we trigger the Safepoint?  Easiest way is to have the safepoint code test a thread-local “stop” bit, and if it’s set… stop.  If we have to stop we might have a lot of cleanup to do, but stopping is rare, so the cost (of mostly not stopping) is low.  There’s lots of ways to make the safepoint check cheap.  Here’s one that depends on keeping a 2Meg (TLB-sized) window above the stack pointer mapped in or out – perfect for an X86 TLB:

  ld RX,[RSP+2M] // will trap if we map this page out, then catch the SEGV & handle
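
For comparison, the “test a thread-local stop bit” flavor looks roughly like the sketch below.  The field and helper names here are made up for illustration; this is not HotSpot’s actual internals.

  // Sketch of the "test a stop bit" safepoint poll (hypothetical names).
  final class SafepointPoll {
    // One flag per thread; the VM sets it when it wants everybody stopped.
    static final ThreadLocal<Boolean> STOP = ThreadLocal.withInitial(() -> false);

    static void poll() {
      if (STOP.get()) {        // almost always false: a cheap, well-predicted branch
        blockAtSafepoint();    // rare path: hand the mapped language state to the VM
      }
    }

    static void blockAtSafepoint() { /* park here until the VM releases the thread */ }
  }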


How do we use the Safepoint?  Once we’ve stopped the running thread mid-loop, we expect the trap handler to have saved off all the registers.  Later we can inspect the register map to find “i” or “sum”.  Note that a Java-level query cannot ask for our “RA” register – it’s the sum of “&A” and “i*8” and isn’t visible at the language level – so “RA” does not appear in the Safepoint’s map.

How do we change “i” or “N” and have it All Just Work?  Basically, we re-enter a clone of the loop from above… a clone specially made to be re-entered at any iteration.  The loop setup code in the clone will rebuild “RA” and “RB” and any other intermediate state as needed, before falling into essentially the same code as the normal loop (and indeed they might be shared here!).  Or in other words – since the optimizer did something complex to build “RA” from “&A” and “i” – have the optimizer do it again for any arbitrary new “i”.

The general plan is simple (from the optimizing compiler’s point-of-view): Safepoints are treated like a function-call only taken on a very very rare execution path.  Safepoints take as inputs all visible language-level state, including the state of all inlined function calls, and generally are assumed to update all memory state but not any local vars (if you are also hacking local variable state via the debugger interface, then we need to go a step further and deopt/reopt – leading to the loop clone mentioned above).

With this magical “low frequency function call” in hand, the optimizer attempts the best optimizations it can, subject to the normal constraints that the call arguments are available somehow.
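
Conceptually, the optimizer “sees” something like the sketch below: the safepoint is just a call on a rarely taken path, whose arguments are all the visible language-level state.  The names safepointRequested and safepoint are made-up stand-ins, not real VM entry points.

  // Conceptual sketch of the loop as the optimizer sees it (stand-in names).
  class SafepointAsRareCall {
    static long sumWithSafepoint(long[] A, int N) {
      long sum = 0;
      for (int i = 0; i < N; i++) {
        sum += A[i];
        if (safepointRequested())     // very rarely true
          safepoint(i, sum, A, N);    // "call" taking the architected state as arguments
      }
      return sum;
    }
    static boolean safepointRequested() { return false; }       // stand-in
    static void safepoint(int i, long sum, long[] A, int N) { } // stand-in
  }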

Hope this helps,


Hacking in H2O

In preparation for H2OWorld, I’ve written a couple of blogs to help people quick-start hacking in H2O.  All of these talks blend engineering, math and Big Data – and need nothing more than a really basic (free) Java development environment.



Sparkling Water: H2O + Scala + Spark

Spark is an up-and-coming new big data technology; it’s a whole lot faster and easier than existing Hadoop-based solutions. H2O does state-of-the-art Machine Learning algorithms over Big Data – and does them Fast. We are happy to announce that H2O now has a basic integration with Spark – Sparkling Water!

This is a “deep” integration – H2O can convert Spark RDDs to H2O DataFrames and vice-versa directly. The conversion is fully in-memory and in-process, and distributed and parallel as well. This also means H2O has a (nearly) full integration with Scala – H2O DataFrames are full Scala Collection objects – and the basic foreach collection calls naturally run distributed and parallel as well.

A few months back we announced the start of this initiative: Sparkling Water = H2O + Spark

In that post we went for the quickest integration solution possible: separate processes for Spark and H2O, with a Tachyon bridge. While this worked, it required fully 3 copies of all data: a Spark copy, a Tachyon bridge copy, and an H2O copy – and the data passed through process boundaries repeatedly. With this work we need only the Spark and H2O copies; the data is copied once, and does not cross a process boundary. It’s both faster and uses less memory.

Scala integration is being showcased before Spark integration simply because we need to understand H2O’s Scala integration before we can understand H2O’s Spark integration.

H2O’s Scala API

As always, the code is open source and available on GitHub. This code is all in 0xdata’s “h2o-dev” repo – a clean-slate, dev-friendly version of H2O. As of this writing, h2o-dev has all the core H2O logic, and about 50% of the machine learning algos (the rest should appear in a month).

The basic Scala wrappers are here:

And some simple examples here:

The idea is really simple: a Scala DataFrame extends an H2O water.fvec.Frame object.

class DataFrame private ( key : Key, names : Array[String], vecs : Array[Vec] ) extends Frame(key,names,vecs) with Map[Long,Array[Option[Any]]] {

The Scala type is a Map[Long, Array[Option[Any]]] – a map from Long row numbers to rows of data, i.e. a single data observation. The observation is presented as an Array of Option[Any] – array elements are features in the observation. H2O fully supports the notion of missing elements or data – and this is presented as a Scala Option type. Once you know a value is not “missing” – what exactly is its type? H2O Vecs (columns) are typed to hold one of these types:

  • a Number (a Java double conceptually, but any of boolean, byte, char, int, float, long, double)
  • a String
  • a Factor/Enum (similar to interned Strings that are mapped to small dense integer values)
  • a Timestamp (stored internally as milliseconds since the Unix Epoch)
  • a UUID

A DataFrame can be made from a Spark RDD or an H2O Frame directly or from a CSV file:

val df1 = new DataFrame(new File("some.csv"))

Column subsets (a smaller dataframe) can be selected via the column name:

val df2 : DataFrame = df1('a_column_header,'age,'id)

You can do a basic foreach:

df2.foreach{ case(x,age,id) => ...use x and age and id... }

And this will run distributed and parallel across your cluster!

The code for foreach is very simple:

override def foreach[U](f: ((Long, T)) => U): Unit = {
  new MRTask {
    override def map( chks : Array[Chunk] ) = {
      val start = chks(0)._start        // Global row number of this Chunk's first row
      val row = new T(chks.length)      // One row of Option[Any] values, reused each iteration
      val len = chks(0).len
      (0 until len).foreach{ i =>
        (0 until chks.length).foreach{ col => row(col) = if( chks(col).isNA0(i) ) None else Some(chks(col).at0(i)) }
        f((start + i, row))             // Apply the user's function to (row number, row)
      }
    }
  }.doAll(this)                         // Run distributed & parallel over this frame's Chunks
}

More complicated examples showing e.g. a scala map/reduce paradigm will follow later.

WaterWorks – Flowing Data between Spark and H2O

Let’s turn to Spark now, and show H2O and Spark working together. Spark RDDs and H2O Frames are similar – but not equal – ways to manage large distributed data. Basically, RDDs are a collection of blocks (Partitions) of rows, and Frames are a collection of columns, aligned in rows and broken up into Chunks.  Sort of a blocks-vs-stripes thing.

H2O represents data in columns (Vecs in H2O lingo) vertically striped across the cluster; a Frame is a collection of columns/Vecs. Vecs, in turn, are broken up into Chunks – and the Chunks are carefully aligned in the JVM heaps such that whole rows of data (all the columns in a Frame) align.

Spark represents data in RDDs (Resilient Distributed Datasets) – a Collection of elements. Each RDD is broken up in turn into Partitions blocked across the cluster; each Partition holds a subset of the Collection as complete rows (or observations in data-science parlance).

H2O Frames, Vecs and Chunks can represent any primitive Numeric type, Strings, timestamps, and UUIDs – and support the notion of a missing value. RDDs can represent a collection of any Scala or Java object. If we limit the objects to containing fields that H2O represents, then we can build a simple mapping between the two formats.

For our example we’ll look at a collection of Users:

class User(
  id: Long,
  age: Option[Short],
  sex: Option[Boolean],
  salary: Int
)

In Scala, this is typed as RDD[User], and we’ll have 1 User object per real User, blocked in Partitions around the cluster. In H2O, DataFrames naturally hold all numeric types and missing values in a column format; we’ll have 4 columns, each holding a value for each user, again distributed in Chunks around the cluster. Here’s a single Partition holding 4 users, and the corresponding 4 Chunks (one per column):


Here’s 4 JVM heaps holding all 32 users. There are 8 Partitions (two per JVM) and 32 Chunks in 4 Vecs striped around the cluster:


An Example: Back and Forth between H2O and Spark

Here’s a simple Scala example; we’ll assume we have the 4-node Sparkling Water cluster above and a CSV (or Hive) file on disk. We’ll use a simple User data file featuring just a unique id, the user’s age, sex and salary. From just the age and sex we’ll try to predict the typical salary using DeepLearning.

First we load the CSV file into H2O. H2O features a fast, robust CSV parser which is able to ingest CSV data at full disk bandwidth across a cluster. We can handle a wide variety of CSV formats (e.g. Hive is a CSV with ^A/0x01 field separators) and typically automatically infer all column types. Unlike Spark, operations in H2O are eager, not lazy, so the file is loaded and parsed immediately:

// Load an H2O DataFrame from CSV file
val frameFromCSV = new DataFrame(new File("h2o-examples/smalldata/small_users.csv"))

Then we’ll convert to a Spark RDD – like all RDD operations, this conversion is lazy; the conversion is defined but not executed yet:

val sc = createSparkContext()
val table : RDD[User] = H2OContext.toRDD(sc,frameFromCSV)

And then run an SQL query! SQL is a great way to subselect and munge data, and is one of Spark’s many strengths.

table.registerTempTable("user_table") // The RDD is now an SQL table
val query = "SELECT * FROM user_table WHERE AGE>=18" // Ignore underage users
val result = sql(query) // Run SQL query

Then we’ll convert back to an H2O DataFrame – and run H2O’s Deep Learning (Neural Nets) on the result. Since H2O operations are eager, this conversion triggers the actual SQL query to run just before converting.

// Convert back to H2O DataFrame
val frameFromQuery = H2OContext.toDataFrame(sc,result)

Now it’s time for some Deep Learning. We don’t want to train a Deep Learning model using the ID field – since it is unique per user, it’s possible that the model learns to spot interesting user features from the ID alone – which obviously does not work on the next new user that comes along! This problem is called overfitting, and H2O’s algorithms all have robust features to avoid it – but in this case we’ll just cut out the id column by selecting a subset of features to train on by name:

val trainFrame = frameFromQuery('age,'sex,'salary)

Deep Learning has about 60 parameters, but here we’ll limit ourselves to setting just a few and take all the defaults. Also note that sometimes the age or sex information is missing; this is a common problem in Data Science, and you can see it in the example data. DataFrames naturally support this notion of missing data. However, each of the Machine Learning algorithms in H2O handles missing data differently – the exact behavior depends on the deep mathematics in the algorithms. See this video for an excellent discussion of H2O’s Deep Learning.

// Deep Learning!
// – configure parameters
val dlParams = new DeepLearningParameters()
dlParams.source = trainFrame 
dlParams.response = trainFrame('salary)
// Run Deep Learning. The train() call starts the learning process
// and returns a Future; the training runs in the background. Calling
// get() blocks until the training completes.
val dlModel = new DeepLearning(dlParams).train.get

At this point we have a Deep Learning model that we can use to make predictions about new users – who have not shared their salary information with us. Those predictions are another H2O DataFrame and can be moved back and forth between H2O and Spark like any other DataFrame. Spark and Scala logic can be used to drive actions from the predictions, or the predictions can be used further along in a deeper RDD-and-H2O pipeline.


In just a few lines of Scala code we now blend H2O’s Machine Learning and Spark’s data-munging and SQL to form a complete ML solution:

  • Ingest data, via CSV, existing Spark RDD or existing H2O Frame
  • Manipulate the data, via Spark RDDs or as a Scala Collection
  • Model in H2O with cutting-edge algorithms
  • Use the Model to make Predictions
  • Use the Predictions in Spark RDDs and Scala to solve our problems

In the months ahead we will be rounding out this integration story – turning brand-new functionality into a production-ready solution.

I hope you like H2O’s new Spark and Scala integration – and as always we are very excited to hear your feedback – and especially code to help further this good cause!  If you would like to contribute, please download the code from Git and submit your pull requests – or post suggestions and questions on h2ostream.


A K/V Store For In-Memory Analytics, Part 2

This is a continuation of a prior blog on the H2O K/V Store, Part 1.

A quick review of the key bits going into this blog:

  • H2O supports a very high performance in-memory Distributed K/V store
  • The store honors the full Java Memory Model with exact consistency by default
  • Keys can be cached locally for both reads & writes
  • A typical cached Key Get or Put takes 150ns
  • A cache-missing operation requires network traffic; 1 or 2 UDP packets, or a TCP connection for large payloads.
  • Single-key transactional updates are supported
  • The Keys are pseudo-randomly distributed, removing a “hot-blocks” problem
  • There is no “master” Node; the cluster is fully peer-to-peer
  • Individual Keys have a “Home Node”, responsible for ordering conflicting writes to the same Key; the Home Node varies from Key to Key
  • Network operations are built on top of a reliable RPC mechanism built into H2O

The code for all of this can be found in the H2O GIT repo:

The Distributed K/V Store (hereafter, the DKV store) is similar to the hardware cache-coherency protocol MESI, but altered to better match the software layer.  In this hardware analogy, a Key is akin to an address in memory, a Value’s payload is akin to the data stored at that address – and the Value itself is more like the hardware cache-coherency bits.  These bits are not directly visible to the X86 programmer, but they have a profound impact on the (apparent) speed of memory, i.e. the cache behavior.  As is typical for this kind of thing, the actual implementation is basically a distributed Finite State Machine.

Keys have a “Home Node” (pseudo-randomly assigned – the Home Node is different for each Key).  Keys keep a little bit of state cached (mostly Home Node and a proper hash) but do not directly track coherency – that honor is given to the Values.

On the Home Node then, a Value tracks which other Nodes have replicas of this Value, and is responsible for invalidating those replicas when the Key is updated to a new Value.  Values also are used to force ordering of writes to the same Key from the same non-home writer – 1 JVM can only have a single outstanding write to the same Key in flight at once (although there’s no limit to the number of pending writes to unrelated Keys).  Hence parallel writes to unrelated Keys all work in parallel – the limit being the network bandwidth instead of the network latency.

Each Value holds a single AtomicInteger for a reader/writer lock, a NonBlockingSetInt (really just a fast concurrent bitset), and a pointer to the user’s payload.  The R/W lock holds the count of pending Gets in-flight, or -1 if the Value is write-locked.  The pending-Gets count is typically zero, only going up when a new cached replica is being set in some Node; once the remote Node has the replica (completes its Get) the pending-Gets count will fall again.  The bitset holds the set of Nodes replicating this Value.  Gets themselves are an instance of H2O’s reliable RPC mechanism.
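
Here is a rough sketch of that coherency state, heavily simplified and not H2O’s actual class (a plain ConcurrentSkipListSet stands in for NonBlockingSetInt):

  import java.util.concurrent.ConcurrentSkipListSet;
  import java.util.concurrent.atomic.AtomicInteger;

  // Simplified sketch of a Value's coherency fields (illustration only).
  final class ValueSketch {
    // Count of pending Gets in flight, or -1 when write-locked.  Usually 0.
    final AtomicInteger rwLock = new AtomicInteger(0);
    // On the Home Node: the set of Node ids currently holding a replica.
    final ConcurrentSkipListSet<Integer> replicas = new ConcurrentSkipListSet<>();
    // The user's payload (typically a serialized POJO).
    final byte[] payload;

    ValueSketch(byte[] payload) { this.payload = payload; }

    // A reader bumps the pending-Gets count, but never while write-locked.
    boolean startGet() {
      int r = rwLock.get();
      return r >= 0 && rwLock.compareAndSet(r, r + 1);
    }
    // Taking the write-lock requires that no Gets are pending.
    boolean tryWriteLock() { return rwLock.compareAndSet(0, -1); }
  }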

On a non-home node, the Value’s R/W lock field only holds a flag indicating that the Value is being overridden by a fresh local write, or not.

For a running example, let’s assume a 4-node cluster.  The cluster members are called A, B, C, and D.  We have a single Key, “K1”, and start with a single Value, “V1”.  When writing out a Value, we’ll follow it with the count in the R/W lock and a list of Nodes in the replica list.  Example: “V1[2,A]” means Value V1 has 2 pending-Gets, and is replicated on Node A.

A quiescent Value, with no Gets-in-flight and no replicas:


Actually, this is a 4-node cluster with K1 & V1 on Node A and not on the other nodes.  Hence a better picture is:


Node B has K1 and does a Get.  This misses in B‘s local K/V store, so B does a remote Get.  K1’s hash (available directly from K1 without talking to any other cluster member) tells us that K1’s home is Node A.  Hence B fires off a TaskGetKey from B to A requesting a replica of K1.  Here’s the flow:


Note that once Node B receives V1, the thread in B waiting for it can immediately begin processing V1.  The ACKACK is sent back asynchronously.  The request for K1 fits in a UDP packet.  Assuming reply V1 does as well, B only has to wait for a round-trip UDP packet send & receive before getting his data.  Assuming Nodes C & D make the same request:


At this point any Node in the cluster can retrieve V1 from K1 at no more cost than a hashtable lookup.

What happens when C wants to write a new Value V2 to Key K1?  C writes V2 locally, then sends V2 to A.  A then resolves any conflicting writes (none here), and invalidates the copies B & D have.  Future reads by B & D will then reload the updated value from A.  Finally, C gets notification that his write has completed.  Here’s the flow:


Let’s work through this diagram a little.

  • Node C writes V2 locally.  This means parallel threads within C can see the update immediately.  Also the writing thread can choose to do either a volatile write or not.  For a volatile write, the writing thread will block until it receives an ACK from Node A that the invalidates are complete.  For non-volatile writes, this is a fire-and-forget operation, although a future unrelated volatile write might need to block until C receives the ACK from A.
  • Node C writes V2 with an “X” in the R/W-lock – a flag indicating that the write of V2 is in-progress.  Node C is not allowed to start ANOTHER write to K1 until this write completes; further writes (of K1 from C) will block.  Hence Node A does not have to handle UDP out-of-order writes from C.  Writes to other Keys are not blocked.
  • Unlike a remote Get, the payload goes out in the initial send from C to A; the payload size determines if this is via UDP or TCP.  The ACKs are all small, and always go via UDP.
  • Value V1 in Node A is tracking the replicas, very akin to what a hardware cache mechanism would do.  Upon receiving the write from C several things happen in short order (but distinctly spaced in time): (1) A single atomic replace operation in A‘s local K/V store dictates which of several possible racing writes “wins” (at this point in time racing unrelated readers in A can see either V1 or V2; both are valid responses since the readers are racing).  (2) V1 is locked against future reads (at this point in time, any racing readers finding V1 will fail and upon retrying find V2); locking is done by setting the R/W-lock to -1.  (3) V2’s replicas are set to the sender C (who must have had a copy to send it!) and the receiver A, and these Nodes are removed from V1’s replica list.
  • Now A can start invalidating replicas around the cluster.  Note that if there are none, this operation takes no time!  Otherwise A sends out invalidate commands (to B & D here) and waits for the responses… thus causing C to wait until the cluster-wide invalidates are done.
  • As soon as the last invalidate is ACK’d back to A, A will ACK back to C.
  • The ACKACKs here can happen in any order and are not required for proper coherency & ordering; ACKACKs are used by the reliable RPC mechanism to know that the ACK made it.

Let’s talk a little about the performance here.  What operations are fast, or slow – and why?

  • All Things Cache Locally.  In the absence of conflicts, everything caches locally and no network traffic is required to make forward progress.  Cached Readers take about 150ns a pop.  Cached writes can all go fire-and-forget style (bound by network bandwidth and not latency), and a Reader following a Writer goes fast, and “sees” the write.
  • Readers Never Block.  If you look in the above flow, the only time a Reader blocks is on a cache-miss: the reader simply has to wait for the network to deliver the bytes.  Note that a Reader who misses locally and is blocked on the Home Node delivering a result, does NOT in turn wait for the Home Node to finish any other network operation.  In particular, the Home Node can deliver a valid result even while Invalidates are in-flight.
  • Unrelated Writes do not wait on each other.  Parallel unrelated writes go as fast as the network allows (bandwidth, not latency).
  • Writes can (rarely) Block on Readers.  Taking the write-lock requires all pending-Gets to complete.  Max of 1 pending-Get per Node, and they all complete in parallel – so this cannot take longer than the time to send the old Value around the cluster.
  • Back-to-back Writes to the same key Wait.  Each Node can only have 1 pending write to the same Key at a time.  Back-to-back updates will block until the first one fully completes.
  • Distributed writes to the same key are ordered, by the Home Node’s atomic replace operation.  However, they will typically be handled very fast after the first one – once the invalidates are done, a series of Puts will be ordered by the replace operation as fast as the memory system allows, and the ACKACKs can complete in any order (again, network bandwidth not latency).

These common use cases do not block, and go at either network speeds or memory speeds:

  • Parallel Reading Local Big Data.  Cached locally as-needed, reads at memory speeds
  • Parallel Reading Remote Big Data.  Uses full network bandwidth, and then cached thereafter
  • Parallel Writing Local Big Data.  Streaming writes use full memory bandwidth, no network
  • Parallel Writing Remote Big Data.  Streaming writes use full network bandwidth, not any round-trip latency

We have yet to talk about:

  • Resolving racing writes
  • Resolving racing readers & writers
  • Slow network reordering events (reliable RPC turns unreliable network into slow network w/reordering)

But… I think I’m out of space!

Comments welcome as always,


H2O Architecture


This is a top-level overview of the H2O architecture.  H2O does in-memory analytics on clusters with distributed parallelized state-of-the-art Machine Learning algorithms.  However, the platform is very generic, and very very fast.  We’re building Machine Learning tools with it, because we think they’re cool and interesting, but the platform can do much more.

H2O is based on a number of layers, and is coded at different layers to best approach different tasks and objectives.  We talk about the MapReduce execution flavor a lot in public, because it’s easy to explain, because it covers a lot of ground, and because we’ve implemented a bunch of dense linear-algebra-style algorithms with it – but that’s not the only thing we can do with H2O, nor is it the only coding “style”.  Here’s a rundown of the layers in H2O:

In-memory distributed K/V store layer: H2O sports an in-memory (not-persistent) K/V store, with *exact* (not lazy) consistency semantics and transactions.  The memory model is exactly the Java Memory Model, but distributed.  Both reads and writes are fully cacheable, and typical cache-hit latencies are around 150ns (that’s 150 nanoseconds) from a NonBlockingHashMap.  Let me repeat that: reads and writes go through a non-blocking hash table – we do NOT suffer from a classic hot-blocks problem.  Cache-misses obviously require a network hop, and the execution times are totally driven by the size of data moved divided by available bandwidth… and of course the results are cached.  The K/V store is currently used to hold control state, all results, and the Big Data itself.  You can certainly build a dandy graph-based algorithm directly over the K/V store, or integrate H2O’s model scoring into a low-latency production pipeline.
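
A minimal usage sketch: the DKV.put(key,pojo) and DKV.get(key).get() calls are the ones shown later in this post, while the Progress POJO and the Key.make factory here are assumptions for illustration.

  import water.DKV;
  import water.Iced;
  import water.Key;

  // Hypothetical example POJO; H2O values extend Iced so they can be serialized.
  class Progress extends Iced { double frac; }

  class DkvExample {
    static void roundTrip() {
      Key k = Key.make("my-progress");    // assumed Key factory
      Progress p = new Progress();
      p.frac = 0.5;
      DKV.put(k, p);                      // cached locally, forwarded to the Key's home node
      Progress again = DKV.get(k).get();  // a cache-hit get is ~150ns; a miss is 1 network hop
    }
  }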

      * A note on cluster size:  H2O clusters are peer-to-peer, and have no upper bound on their size.  We have built clusters with over 100 nodes and multiple TBs of DRAM.  Every node “knows” where every Key is, without actually having to hold onto the Key.  Hence asking for a particular Key’s Value is no more expensive than 1 network hop from the Key requester to some Key holder, and back.

      * A note on compression: Big Data is heavily (and losslessly) compressed – typically 2x to 4x better than GZIP on disk (YMMV) – and can be accessed like a Java Array (a Giant, greater-than-4-billion-element, distributed Java array).  H2O guarantees that if the data is accessed linearly then the access time will match what you can get out of C or Fortran – i.e., be memory-bandwidth bound, not CPU bound.  You can access the array (for both reads and writes) in any order, of course, but you get strong speed guarantees for accessing in-order.  You can do pretty much anything to an H2O array that you can do with a Java array, although due to size/scale you’ll probably want to access the array in a blatantly parallel style.

      * A note on decompression: The data is decompressed Just-In-Time, strictly in CPU registers, in the hot inner loops – and THIS IS FASTER than decompressing beforehand, because most algorithms are memory-bandwidth bound.  Moving a 32-byte cache line of compressed data into CPU registers gets more data per cache-miss than moving 4 8-byte doubles.  Decompression typically takes 2-4 instructions of shift/scale/add per element, and is well covered by the cache-miss costs.  As an example, adding a billion compressed doubles (e.g., to compute the average) takes 12ms on a 32-core (dual-socket) E5-2670 Intel machine – an achieved bandwidth of over 600GB/sec of uncompressed-equivalent data (8 bytes × 1 billion elements ÷ 12 ms ≈ 667 GB/sec) – hugely faster than all other Big Data solutions.  Accounting for compression, H2O achieved 83GB/sec out of the theoretical maximum of 100GB/sec on this hardware.

      * A note on Big Data and GC: H2O keeps all our data in heap, but in large arrays of Java primitives.  Our experience shows that we run without GC issues, even on very large heaps with the default collector.  We routinely test with heaps from 2G to 200G – and never see FullGC costs exceed a few seconds every now and then (it depends on the rate of Big Data writing going on).  The normal Java object allocation used to drive the system internally has a negligible GC load.  We keep our data in-heap because it’s as fast as possible (memory-bandwidth limited), easy to code (pure Java), and has no interesting GC costs.  Our GC tuning policy is: “only use the -Xmx flag, set to the largest you can allow given the machine resources”.  Take all the other GC defaults; they will work fine.

      * A note on Bigger Data and GC: We do a user-mode swap-to-disk when the Java heap gets too full, i.e., you’re using more Big Data than physical DRAM.  We won’t die with a GC death-spiral, but we will degrade to out-of-core speeds.  We’ll go as fast as the disk will allow.  I’ve personally tested loading a 12GB dataset into a 2GB (32-bit) JVM; it took about 5 minutes to load the data, and another 5 minutes to run a Logistic Regression.

      * A note on data ingest: We read data fully parallelized from S3, HDFS, NFS, URIs, browser uploads, etc.  We can typically drive HDFS disk spindles to an interesting fraction of what you can get from e.g. an HDFS file-copy.  We parse & compress (in parallel) a very generous notion of a CSV file (for instance, Hive files are directly ingestable), and SVMLight files.  We are planning on an RDD ingester – interactivity with other frameworks is in everybody’s interest.

      * A note on sparse data: H2O sports about 15 different compression schemes under the hood, including ones designed to compress sparse data.  We happily import SVMLight without ever having the data “blow up”, while still fully supporting the array-access API, including the speed guarantees.

      * A note on missing data: Most data sets have missing elements, and most math algorithms deal with missing data specially.  H2O fully supports a notion of “NA” for all data, including setting, testing, selecting in (or out), etc, and this notion is woven through the data presentation layer.

      * A note on streaming data: H2O vectors can have data inserted & removed (anywhere, in any order) continuously.  In particular, it’s easy to add new data at the end and remove it from the start – i.e., build a large rolling dataset holding all the elements that fit given a memory budget and a data flow-rate.  This has been on our roadmap for awhile, and needs only a little more work to be fully functional.

 Light-weight Map/Reduce layer: Map/Reduce is a nice way to write blatantly parallel code (although not the only way), and we support a particularly fast and efficient flavor.  A Map maps Type A to Type B, and a Reduce combines two Type B’s into one Type B.  Both Types A & B can be a combination of small-data (described as a Plain Old Java Object, a POJO) and big-data (described as another Giant H2O distributed array).  Here’s an example map from a Type A (a pair of columns), to a Type B (a POJO of Class MyMR holding various sums):
   class MyMR extends MRTask {
     double sum0, sum1, sq_sum0;                  // Most things are allowed here
     @Override public void map( double d0, double d1 ) {
       sum0 += d0;  sum1 += d1;  sq_sum0 += d0*d0;  // Again most any Java code here
     }
     @Override public void reduce( MyMR my ) {    // Combine two MyMRs together
       sum0 += my.sum0; sum1 += my.sum1; sq_sum0 += my.sq_sum0;
     }
   }
   new MyMR().doAll( v0, v1 );  // Invoke in-parallel and distributed

This code will be distributed ’round the cluster, and run at memory-bandwidth speeds (on compressed data!) with no further ado.  There’s a lot of mileage possible here that I’m only touching on lightly: filtering, subsetting, writing results into temp arrays that are used on later passes, uniques on billions of rows, and ddply-style group-by operations all work in this Map/Reduce framework – and all work by writing plain old Java.

     * A note on Group-By: We have a GroupBy operator running at scale (called ddply in the R community), built using this Map/Reduce framework.  The GroupBy can handle millions of groups on billions of rows, and runs Map/Reduce tasks on the group members (so a few large groups run as fast as many small groups).

     * Scala, and a note on API cleanliness: We fully acknowledge Java’s weaknesses here – this is the Java 6 flavor of coding style; Java 7 style is nicer – but still not as nice as some other languages.  We fully embrace & support alternative syntaxes over our engine.  In particular, we have an engineer working on an in-process Scala interface (amongst others).  We are shifting our focus now, from the excellent backend to the API side of things.  This is a work-in-progress for us, and we are looking forward to much improvement over the next year.

     * A note on R: H2O supports an R-like language – not full R semantics – but the obviously data-parallel data-munging aspects of R, and of course all the operators run fully parallel and distributed.  There is a REPL.  You can use it to add or drop columns or rows, manufacture features, impute missing values, or drop-in many R-expressions and have them run at-scale.

 Pre-Baked Algorithms Layer: We have the following algorithms pre-baked, fully optimized and full-featured: Generalized Linear Modeling, including Logistic Binomial Regression plus Gaussian, Gamma, Poisson, and Tweedie distributions; Deep Learning; Random Forest (that scales *out* to all the data in the cluster); Gradient Boosted Machine that provides both multinomial classification and regression boosted models (again, in-parallel and fully distributed); PCA; KMeans (and variants); and a full suite of data characterization – for example Quantiles (any quantile, computed exactly, in milliseconds).  All these algorithms support Confusion Matrices (with adjustable thresholds), AUC & ROC metrics, and incremental test-data-set results on partially trained models during the build process.  Within each algorithm, we support a full range of tuning parameters and options that you’d find in the similar R or SAS package.

* A note on some Mahout algorithms: We’re clearly well suited to e.g. SSVD and Co-occurrence and have talked with Ted Dunning at length on how they would be implemented in H2O.

 REST / JSON / R / Python / Excel / REPL: The system is externally drivable via URLs/REST API calls, with JSON responses.  We use REST/JSON from Python to drive our entire testing harness.  We have a very nice R package with H2O integrated behind R – you can issue R commands to an H2O-backed R “data.frame” – and have all the Big Math work on the Big Data in a cluster – including 90% of the typical “data munging” workflow.  This same REST/JSON interface also works with e.g. Excel (yes, we have a demo) or shell scripts.  We have a pretty web GUI over the REST/JSON layer that is suitable for lightweight modeling tasks.


A K/V Store For In-Memory Analytics: Part 1

First, some non-technical madness; fun technical bits below:

Jan 22: My life in the past 2 hours…
Rush to South San Jose from office team photo op –
Get my Kid L (age 16) from high school and rush her to the dentist in mid-Campbell: needs a bunch of cavities filled.
While sitting in the dentist’s office, Kid M (also mine, age 14) calls from Mom’s car – needs a book from Mom’s house; he’s staying away from J (18; 3rd kid) who is quarantined with a 103 degree fever. June (GF) calls – has a prescription for her rash – and a cause figured out finally; K (age 20, 4th kid) calls – she has J’s flu but isn’t feeling so bad, so she’s going to class in Berkeley. June wants to hear how K’s doing so I call June back.
M calls again – needs another book.
M calls a 3rd time – and this time Mom is explaining that she’s figured out what M is up to (and is apologizing for letting him call me so much). June calls – she has a flat in San Mateo, and has never dealt with a flat tire before in her life. I tell her to call AAA since I’ve got kids & need to be home to cook dinner.  She canceled AAA last month.
I’m trying to ask her “how flat is flat”, but if June is noticing it, it’s pretty bad. Some stranger in the background is saying “no you can’t drive it to a station, you’ll ruin the rim” (so yeah, really bad). Two minutes later same stranger is offering to change her spare.
June calls back in 10 mins; she has a spare, has the standard Toyota gear for tire changing, but one of the lug nuts is rounded off & stuck.  10 mins more and the stranger gets the nut off, and I’m trying to find a place for June to take her car & get the tire repaired, while sitting in the dentist’s office.
In 30mins L comes out of the DDS chair with a numb mouth & fewer cavities,
then we get to run to Mom’s house & get her & M’s books & guitar, then home so I can cook dinner.
I think I actually wrote some code in the middle of all that, but I’m not really sure.

Feb 4: I’m pondering how to “grow” an internal engineer to work on our internal R-like programming language.  I figure we need a short intro language to dev/make, to get the basic language & tool-building mindset across.  I recall a Byte magazine article about a language called “Mouse”.  Google & the Internet come through: there’s a Wiki article on Mouse

and there at the bottom is the link to the Byte magazine, now fully scanned:

Scary stuff: 35 years later and I can fully recall some obscure programming language, down to all kinds of implementation details.  And ads for “fast 4K static RAMs” and “fully loaded TRS-80’s”.  But not my neighbor-of-15-years’ kids’ names.


A K/V Store for In-Memory Analytics

0xdata is building in-memory analytics (no surprise there).  What may be a surprise, though, is that there’s a full-fledged, high-performance Key/Value store built into H2O, and that it is central to both our data management and our control logic.

We use the K/V store in two main ways:

  • All the Big Data is stored striped across the cluster with pseudo-random Keys.  For any given Key we can find the node which “homes” it from the Key itself, and cache it locally as needed.  Pseudo-random striping in practice means the data is typically well distributed around the cluster.  Caching (as opposed to a fixed placement) means we do not need to be perfect about which CPU works on which chunk of data – as long as *most* of the work is CPU-local.
  • About 1/2 the control logic goes through the K/V store (and the other half uses the RPC mechanism).  Anything which needs cross-node visibility and persistence uses the store, including progress counters, meta-data for temporary distributed vectors, built models, loaded datasets, and results from all kinds of work we wish to cache to avoid doing it again later.


First some simple facts:

  • The H2O K/V store supports the full Java Memory Model – lazy consistency can be asked for in places, but that typically only increases performance under certain use-cases (a mix of high-volume reads & writes to the same keys).  Note that the JMM does not order writes to non-volatile variables, so high-volume writes to unrelated Keys (as is common for temp data in the middle of some big calculation) all run in parallel with both the network traffic and the actual Value production.  i.e., we don’t typically stall-on-write in any common use-case, and we still maintain exact JMM semantics.
  • All Keys can be cached locally – meaning a typical hot Key ‘get’ is cached, and costs no more than a hashtable ‘get’ – about 150ns.  Same for a ‘put’ – the write is cached locally, then forwarded to another node (AFAIK, this is the fastest K/V get/put on the planet, but see below about not being persistent).  The forwarding happens in the background by default (unless you’ve specified volatile-like behavior).  Meanwhile, local readers will see the write immediately, and the writer is stalled for no more time than the hashtable ‘put’ and a single UDP-packet send.
  • H2O also supports transactional K/V updates – the transaction function is forwarded to the Key’s home, where it is run in a loop until the transaction succeeds or is aborted (a sketch of this retry-loop pattern follows this list).  We often use this for e.g. asynchronous updates to progress bars.
  • The H2O Cloud is peer-to-peer.  There is no “name-node” nor central Key dictionary.  Each Key has a home-node, but the homes are picked pseudo-randomly per-key.
  • H2O’s store is not persistent, nor is it an HA solution.  We looked at those (and even at one point had fully functioning Key auto-replication & repair) and decided that the market was well served by existing technologies.  We opted to put our energies into the Math side of things.
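
Here is the flavor of that transactional-update pattern as a generic sketch – illustrative pseudocode of the run-until-success loop against a plain AtomicReference, not H2O’s actual transaction API:

  import java.util.concurrent.atomic.AtomicReference;
  import java.util.function.UnaryOperator;

  // Illustrative only: "run the transaction function in a loop until it wins".
  final class TxnSketch {
    static <V> V update(AtomicReference<V> home, UnaryOperator<V> txn) {
      while (true) {
        V oldVal = home.get();          // read the current value at the Key's home
        V newVal = txn.apply(oldVal);   // the user's transaction function
        if (newVal == null) return oldVal;                      // transaction aborted
        if (home.compareAndSet(oldVal, newVal)) return newVal;  // won the race
        // else: somebody else updated first - loop and retry
      }
    }
  }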


Some Big Picture Details:

H2O’s K/V store is a classic peer-to-peer Distributed Hash Table, with the Keys distributed around the cluster via a pseudo-random hash function.  Pseudo-random because we can (and frequently do) muck with it, to force Keys to ‘home’ to different nodes (usually for load-balance reasons).  A Key’s ‘home’ is solely responsible for breaking ties in racing writes and is the “source of truth” for that Key.  To repeat: Keys can be cached anywhere, and both reads & writes can be cached (although a write is not complete until it reaches ‘home’, gets ordered with other writes, and an ACK is returned).  The ‘home’ is only consulted when breaking ties on conflicting writes or to fetch the value on a Key miss.

Keys are not much more than a blob of unique bytes, often char arrays from Strings or random UUIDs.

Values hold bits for maintaining consistency, plus status bits for being on some backing store (for the user-mode swap-to-disk), plus a big blob of bytes.  The blob of bytes is typically a serialized POJO, and if so an inflated copy of the POJO is kept around also.  We use generics to auto-inflate the POJO form:

MyPOJO pojo = DKV.get(key).get();

This code will set the variable “pojo” to a POJO pulled from the K/V store.  If all caches hit, this will take about 150ns.  There is a lot of raw compressed data (the Big Data part of big data), so Big Data is read directly from the bytes – its “POJO” form is the byte array itself; i.e., we don’t keep two copies of Big Data (both a serialized and a deserialized form).

Inflated POJOs (and their backing Values) are “immutable”.  While we cannot enforce this at the Java level, updates to the POJOs will not stick in the K/V store unless a ‘put’ is done.  More on this later, but mostly it turns into a coding style issue: if you need to update a POJO AND make the changes globally visible, you need to do a “DKV.put(key,pojo)” at some point.

A further restriction on POJOs is that they inherit from the class “Iced”.  The bytecode weaver will then inject all the code needed to serialize & deserialize (plus a JSON pretty-printer and a bunch of other code).  This rules out the default Java collections (although we have some equivalents – which can run *distributed*, because the collection can get larger than a single node can hold).  In practice this hasn’t been an issue.  We serialize Iced & primitives, arrays of Iced & primitives, and recursive subclasses of Iced.
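
Putting the last two paragraphs together, here is a hypothetical example – the class JobMeta and its fields are made up, and ‘key’ is assumed to be a Key already in the store; the pattern is the one just described: extend Iced, mutate a local copy, then publish it with a put.

// Hypothetical Iced subclass; the bytecode weaver injects serialization for it.
class JobMeta extends Iced {
  String _description;
  long   _rowsDone;          // primitives, arrays and other Iced fields all serialize
}

// Read-modify-publish: the local mutation is invisible to the cluster
// until the DKV.put makes it stick in the store.
JobMeta meta = DKV.get(key).get();   // cached 'get', ~150ns when hot
meta._rowsDone += 1000;              // local change only...
DKV.put(key, meta);                  // ...until we publish it back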


Reliable Remote Procedure Call

H2O is a clustered solution which requires network communication between JVMs in unrelated process or machine memory spaces.  That network communication can be fast or slow, can drop packets & sockets (even TCP can silently fail), and may need to be retried.  We have implemented a reliable RPC mechanism which retries failed communications at the RPC level.  An RPC contains a command (or call) to execute on the remote, plus the call arguments, and there is a return value.  Both args & return may be void, small, or contain gigabytes of data.

Our mechanism has all the obvious optimizations: message data is compressed in a variety of ways (because CPU is cheaper than network).  Short messages are sent via 1 or 2 UDP packets; larger messages use TCP for congestion control.  RPC calls are retried periodically until we get an ACK back; the ACK also contains the call’s return value.  The ACK itself is retried until the called node gets an ACKACK back (which ends the Call/ACK/ACKACK cycle).  We handle all the problems with double-sending of tasks & replies.  The end experience: the client makes a blocking call, sending the ‘this’ POJO over the wire – and gets back a modified ‘this’ POJO with the results filled in.

In practice, we can pull cables from a running cluster and plug them back in, and the cluster will recover – or we can drop >50% of all UDP packets and the system still works (albeit more slowly, with lots of retries).
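
Below is a self-contained toy of that behavior (not H2O’s networking code): a “wire” that drops roughly half of all messages, a callee that answers every copy of a call it sees, and a caller that simply resends until an ACK comes back.  The real protocol also de-dupes double-sent tasks and retries the ACK until an ACKACK arrives, which the toy omits.

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy retry-until-ACK over a lossy channel.
public class LossyRpcSketch {
  static final Random RNG = new Random();
  static <T> void send(BlockingQueue<T> wire, T msg) {
    if (RNG.nextBoolean()) wire.offer(msg);              // ~50% of "packets" get dropped
  }
  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> calls = new LinkedBlockingQueue<>();
    BlockingQueue<String> acks  = new LinkedBlockingQueue<>();
    Thread callee = new Thread(() -> {                   // stands in for the remote node
      try {
        while (true) send(acks, "ACK:" + calls.take());  // the ACK carries the result
      } catch (InterruptedException ignored) { }
    });
    callee.setDaemon(true);
    callee.start();
    String ack = null;
    while (ack == null) {                                // caller: resend until ACKed
      send(calls, "task-17");
      ack = acks.poll(20, TimeUnit.MILLISECONDS);
    }
    System.out.println("got " + ack);
  }
}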


And For Next Time, The Gory Details


It’s been too long since I’ve blogged, but this post has gotten quite long already!  And I think I’m going to need pictures… so next time, the gory details (and yes, building on the parts above).


Building a Distributed GBM on H2O

At 0xdata we build state-of-the-art distributed algorithms – and recently we embarked on building GBM, an algorithm notorious for being impossible to parallelize, much less distribute.  We built the algorithm shown in Elements of Statistical Learning II, by Trevor Hastie, Robert Tibshirani, and Jerome Friedman, page 387 (shown at the bottom of this post).  Most of the algorithm is straightforward “small” math, but step 2.b.ii says “Fit a regression tree to the targets….” – i.e., fit a regression tree in the middle of the inner loop, for targets that change with each outer loop.  This is where we decided to distribute/parallelize.
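
For orientation, here is a schematic of the standard gradient-boosting outer loop in the squared-error, single-class case (the page-387 algorithm is the multi-class generalization of this).  Tree, fitRegressionTree, mean and the data arrays are placeholders, not our actual classes; the point is that everything is cheap “small” math except the tree fit in the middle.

// Schematic only -- 'Tree', 'fitRegressionTree', 'mean', x and y are placeholders.
double[] f = new double[nRows];                 // current model predictions
java.util.Arrays.fill(f, mean(y));              // initialize to the constant best guess
for (int m = 0; m < nTrees; m++) {
  double[] resid = new double[nRows];
  for (int i = 0; i < nRows; i++)
    resid[i] = y[i] - f[i];                     // pseudo-residuals (plain residuals for L2 loss)
  Tree t = fitRegressionTree(x, resid);         // the expensive part -- this is what we distribute
  for (int i = 0; i < nRows; i++)
    f[i] += learnRate * t.predict(x[i]);        // shrink and add the new tree
}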

The platform we build on is H2O, which (as discussed in the prior blog) has an API focused on doing big parallel vector operations – and for GBM (and also Random Forest) we need to do big parallel tree operations.  But not arbitrary tree operations: GBM (and RF) constantly build trees – and the work is always at the leaves of a tree, and is about finding the next best split point for the subset of training data that falls into a particular leaf.

The code can be found on our git:

For GBM, look in:

src/main/java/hex/tree/gbm/ - driver, main loop from ESL2
src/main/java/hex/tree/ - shared with RF, score & build 1 layer
src/main/java/hex/tree/ - collect & split histograms per leaf


The High Level Bits

We do a straightforward mapping from vectors to leaves of a tree: we assign a “leaf id” or “node id” to each data row in a new temp vector.  We also choose split points (based on the last pass) before making a pass over the data – split-point decisions are made on summaries from the previous pass and are always “small data”, and typically aren’t useful to parallelize.  Splitting a leaf makes it an internal node, and we add 2 new leaves below it.

On each pass through the data, each observation R starts at an internal tree node; we get the node id (nid) from our temp vector.  The nid is a direct mapping into a POJO (Plain Old Java Object) for that node in a tree.  The node contains the split decision (e.g., “if age<45 go left, else go right”).  We move the observation R down the decision tree, left or right as appropriate, into a new leaf, and record the new nid for the next pass in our temp vector.
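
In code, that step looks roughly like the following – ‘nids’, ‘tree’, ‘SplitNode’ and the accessors are illustrative names, not the actual classes:

int nid = nids[row];                             // temp vector: where this row sat last pass
SplitNode node = tree.node(nid);                 // POJO with the split decision, e.g. age < 45
nid = x[row][node.column()] < node.splitValue()
    ? node.leftChildNid()                        // go left...
    : node.rightChildNid();                      // ...or right, into one of the two new leaves
nids[row] = nid;                                 // remember the new leaf for the next pass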

We then accumulate enough stats to make split decisions for the new leaves at the end of this pass.  Since this is a regression tree, we collect a histogram for each column – the mean & variance (using the recursive-mean technique).  Later we’ll pick the split point with the least variance, and the prediction if we don’t split will be the computed mean.
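
One standard way to do that one-pass accumulation is Welford’s update (a “recursive mean” plus a running sum of squared deviations); a self-contained sketch of the per-bin statistics:

// One-pass mean & variance for a bin (Welford's update).
final class BinStats {
  long n;  double mean, m2;                      // m2 = running sum of squared deviations
  void add(double y) {
    n++;
    double delta = y - mean;
    mean += delta / n;                           // recursive mean
    m2   += delta * (y - mean);                  // note: uses the *updated* mean
  }
  double variance() { return n < 2 ? 0 : m2 / (n - 1); }
}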

We repeat this process of moving a row R left or right, assigning a new nid, and summing into the histogram, for all rows.  At the end of this pass we can split each current leaf of the tree, effectively building the tree one layer in a breadth-first fashion.  We repeat for each layer, and after one tree is built we start in on the next.

Effectively then, we’re entirely parallel & distributed within a single tree level, but not across trees (GBM trees have sequential dependencies) nor across levels of the same tree.  This parallel-within-level aspect means there’s an upper bound on tree-building speed: the latency to build a single level.  However, since we’re completely parallel within a level, we can make a single level faster with more CPUs (or, conversely, build a tree from more data in the same time).  In practice, we’re the fastest single-node GBM we’ve ever seen, and we can usefully scale out with datasets as small as a few 10s of megabytes.


Finding an optimal split point is done by choosing a column/predictor/feature and identifying the value within that feature at which to split the data so as to minimize some loss function such as Mean Squared Error (or to maximize information gain).  The problem devolves to finding the optimal split value for a particular feature (and then taking the best split across all features).  The optimal split is often found (with small data) by sorting the feature values and inspecting the induced split.  For big data, this can be a problem (e.g., a distributed sort required for each tree leaf).  We choose to approximate sorting with binning instead.

Binning (i.e. a radix sort) is fast & simple, can be done in the same pass as our other tree work, and can get arbitrarily close to the sorted solution with enough bins.  However, there’s always the question of unequal binning and  outliers that skew the bin limits.  We solve these problems with dynamic binning – we choose bin limits anew at each tree level – based on the split bins’ min and max values discovered during the last pass.

Choosing bin limits anew means that outliers won’t be present at the next bin split, and it means dense bins will split again with much tighter bounds.  i.e. at each split point we “drill in” on the interesting data, and after a logarithmic number of splits we will find the optimal split point.

For example: suppose we are binning into 3 bins (our default is 1024 bins) and the predictor values are:

{-600e3, -10, -1, -0.8, -0.4, 0.1, 0.3, 0.6, 2.0, 5.0, +600e3}

Notice the extreme outliers, and the dense data around -1.0 to 2.0.  Then the initial bin split points are found by interpolation over the min and max to be -200e3 & +200e3, inducing the split:

{-600e3} {-10, -1, -0.8, -0.4, 0.1, 0.3, 0.6, 2.0, 5.0}  {+600e3}

The outliers are removed into their own splits, and we’re left with a dense middle bin.  If on the next round we decide to split the middle bin, we’ll bin over the min & max of that bin, i.e. from -10 to +5 with split points -5 and 0:

{-10} { -1, -0.8, -0.4}  {0.1, 0.3, 0.6, 2.0, 5.0}

In two passes we’re already splitting around the dense region.  A further pass on the 2 fuller bins will yield:

{ -1 } {-0.8} {-0.4}
{0.1, 0.3, 0.6} { 2.0} { 5.0}

We have fine-grained binning around the dense clusters of data.
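
The bin index used in the example above is just equal-width interpolation between the bin’s min and max; a minimal sketch of that arithmetic (the real code also has to deal with the categorical case described below):

// Equal-width binning between min and max; the max value is clamped into the last bin.
static int binIndex(double x, double min, double max, int nbins) {
  if (max <= min) return 0;                      // degenerate range: everything in one bin
  int b = (int) ((x - min) / (max - min) * nbins);
  return Math.min(b, nbins - 1);
}
// binIndex(-600e3, -600e3, 600e3, 3) == 0       // left outlier isolated
// binIndex(   -10, -600e3, 600e3, 3) == 1       // the dense middle bin
// binIndex( 600e3, -600e3, 600e3, 3) == 2       // right outlier isolated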

Multinomial (multi-class) Trees

The algorithm on ESL2, pg 387, directly supports multinomial trees – we end up building a tree-per-class.  These trees can be built in parallel with each other, and the predictive results from each tree are run together in step 2.b.iii.  Note that the trees do not have to make the same split decisions, and typically do not, especially for extreme minority classes.  Basically, the tree targeting some minority class is free to make decisions that optimize for that class.


For factor or categorical columns, we bin on the categorical levels (up to our bin limits in one pass).  We also use an equality-split instead of a relational-split, e.g.

{if feature==BLUE then/else}

instead of

{if feature < BLUE then/else}

We've been debating splitting on subsets of categories, to get more aggressive splitting when the data have many levels.  i.e., a 30,000 level categorical column will have a hard time exploring all possible categories in isolation without a lot of split levels.
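
To make the two split shapes concrete – BLUE is just an illustrative integer level id, and the subset variant is the idea being debated above, not something shipping today:

import java.util.BitSet;

// Illustration only: BLUE stands in for one categorical level's integer id.
final class CatSplitSketch {
  static final int BLUE = 2;
  static boolean equalitySplit(int level)              { return level == BLUE; }       // today's split shape
  static boolean subsetSplit(int level, BitSet goLeft) { return goLeft.get(level); }    // the debated subset split
}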

As with all H2O algorithms, GBM can be accessed from our REST+JSON API, from the browser, from R, Python, Scala, Excel & Mathematica.  Enjoy!