Winds of Change

I’ve left H2O.  I wish them all the best.  I’ve left a longer farewell here.

I’m a loose cannon, at least for a few months, and am looking for (the right kind of) trouble.

So I’m here, posting on my blog again, to see if I can get some suggestions on what to do with my life.  🙂

Here are a few of my crazy Cliff ideas I’m sorta chasing around:

  • Python Go Fast: Do Unto Python as Thou hast Done Unto Java. I hack the guts of Python; add a high-power JIT, a full-fledged low-pause GC, true multi-threading support, i.e. make Python as fast and as parallel as Java (about the same speed as C).  This blog is really a request for an open discussion on this topic.  Is the Python community interested?  How does this get funded?  (uber Kickstarter?)  I’ll only go here with the full support of the core Python committers, and general “feel goods” from the general Python community – and I’m hoping to start a discussion.  At this point I’m a premier language implementer, and making Python Go Fast is well within my abilities and past experiences. It would take about 2 years & $2M for this effort to be self-sustaining (build all the core new tech and hand it off to other contributors).
  • H2O2: I left a lot of unfinished technical work at H2O – and H2O has plenty of technical growing room.  I could continue to contribute to the Open Source side of H2O, with some Big Company footing the dev bill.  Big Company gets kudos for supporting Open Source, H2O gets the next generation of cool new features.
    • Plan B, Big Company funds some new core closed-source innovations to H2O and monetizes that.  H2O still gets some Open Source improvements but not all core tech work is open.
  • Teach: I bow out of the Great Rat Race for a year and go teach Virtual Machines at Stanford or Berkeley.  Fun, and makes for a nice sabbatical.  (as a bonus, I’ll probably have 3 kids in college next year, and the whole Stanford Pays Faculty Kids’ College thing sounds really good).  Might could do this while hacking Python guts at the same time.
  • Jetsons: I know how to Do the Flying Car Thing Right.  Million people, million flying cars in the air, all at once, and nobody can crash anything.  Feels like you’re flying but the Autopilot-o-Doom holds it all together.  Figured out how to handle bad weather, ground infrastructure lossage (e.g. the Big Quake wipes out ground support in 10sec, how do you land 1 million cars all at once?), integration into existing transportation fabric, your driveway as a runway, playing nice with the big jets, etc.  Bold statement, needs bold proof.  Lots more stuff here, been thinking on this one for a decade.  Needs 3 or 4 years to put together the eye-popping this-might-actually-work prototype.  Thus needs bigger funding; probably in the $10M range to get serious.
  • Something Random: by which I mean pretty much anything else that’ll pay the bills and is fun to do.  I got some pretty darned broad skillz and interests…

Gonna be a long-needed Summer-o-Crazy-Fun for me.  Heck, maybe I’ll post my 2nd tweet (shudder).  🙂



How does Java Both Optimize Hot Loops and Allow Debugging

This blog came about because an old friend is trying to figure out how Java can do aggressive loop and inlining optimizations, while allowing the loading of new code and setting breakpoints… in that optimized code.

On 2/21/2015 11:04 AM, IG wrote:

Safepoint. I’m still confused because I don’t understand what is the required state at a safepoint that can’t be ensured at any random point of execution.  What is in-flight that must be permitted to complete? Given a cycle-by-cycle map you can find all the pointers at *any* cycle – is safepointing just a way to reduce the volume of pointer-locating info?

Reducing OOP-map volume happens, is useful – but is not the main thing being achieved.

It’s flipping backwards from the machine state to the language architected state, unwinding from super-aggressive optimization to reach the theoretical Java (or C/C++) language state at some points.  Toooo expensive to preserve that mapping line-by-line, call-by-call, execution-point-by-execution-point. So you keep the mapping at a few places, and optimize the bejeebers out of things in-between.  If somebody wants to:

  • set a breakpoint in optimized code (yes, works for Java all the time)
  • inject new classes, new targets for some virtual-call

then you need to stop the physical machine (actual CPU/Mill) at some place where you can find all the language architected pieces.  This mapping is far more complex than the oop/no-oop mapping for GC… but isn’t why you make it rare.  You make it rare because you have to hang onto all the language state at these safepoints, but don’t need it between safepoints.

An example might help:

  for( int i=0; i<N; i++ )
    sum += A[i];  // set a breakpoint here, for "i>1e6" or maybe "i>1e7"

So intent is to run awhile… then somebody sets a breakpoint… and wants to inspect “sum”, or change “i” in the debugger, then continue execution.  What’s the machine code look like, with no safepoints?  Something like this (bogus X86-like assembly code, pardon non-perfection):

 ... some setup, zero-trip test, range-check on "N" vs "A.length", etc
 ld  RA=&A        // Classic strength-reduced pointer into array A
 add RC=RA,#N*8   // Stopping value of A
 ld  RB=0         // sum
loop:
 ld  RD=[RA+off]  // Load from A
 add RB=RB,RD     // Sum into RB
 add RA=RA,8      // Advance our pointer into A
 cmp RA,RC        // Stop when at the end
 blt loop
 // RB contains sum


But actually, with some loop unrolling:

... some setup, zero-trip test, range-check on "N" vs "A.length", etc
 ld  RA=&A        // Classic strength-reduced pointer into array A
 add RC=RA,#(N & (-1-3))*8  // mask off low 2 bits of iteration count - unroll by 4
 ld  RB=0         // sum
loop:
 prefetch [RA+off+32]  // Fetch next cache line... overlap memory & compute
 ld  R0=[RA+off+ 0]    // Get 4 values array A
 ld  R1=[RA+off+ 8]
 ld  R2=[RA+off+16]
 ld  R3=[RA+off+24]
 add R4=R0,R1  // add-tree to allow more parallel adders
 add R5=R2,R3  // add-tree to allow more parallel adders
 add RB=RB,R4  // partial sum into RB
 add RB=RB,R5  // full sum into RB
 add RA=RA,8*4 // Skip over 4 elements of array A
 cmp RA,RC     // Stop when at the end
 blt loop
 ... cleanup code for 0-3 more iterations
 // RB contains sum


Now I want to break on the “sum += A[i]” line… where is that, exactly, in that unrolled loop?

And which register contains “i”?  (Hint: none of them).

Do I even want to keep a shadow copy of “i” in parallel with my “A” pointer I’m rolling through the loop?  It’s pure overhead, unless I breakpoint and want to have some place to read “i”…. but even then I cannot *change* this shadow “i” because it won’t change my “A” pointer.

What about changing N?  Do I have enough info in the register mapping to describe all possible kinds of updates I might want to do to register RC when I tweak N?  This example is simple, but optimizing compilers can do hella complex code changes.

How does a Safepoint help here?  Well, for starters it points out that I do indeed need some way to find “i”, and if I change it then propagate those changes back into the loop.  Let’s look at this code with a Safepoint in it.

... some setup, zero-trip test, range-check on "N" vs "A.length", etc
 ld  RA=&A        // Classic strength-reduced pointer into array A
 add RC=RA,#(N & (-1-3))*8  // mask off low 2 bits of iteration count - unroll by 4
 ld  RB=0         // sum
 ld  RI=0         // Concession to Safepoint: keep "i" around
loop:
 prefetch [RA+off+32]  // Fetch next cache line... overlap memory & compute
 ld  R0=[RA+off+ 0]    // Get 4 values array A
 ld  R1=[RA+off+ 8]
 ld  R2=[RA+off+16]
 ld  R3=[RA+off+24]
 add R4=R0,R1  // add-tree to allow more parallel adders
 add R5=R2,R3  // add-tree to allow more parallel adders
 add RB=RB,R4  // partial sum into RB
 add RB=RB,R5  // full sum into RB
 add RA=RA,8*4 // Skip over 4 elements of array A
 add RI=RI,4   // Concession to Safepoint: keep "i" around
 SAFEPOINT: RI contains "i", RB contains "sum", "some place in the stack" contains "N"
 cmp RA,RC     // Stop when at the end
 blt loop
 ... cleanup code for 0-3 more iterations
 // RB contains sum


So I added 1 op in my hot loop to compute “i” on the fly.  Not too expensive; 1 clock per cache-line of data.  In general we end up keeping alive what is otherwise programmatically dead “just in case” the user stops the running code and wants to inspect (dead) variables – but this is cheap.  Just spill ’em off to stack somewhere and flag ’em in the Safepoint’s variable map.

How do we trigger the Safepoint?  Easiest way is to have the safepoint code test a thread-local “stop” bit, and if it is set… stop.  If we have to stop we might have a lot of cleanup to do, but stopping is rare so the cost (of mostly not stopping) is low.  There’s lots of ways to make the safepoint check cheap.  Here’s one that depends on keeping a 2Meg (TLB-sized) window above the stack pointer mapped in or out – perfect for an X86 TLB:

  ld RX,[RSP+2M] // will trap if we map this page out, then catch the SEGV & handle
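
For readers who think better in Java than in bogus assembly, here is a minimal sketch of the simpler "test a stop bit" flavor of safepoint check, wrapped around the same unrolled loop.  Everything named here is an assumption made for illustration: the class SafepointPollSketch, the stopRequested flag standing in for the thread-local stop bit, and the seenI/seenSum fields standing in for the Safepoint's variable map.  A real JIT emits the poll itself; nobody writes it by hand.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class SafepointPollSketch {
    // Hypothetical "stop" bit; a real JVM uses a per-thread flag or a protected page.
    static final AtomicBoolean stopRequested = new AtomicBoolean(false);
    // Language-level state captured the last time the loop actually stopped.
    static int seenI = -1;
    static long seenSum = -1;

    static long sum(long[] a, int n) {
        long sum = 0;
        int i = 0;
        int limit = n & ~3;                 // unroll by 4: mask off low 2 bits of the trip count
        for (; i < limit; i += 4) {
            sum += (a[i] + a[i + 1]) + (a[i + 2] + a[i + 3]);  // add-tree, as in the assembly
            // SAFEPOINT poll: one cheap test per unrolled iteration.
            if (stopRequested.get()) {      // rare path
                seenI = i + 4;              // "i" as the language sees it after these 4 iterations
                seenSum = sum;
                stopRequested.set(false);   // pretend the debugger looked around and resumed us
            }
        }
        for (; i < n; i++) sum += a[i];     // cleanup for 0-3 leftover iterations
        return sum;
    }
}
```

The point of the sketch is the cost model: the hot path pays one predictable test per four elements, and all the expensive bookkeeping lives on the path that is almost never taken.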


How do we use the Safepoint?  Once we’ve stopped the running thread mid-loop, we expect the trap handler to have saved off all the registers.  Later we can inspect the register map to find “i” or “sum”.  Note that a Java-level query cannot ask for our “RA” register – as it’s the sum of “&A” and “i*8” and isn’t visible at the language level – so “RA” does not appear in the Safepoint’s map.

How do we change “i” or “N” and have it All Just Work?  Basically, we re-enter a clone of the loop from above…. a clone specially made to be re-entered at any iteration.  The loop setup code in the clone will rebuild “RA” and “RB” and any other intermediate state as needed, before falling into essentially the same code as the normal loop (and indeed they might be shared here!).  Or in other words – since the optimizer did something complex to build “RA” from “&A” and “i” – have the optimizer do it again for any arbitrary new “i”.
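
The re-entrant clone can be sketched in Java too.  This is only an illustration under my own naming (LoopCloneSketch, resume): the clone takes whatever language-level state the debugger left behind ("i", "sum", "N"), redoes the setup work the optimizer did the first time, and then falls into the same unrolled loop shape.

```java
class LoopCloneSketch {
    // Clone of the summing loop that can be entered at any iteration, with any
    // partial sum and any N, e.g. after a debugger edited the local variables.
    static long resume(long[] a, int n, int i, long sum) {
        // Redo the setup: range check, then rebuild the unrolled loop's stopping point.
        if (i < 0 || n > a.length) throw new ArrayIndexOutOfBoundsException("bad i or N");
        int limit = i + (Math.max(n - i, 0) & ~3);   // whole groups of 4 remaining from "i"
        for (; i < limit; i += 4)
            sum += (a[i] + a[i + 1]) + (a[i + 2] + a[i + 3]);
        for (; i < n; i++) sum += a[i];              // cleanup for 0-3 leftover iterations
        return sum;
    }

    // The "normal" entry is just the clone entered at iteration 0 with sum 0.
    static long sum(long[] a, int n) { return resume(a, n, 0, 0); }
}
```

Calling resume(a, 10, 7, 0) picks the loop up at element 7, which is exactly the kind of mid-flight entry the original optimized loop has no way to accept.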

The general plan is simple (from the optimizing compiler’s point-of-view): Safepoints are treated like a function-call only taken on a very very rare execution path.  Safepoints take as inputs all visible language-level state, including the state of all inlined function calls, and generally are assumed to update all memory state but not any local vars (if you are also hacking local variable state via the debugger interface, then we need to go a step further and deopt/reopt – leading to the loop clone mentioned above).

With this magical “low frequency function call” in hand, the optimizer attempts the best optimizations it can, subject to the normal constraints that the call arguments are available somehow.

Hope this helps,


Hacking in H2O

In preparation for H2OWorld – I’ve written a couple of blogs to help people quick-start in hacking in H2O.  All of these talks blend engineering, math and Big Data – and need nothing more than a really basic (free) Java development environment.



Sparkling Water: H2O + Scala + Spark

Spark is an up and coming new big data technology; it’s a whole lot faster and easier than existing Hadoop-based solutions. H2O does state-of-the-art Machine Learning algorithms over Big Data – and does them Fast. We are happy to announce that H2O now has a basic integration with Spark – Sparkling Water!

This is a “deep” integration – H2O can convert Spark RDDs to H2O DataFrames and vice-versa directly. The conversion is fully in-memory and in-process, and distributed and parallel as well. This also means H2O has a (nearly) full integration with Scala as well – H2O DataFrames are full Scala Collection objects – and the basic foreach collection calls naturally run distributed and parallel also.

A few months back we announced the start of this initiative: Sparkling Water = H2O + Spark

In that post we went for the quickest integration solution possible: separate processes for Spark and H2O, with a Tachyon bridge. While this worked, it required fully 3 copies of all data: a Spark copy, a Tachyon bridge copy, and an H2O copy – and the data passed through process boundaries repeatedly. With this work we need only the Spark and H2O copies, the data is only copied once, and does not cross a process boundary. It’s both faster and uses less memory.

Scala integration is being showcased before Spark integration simply because we need to understand H2O’s Scala integration before we can understand H2O’s Spark integration.

H2O’s Scala API

As always, the code is open source and available on github. This code is all in 0xdata’s “h2o-dev” repo – a clean-slate dev friendly version of H2O. As of this writing, h2o-dev has all the core h2o logic, and about 50% of the machine learning algos (and the rest should appear in a month).

The basic Scala wrappers are here:

And some simple examples here:

The idea is really simple: a Scala DataFrame extends an H2O water.fvec.Frame object.

class DataFrame private ( key : Key, names : Array[String], vecs : Array[Vec] ) extends Frame(key,names,vecs) with Map[Long,Array[Option[Any]]] {

The Scala type is a Map[Long, Array[Option[Any]]] – a map from Long row numbers to rows of data – i.e. a single data observation. The observation is presented as an Array of Option[Any] – array elements are features in the observation. H2O fully supports the notion of missing elements or data – and this is presented as a Scala Option type. Once you know a value is not “missing” – what exactly is its type? H2O Vecs (columns) are typed to hold one of these types:

  • a Number (Java double conceptually, but any of boolean, byte, char, int, float, long, double)
  • a String
  • a Factor/Enum (similar to interned Strings that are mapped to small dense integer values)
  • a Timestamp (stored internally as milliseconds since the Unix Epoch)
  • a UUID

A DataFrame can be made from a Spark RDD or an H2O Frame directly or from a CSV file:

val df1 = new DataFrame(new File("some.csv"))

Column subsets (a smaller dataframe) can be selected via the column name:

val df2 : DataFrame = df1('a_column_header,'age,'id)

You can do a basic foreach:

df2.foreach{ case(x,age,id) => ...use x and age and id... }

And this will run distributed and parallel across your cluster!

The code for foreach is very simple:

override def foreach[U](f: ((Long, T)) => U): Unit = {
  new MRTask {
    override def map( chks : Array[Chunk] ) = {
      val start = chks(0)._start
      val row = new T(chks.length)
      val len = chks(0).len
      (0 until len).foreach{ i =>
        (0 until chks.length).foreach{ col => row(col) = if( chks(col).isNA0(i) ) None else Some(chks(col).at0(i)) }
        f(start+i,row)
      }
    }
  }.doAll(this)
}

More complicated examples showing e.g. a scala map/reduce paradigm will follow later.

WaterWorks – Flowing Data between Spark and H2O

Let’s turn to Spark now, and show H2O and Spark working together. Spark RDDs and H2O Frames are similar – but not equal – ways to manage large distributed data. Basically RDDs are a collection of blocks (Partitions) of rows, and Frames are a collection of columns, aligned in rows and broken up in Chunks. Sort of a blocks-vs-stripes thing.

H2O represents data in columns (Vecs in H2O lingo) vertically striped across the cluster; a Frame is a collection of columns/Vecs. Vecs, in turn, are broken up into Chunks – and the Chunks are carefully aligned in the JVM heaps such that whole rows of data (all the columns in a Frame) align.

Spark represents data in RDDs (Resilient Distributed Datasets) – a Collection of elements. Each RDD is broken up in turn into Partitions blocked across the cluster; each Partition holds a subset of the Collection as complete rows (or observations in data-science parlance).

H2O Frames, Vecs and Chunks can represent any primitive Numeric type, Strings, timestamps, and UUIDs – and support the notion of a missing value. RDDs can represent a collection of any Scala or Java object. If we limit the objects to containing fields that H2O represents then we can build a simple mapping between the two formats.
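
The blocks-vs-stripes mapping is easy to sketch in plain Java.  This is a toy, not H2O or Spark code: I am assuming a hypothetical row object with id, age, sex and salary fields, treating null as "missing", and storing every column as a plain double[] with NaN for missing values (the real Chunks are typed and compressed).

```java
import java.util.List;

class RowsToColumnsSketch {
    // Hypothetical row object; a null age or sex means "missing".
    static final class User {
        final long id; final Short age; final Boolean sex; final int salary;
        User(long id, Short age, Boolean sex, int salary) {
            this.id = id; this.age = age; this.sex = sex; this.salary = salary;
        }
    }

    // Turn one block of rows (a "Partition") into one array per column (one
    // "Chunk" per column), keeping row r of every column aligned.
    static double[][] toColumns(List<User> partition) {
        int n = partition.size();
        double[][] cols = new double[4][n];   // id, age, sex, salary
        for (int r = 0; r < n; r++) {
            User u = partition.get(r);
            cols[0][r] = u.id;
            cols[1][r] = (u.age == null) ? Double.NaN : u.age;
            cols[2][r] = (u.sex == null) ? Double.NaN : (u.sex ? 1 : 0);
            cols[3][r] = u.salary;
        }
        return cols;
    }
}
```

Reading cols[1] walks every age in the block without touching ids or salaries, which is the whole attraction of the column layout for math-heavy loops.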

For our example we’ll look at a collection of Users:

class User(
  id: Long,
  age: Option[Short],
  sex: Option[Boolean],
  salary: Int
)

In Scala, this is typed as RDD[User], and we’ll have 1 User object per real User, blocked in Partitions around the cluster. In H2O, DataFrames naturally hold all numeric types and missing values in a column format; we’ll have 4 columns each holding a value for each user again distributed in Chunks around the cluster. Here’s a single Partition holding 4 users, and showing 4 Chunks (one per column):


Here’s 4 JVM heaps holding all 32 users. There are 8 partitions (two per JVM) and 32 Chunks in 4 Vecs striped around the cluster:


An Example: Back and Forth between H2O and Spark

Here’s a simple Scala example; we’ll assume we have the 4-node Sparkling Water cluster above and a CSV (or Hive) file on disk. We’ll use a simple User data file featuring just a unique id, the user’s age, sex and salary. From just the age and sex we’ll try to predict the typical salary using DeepLearning.

First we load the CSV file into H2O. H2O features a fast robust CSV parser which is able to ingest CSV data at full disk bandwidths across a cluster. We can handle a wide variety of CSV formats (e.g. Hive is a CSV with ^A/0x01 field separators) and typically automatically infer all column types. Unlike Spark, operations in H2O are eager not lazy, so the file is loaded and parsed immediately:

// Load an H2O DataFrame from CSV file
val frameFromCSV = new DataFrame(new File("h2o-examples/smalldata/small_users.csv"))

Then we’ll convert to a Spark RDD – like all RDD operations, this conversion is lazy; the conversion is defined but not executed yet:

val sc = createSparkContext()
val table : RDD[User] = H2OContext.toRDD(sc,frameFromCSV)

And then run an SQL query! SQL is a great way to subselect and munge data, and is one of Spark’s many strengths.

table.registerTempTable("user_table") // The RDD is now an SQL table
val query = "SELECT * FROM user_table WHERE AGE>=18" // Ignore underage users
val result = sql(query) // Run SQL query

Then we’ll convert back to an H2O DataFrame – and run H2O’s Deep Learning (NeuralNets) on the result. Since H2O operations are eager, this conversion triggers the actual SQL query to run just before converting.

// Convert back to H2O DataFrame
val frameFromQuery = H2OContext.toDataFrame(sc,result)
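
If the lazy-then-eager handoff feels slippery, here is the same idea in miniature using plain Java streams.  This is an analogy only, with no Spark or H2O API involved: the stream plays the lazy RDD query, toArray() plays the eager conversion, and the class LazyVsEagerSketch and its filterCalls counter are names I made up to show when the work actually happens.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

class LazyVsEagerSketch {
    // Counts how many rows the "query" has actually looked at.
    static final AtomicInteger filterCalls = new AtomicInteger();

    // Lazy: defines the "WHERE AGE>=18" filter but runs nothing yet.
    static IntStream adults(int[] ages) {
        return IntStream.of(ages).filter(age -> {
            filterCalls.incrementAndGet();
            return age >= 18;
        });
    }

    // Eager: materializing the result forces the pending filter to run.
    static int[] materialize(IntStream query) {
        return query.toArray();
    }
}
```

After adults(...) returns, the counter is still zero; it only moves once materialize(...) pulls the data through, just as the SQL query above only runs when the DataFrame conversion demands its rows.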

Now it’s time for some Deep Learning. We don’t want to train a Deep Learning model using the ID field – since it is unique per user, it’s possible that the model learns to spot interesting user features from the ID alone – which obviously does not work on the next new user that comes along! This problem is called overfitting and H2O’s algorithms all have robust features to avoid it – but in this case we’ll just cut out the id column by selecting a subset of features to train on by name:

val trainFrame = frameFromQuery('age,'sex,'salary)

Deep Learning has about 60 parameters, but here we’ll limit ourselves to setting just a few and take the defaults for the rest. Also note that sometimes the age or sex information is missing; this is a common problem in Data Science – you can see it in the example data. DataFrames naturally support this notion of missing data. However each of the Machine Learning algorithms in H2O handles missing data differently – the exact behavior depends on the deep mathematics in the algorithms. See this video for an excellent discussion on H2O’s DeepLearning.

// Deep Learning!
// – configure parameters
val dlParams = new DeepLearningParameters()
dlParams.source = trainFrame 
dlParams.response = trainFrame('salary)
// Run Deep Learning. The train() call starts the learning process
// and returns a Future; the training runs in the background. Calling
// get() blocks until the training completes.
val dlModel = new DeepLearning(dlParams).train.get

At this point we have a Deep Learning model that we can use to make predictions about new users – who have not shared their salary information with us. Those predictions are another H2O DataFrame and can be moved back and forth between H2O and Spark as any other DataFrame. Spark and Scala logic can be used to drive actions from the predictions, or just used further in a deep RDD and H2O pipeline.


In just a few lines of Scala code we now blend H2O’s Machine Learning and Spark’s data-munging and SQL to form a complete ML solution:

  • Ingest data, via CSV, existing Spark RDD or existing H2O Frame
  • Manipulate the data, via Spark RDDs or as a Scala Collection
  • Model in H2O with cutting-edge algorithms
  • Use the Model to make Predictions
  • Use the Predictions in Spark RDDs and Scala to solve our problems

In the months ahead we will be rounding out this integration story – turning brand-new functionality into a production-ready solution.

I hope you like H2O’s new Spark and Scala integration – and as always we are very excited to hear your feedback – and especially code to help further this good cause!  If you would like to contribute – please download the code from Git and submit your pull requests – or post suggestions and questions on h2ostream.


A K/V Store For In-Memory Analytics, Part 2

This is a continuation of a prior blog on the H2O K/V Store, Part 1.

A quick review on key bits going into this next blog:

  • H2O supports a very high performance in-memory Distributed K/V store
  • The store honors the full Java Memory Model with exact consistency by default
  • Keys can be cached locally for both reads & writes
  • A typical cached Key Get or Put takes 150ns
  • A cache-missing operation requires network traffic; 1 or 2 UDP packets, or a TCP connection for large payloads.
  • Single-key transactional updates are supported
  • The Keys are pseudo-randomly distributed, removing a “hot-blocks” problem
  • There is no “master” Node; the cluster is fully peer-to-peer
  • Individual Keys have a “Home Node”, responsible for ordering conflicting writes to the same Key; the Home Node varies from Key to Key
  • Network operations are built on top of a reliable RPC mechanism built into H2O

The code for all of this can be found in the H2O GIT repo:

The Distributed K/V Store (hereafter, the DKV store) is similar to hardware cache-coherency protocols such as MESI, but altered to better match the software layer.  In this hardware analogy, a Key is akin to an address in memory, a Value’s payload is akin to the data stored at that address – and the Value itself is more like the hardware cache coherency bits.  These bits are not directly visible to the X86 programmer, but they have a profound impact on the (apparent) speed of memory, i.e. the cache behavior.  As is typical for this kind of thing, the actual implementation is basically a distributed Finite State Machine.

Keys have a “Home Node” (pseudo-randomly assigned – the Home Node is different for each Key).  Keys keep a little bit of state cached (mostly Home Node and a proper hash) but do not directly track coherency – that honor is given to the Values.
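
To make "pseudo-randomly assigned" concrete, here is a tiny sketch of picking a Home Node purely from a Key's hash.  The mixing step and the modulo are my assumptions for illustration (H2O's actual hash function is not shown here), and HomeNodeSketch is a made-up name; the property that matters is that every Node computes the same answer without asking anyone.

```java
class HomeNodeSketch {
    // Pick a Home Node for a Key using nothing but the Key itself.
    static int homeNode(String key, int clusterSize) {
        int h = key.hashCode();
        h ^= (h >>> 16);                       // cheap bit mix so low bits depend on high bits
        return Math.floorMod(h, clusterSize);  // always in [0, clusterSize), even for negative h
    }
}
```

Because the function is deterministic and local, a Node holding only the Key knows where to send its Get or Put in zero network hops.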

On the Home Node then, a Value tracks which other Nodes have replicas of this Value, and is responsible for invalidating those replicas when the Key is updated to a new Value.  Values also are used to force ordering of writes to the same Key from the same non-home writer – 1 JVM can only have a single outstanding write to the same Key in flight at once (although there’s no limit to the number of pending writes to unrelated Keys).  Hence parallel writes to unrelated Keys all work in parallel – the limit being the network bandwidth instead of the network latency.

Each Value holds a single AtomicInteger for a reader/writer lock, and a NonBlockingSetInt (really just a fast concurrent bitset), and a pointer to the user’s payload.  The R/W lock holds the count of pending Gets in-flight, or -1 if the Value is write-locked.  The pending-Gets count is typically zero, only going up when a new cached replica is being set in some Node; once the remote Node has the replica (completes its Get) the pending-Gets count will fall again.  The bitset holds the set of Nodes replicating this Value.  Gets are implemented in, and are an instance of H2O’s reliable RPC mechanism in

On a non-home node, the Value’s R/W lock field only holds a flag indicating that the Value is being overridden by a fresh local write, or not.
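
Here is a minimal sketch of that Home Node bookkeeping.  It is not H2O's Value class: ValueSketch and its method names are hypothetical, and I use an AtomicLong bitmask as a stand-in for NonBlockingSetInt, which limits this toy to 64 Nodes.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

class ValueSketch {
    final AtomicInteger rwLock = new AtomicInteger(0);  // pending-Gets count, or -1 if write-locked
    final AtomicLong replicas = new AtomicLong(0);       // bit n set => Node n holds a replica
    final Object payload;

    ValueSketch(Object payload) { this.payload = payload; }

    // A remote Node starts fetching a replica.  Fails if the Value is write-locked,
    // in which case the reader retries and finds the newer Value.
    boolean startGet(int node) {
        while (true) {
            int old = rwLock.get();
            if (old == -1) return false;
            if (rwLock.compareAndSet(old, old + 1)) {
                replicas.getAndUpdate(bits -> bits | (1L << node));
                return true;
            }
        }
    }

    // The remote Node has its replica; the pending-Gets count falls again.
    void finishGet() { rwLock.decrementAndGet(); }

    // Taking the write-lock only succeeds once no Gets are in flight.
    boolean tryWriteLock() { return rwLock.compareAndSet(0, -1); }
}
```

The single integer does double duty: readers count up and down on it, and a writer can only flip it to -1 when that count is back at zero.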

For a running example, let’s assume a 4-node cluster.  The cluster members are called A,B,C, and D.  We have a single Key, “K1” and start with a single Value “V1”.  When writing a Value, we’ll follow with the count in the R/W lock, and a list of Nodes in the replica list.  Example: “V1[2,A]” means Value V1 has 2 pending-Gets, and is replicated on Node A.

A quiescent Value, with no Gets-in-flight and no replicas:


Actually, this is a 4-node cluster with K1 & V1 on Node A and not on the other nodes.  Hence a better picture is:


Node B has K1 and does a Get.  This misses in B’s local K/V store, so B does a remote Get.  K1’s hash (available directly from K1 without talking to any other cluster member) tells us that K1’s home is Node A.  Hence B fires off a TaskGetKey from B to A requesting a replica of K1.  Here’s the flow:


Note that once Node B receives V1, the thread in B waiting for it can immediately begin processing V1.  The ACKACK is sent back asynchronously.  The request for K1 fits in a UDP packet.  Assuming reply V1 does as well, B only has to wait for a round-trip UDP packet send & receive before getting his data.  Assuming Nodes C & D make the same request:


At this point any Node in the cluster can retrieve V1 from K1 at no more cost than a hashtable lookup.

What happens when C wants to write a new Value V2 to Key K1?  C writes V2 locally, then sends V2 to A.  A then resolves any conflicting writes (none here), and invalidates the copies B & D have.  Future reads by B & D will then reload the updated value from A.  Finally, C gets notification that his write has completed.  Here’s the flow:


Let’s work through this diagram a little.

  • Node C writes V2 locally.  This means parallel threads within C can see the update immediately.  Also the writing thread can choose to do either a volatile write or not.  For a volatile write, the writing thread will block until it receives an ACK from Node A that the invalidates are complete.  For non-volatile writes, this is a fire-and-forget operation, although a future unrelated volatile write might need to block until C receives the ACK from A.
  • Node C writes V2 with an “X” in the R/W-lock – a flag indicating that the write of V2 is in-progress.  Node C is not allowed to start ANOTHER write to K1 until this write completes; further writes (of K1 from C) will block.  Hence Node A does not have to handle UDP out-of-order writes from C.  Writes to other Keys are not blocked.
  • Unlike a remote Get, the payload goes out in the initial send from C to A; the payload size determines if this is via UDP or TCP.  The ACKs are all small, and always go via UDP.
  • Value V1 in Node A is tracking the replicas, very akin to what a hardware cache mechanism would do.  Upon receiving the write from C several things happen in short order (but distinctly spaced in time): (1) A single atomic replace operation in A‘s local K/V store dictates which of several possible racing writes “wins” (at this point in time racing unrelated readers in A can see either V1 or V2; both are valid responses since the readers are racing).  (2) V1 is locked against future reads (at this point in time, any racing readers finding V1 will fail and upon retrying find V2); locking is done by setting the R/W-lock to -1.  (3) V2’s replicas are set to the sender C (who must have had a copy to send it!) and the receiver A, and these Nodes are removed from V1’s replica list.
  • Now A can start invalidating replicas around the cluster.  Note that if there are none, this operation takes no time!  Otherwise A sends out invalidate commands (to B & D here) and waits for the response…. thus causing C to wait until the cluster-wide invalidates are done.
  • As soon as the last invalidate is ACK’d back to A, A will ACK back to C.
  • The ACKACKs here can happen in any order and are not required for proper coherency & ordering; ACKACKs are used by the reliable RPC mechanism to know that the ACK made it.
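
The Home Node's side of that write can be sketched as follows.  This is a single-process toy under my own assumptions, not H2O's RPC code: HomeNodePutSketch and Val are hypothetical names, Nodes are numbered 0-63 so replicas fit in a long bitmask, and "sending invalidates" is reduced to returning the list of Nodes that would receive one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class HomeNodePutSketch {
    static final class Val {
        final String payload;
        final AtomicInteger rwLock = new AtomicInteger(0);  // pending Gets, -1 = write-locked
        final long replicas;                                // bitmask of Nodes holding a copy
        Val(String payload, long replicas) { this.payload = payload; this.replicas = replicas; }
    }

    final int home;                                         // this Node's number
    final ConcurrentHashMap<String, Val> store = new ConcurrentHashMap<>();

    HomeNodePutSketch(int home) { this.home = home; }

    // Handle a write arriving from "writer"; returns the Nodes that must be
    // invalidated (and ACK back) before the writer itself gets its ACK.
    List<Integer> put(String key, String payload, int writer) {
        long fresh = (1L << writer) | (1L << home);  // (3) writer and Home already hold the new Value
        Val next = new Val(payload, fresh);
        Val old = store.put(key, next);              // (1) one atomic replace picks the winner
        List<Integer> invalidate = new ArrayList<>();
        if (old == null) return invalidate;          // no old Value, so nothing to invalidate
        while (!old.rwLock.compareAndSet(0, -1))     // (2) wait out pending Gets, then lock old Value
            Thread.yield();
        long stale = old.replicas & ~fresh;          // replicas that still hold the old Value
        for (int n = 0; n < 64; n++)
            if ((stale & (1L << n)) != 0) invalidate.add(n);
        return invalidate;
    }
}
```

With Nodes A-D numbered 0-3 and V1 replicated everywhere, a put from C (Node 2) to Home Node A (Node 0) returns B and D, matching the flow above.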

Let’s talk a little about the performance here.  What operations are fast, or slow – and why?

  • All Things Cache Locally.  In the absence of conflicts, everything caches locally and no network traffic is required to make forward progress.  Cached Readers take about 150ns a pop.  Cached writes can all go fire-and-forget style (bound by network bandwidth and not latency), and a Reader following a Writer goes fast, and “sees” the write.
  • Readers Never Block.  If you look in the above flow, the only time a Reader blocks is on a cache-miss: the reader simply has to wait for the network to deliver the bytes.  Note that a Reader who misses locally and is blocked on the Home Node delivering a result, does NOT in turn wait for the Home Node to finish any other network operation.  In particular, the Home Node can deliver a valid result even while Invalidates are in-flight.
  • Unrelated Writes do not wait on each other.  Parallel unrelated writes go as fast as the network allows (bandwidth, not latency).
  • Writes can (rarely) Block on Readers.  Taking the write-lock requires all pending-Gets to complete.  Max of 1 pending-Get per Node, and they all complete in parallel – so this cannot take longer than the time to send the old Value around the cluster.
  • Back-to-back Writes to the same key Wait.  Each Node can only have 1 pending write to the same Key at a time.  Back-to-back updates will block until the first one fully completes.
  • Distributed writes to the same key are ordered, by the Home Node’s atomic replace operation.  However, they will typically be handled very fast after the first one – once the invalidates are done, a series of Puts will be ordered by the replace operation as fast as the memory system allows, and the ACKACKs can complete in any order (again, network bandwidth not latency).

These common use cases do not block, and go at either network speeds or memory speeds:

  • Parallel Reading Local Big Data.  Cached locally as-needed, reads at memory speeds
  • Parallel Reading Remote Big Data.  Uses full network bandwidth, and then cached thereafter
  • Parallel Writing Local Big Data.  Streaming writes use full memory bandwidth, no network
  • Parallel Writing Remote Big Data.  Streaming writes use full network bandwidth, not any round-trip latency

We have yet to talk about:

  • Resolving racing writes
  • Resolving racing readers & writers
  • Slow network reordering events (reliable RPC turns unreliable network into slow network w/reordering)

But… I think I’m out of space!

Comments welcome as always,


H2O Architecture


This is a top-level overview of the H2O architecture.  H2O does in-memory analytics on clusters with distributed parallelized state-of-the-art Machine Learning algorithms.  However, the platform is very generic, and very very fast.  We’re building Machine Learning tools with it, because we think they’re cool and interesting, but the platform can do much more.

H2O is based on a number of layers, and can be coded to at different layers to best approach different tasks and objectives.  We talk about the MapReduce execution flavor a lot in public, because it’s easy to explain, because it covers a lot of ground, and because we’ve implemented a bunch of dense linear-algebra style algorithms with it – but that’s not the only thing we can do with H2O, nor is it the only coding “style”.  Here’s a rundown of the layers in H2O:

In-memory distributed K/V store layer: H2O sports an in-memory (not-persistent) K/V store, with *exact* (not lazy) consistency semantics and transactions.  The memory model is exactly the Java Memory Model, but distributed.  Both reads and writes are fully cacheable, and typical cache-hit latencies are around 150ns (that’s 150 nanoseconds) from a NonBlockingHashMap.  Let me repeat that: reads and writes go through a non-blocking hash table – we do NOT suffer from a classic hot-blocks problem.  Cache-misses obviously require a network hop, and the execution times are totally driven by the size of data moved divided by available bandwidth… and of course the results are cached.  The K/V store is currently used to hold control state, all results, and the Big Data itself.  You can certainly build a dandy graph-based algorithm directly over the K/V store, or integrate H2O’s model scoring into a low-latency production pipeline.

      * A note on cluster size:  H2O clusters are peer-to-peer, and have no upper bound on the size.  We have built clusters with over 100 nodes and multiple TBs of DRAM.  Every node “knows” where every Key is, without actually having to hold onto the Key.  Hence asking for a particular Key’s Value is no more expensive than 1 network hop from the Key requester to some Key holder, and back.

     * A note on compression: Big Data is heavily (and losslessly) compressed – typically 2x to 4x better than GZIP on disk (YMMV), and can be accessed like a Java Array (a Giant greater-than-4billion-element distributed Java array).  H2O guarantees that if the data is accessed linearly then the access time will match what you can get out of C or Fortran – i.e., be memory bandwidth bound, not CPU bound.  You can access the array (for both reads and writes) in any order, of course, but you get strong speed guarantees for accessing in-order.  You can do pretty much anything to an H2O array that you can do with a Java array, although due to size/scale you’ll probably want to access the array in a blatantly parallel style.

      * A note on decompression: The data is decompressed Just-In-Time strictly in CPU registers in the hot inner loops – and THIS IS FASTER than decompressing beforehand because most algorithms are memory bandwidth bound.  Moving a 32-byte cache line of compressed data into CPU registers gets more data per-cache-miss than moving 4 8-byte doubles.  Decompression typically takes 2-4 instructions of shift/scale/add per element, and is well covered by the cache-miss costs.  As an example, adding a billion compressed doubles (e.g., to compute the average) takes 12ms on a 32-core (dual-socket) E5-2670 Intel – an achieved bandwidth of over 600GB/sec (8 billion bytes of doubles in 12ms) – hugely faster than all other Big Data solutions.  Accounting for compression, H2O achieved 83GB/sec out of the theoretical maximum of 100GB/sec on this hardware.

      * A note on Big Data and GC: H2O keeps all our data in heap, but in large arrays of Java primitives.  Our experience shows that we run without GC issues, even on very large heaps with the default collector.  We routinely test with e.g. heaps from 2G to 200G – and never see FullGC costs exceed a few seconds every now and then (depends on the rate of Big Data writing going on).  The normal Java object allocation used to drive the system internally has a negligible GC load.  We keep our data in-heap because it’s as fast as possible (memory bandwidth limited), and easy to code (pure Java), and has no interesting GC costs.  Our GC tuning policy is: “only use the -Xmx flag, set to the largest you can allow given the machine resources”.  Take all the other GC defaults, they will work fine.

      * A note on Bigger Data and GC: We do a user-mode swap-to-disk when the Java heap gets too full, i.e., you’re using more Big Data than physical DRAM.  We won’t die with a GC death-spiral, but we will degrade to out-of-core speeds.  We’ll go as fast as the disk will allow.  I’ve personally tested loading a 12Gb dataset into a 2Gb (32bit) JVM; it took about 5 minutes to load the data, and another 5 minutes to run a Logistic Regression.

      * A note on data ingest: We read data fully parallelized from S3, HDFS, NFS, URI’s, browser uploads, etc.  We can typically drive HDFS disk spindles to an interesting fraction of what you can get from e.g. HDFS file-copy.  We parse & compress (in parallel) a very generous notion of a CSV file (for instance, Hive files are directly ingestable), and SVMLight files.  We are planning on an RDD ingester – interactivity with other frameworks is in everybody’s interest.

      * A note on sparse data: H2O sports about 15 different compression schemes under the hood, including ones designed to compress sparse data.  We happily import SVMLight without ever having the data “blow up” and still fully supporting the array-access API, including speed guarantees.

      * A note on missing data: Most data sets have missing elements, and most math algorithms deal with missing data specially.  H2O fully supports a notion of “NA” for all data, including setting, testing, selecting in (or out), etc, and this notion is woven through the data presentation layer.

    * A note on streaming data: H2O vectors can have data inserted & removed (anywhere, in any order) continuously.  In particular, it’s easy to add new data at the end and remove it from the start – i.e., build a large rolling dataset holding all the elements that fit given a memory budget and a data flow-rate.  This has been on our roadmap for a while, and needs only a little more work to be fully functional.
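The just-in-time decompression described in the notes above can be sketched in plain Java.  This is only an illustration under assumptions: it uses one hypothetical scheme (one byte per element plus a shared scale and bias), and the names `ScaledByteChunk`, `at` and `sum` are made up here, not H2O classes or methods.

```java
// Illustrative only: one possible byte-per-element compression scheme.
// ScaledByteChunk is a hypothetical name, not an H2O class.
final class ScaledByteChunk {
  private final byte[] bits;   // 1 byte per element instead of an 8-byte double
  private final double scale;  // shared multiplier for the whole chunk
  private final long bias;     // shared integer offset for the whole chunk

  ScaledByteChunk(byte[] bits, double scale, long bias) {
    this.bits = bits; this.scale = scale; this.bias = bias;
  }

  // Inflate one element: a mask, an add and a multiply.
  double at(int i) { return ((bits[i] & 0xFF) + bias) * scale; }

  // Linear scan: each element is inflated inside the loop, so only the
  // compressed bytes ever cross the memory bus.
  double sum() {
    double s = 0;
    for (int i = 0; i < bits.length; i++)
      s += ((bits[i] & 0xFF) + bias) * scale;
    return s;
  }
}
```

The point of the sketch is the shape of the inner loop: the inflate step is a handful of arithmetic instructions per element, which is cheap next to the cost of a cache miss.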

 Light-weight Map/Reduce layer: Map/Reduce is a nice way to write blatantly parallel code (although not the only way), and we support a particularly fast and efficient flavor.  A Map maps Type A to Type B, and a Reduce combines two Type B’s into one Type B.  Both Types A & B can be a combination of small-data (described as a Plain Old Java Object, a POJO) and big-data (described as another Giant H2O distributed array).  Here’s an example map from a Type A (a pair of columns), to a Type B (a POJO of Class MyMR holding various sums):
   class MyMR extends MRTask<MyMR> {
     double sum0, sum1, sq_sum0;           // Most things are allowed here
     @Override public void map( double d0, double d1 ) {
       sum0 += d0;  sum1 += d1;  sq_sum0 += d0*d0;  // Again most any Java code here
     }
     @Override public void reduce( MyMR my ) {   // Combine two MyMRs together
       sum0 += my.sum0; sum1 += my.sum1; sq_sum0 += my.sq_sum0;
     }
   }
   new MyMR().doAll( v0, v1 );  // Invoke in-parallel distributed over Vecs v0 and v1

This code will be distributed ’round the cluster, and run at memory-bandwidth speeds (on compressed data!) with no further ado.  There’s a lot of mileage possible here that I’m only touching lightly on.  Filtering, subsetting, writing results into temp arrays that are used on later passes; uniques on billions of rows, ddply-style group-by operations all work in this Map/Reduce framework – and all work by writing plain old Java.

     * A note on Group-By: We have a GroupBy operator running at scale (called ddply in the R community), built using this Map/Reduce framework.  The GroupBy can handle millions of groups on billions of rows, and runs Map/Reduce tasks on the group members (so a few large groups run as fast as many small groups).

     * Scala, and a note on API cleanliness: We fully acknowledge Java’s weaknesses here – this is the Java6 flavor coding style; Java7 style is nicer – but still not as nice as some other languages.  We fully embrace & support alternative syntax(s) over our engine.  In particular, we have an engineer working on in-process Scala (amongst other) interfaces.  We are shifting our focus now, from the excellent backend to the API interface side of things.  This is a work-in-progress for us, and we are looking forward to much improvement over the next year.

     * A note on R: H2O supports an R-like language – not full R semantics – but the obviously data-parallel data-munging aspects of R, and of course all the operators run fully parallel and distributed.  There is a REPL.  You can use it to add or drop columns or rows, manufacture features, impute missing values, or drop-in many R-expressions and have them run at-scale.

 Pre-Baked Algorithms Layer: We have the following algorithms pre-baked, fully optimized and full-featured: Generalized Linear Modeling, including Logistic Binomial Regression plus Gaussian, Gamma, Poisson, and Tweedie distributions; Deep Learning; Random Forest (that scales *out* to all the data in the cluster); Gradient Boosted Machine that provides both multinomial classification and regression boosted models (again, in-parallel and fully distributed); PCA; KMeans (and variants); and a full suite of data characterization – for example Quantiles (any quantile, computed exactly in milliseconds).  All these algorithms support Confusion Matrices (with adjustable thresholds), AUC & ROC metrics, and incremental test data-set results on partially trained models during the build process.  Within each algorithm, we support a full range of tuning parameters and options that you’d find in the similar R or SAS package.

* A note on some Mahout algorithms: We’re clearly well suited to e.g. SSVD and Co-occurrence and have talked with Ted Dunning at length on how they would be implemented in H2O.

 REST/ JSON/ R/ python/ Excel/ REPL: The system is externally drivable via URLs/REST API calls, with JSON responses.  We use REST/JSON from Python to drive our entire testing harness.  We have a very nice R package with H2O integrated behind R – you can issue R commands to an H2O-backed R “data.frame” – and have all the Big Math work on the Big Data in a cluster – including 90% of the typical “data munging” workflow.  This same REST/JSON interface also works with e.g. Excel (yes we have a demo) or shell scripts.  We have a pretty web-GUI over the REST/JSON layer, that is suitable for lightweight modeling tasks.


A K/V Store For In-Memory Analytics: Part 1

First, some non-technical madness; fun technical bits below:

Jan 22: My life in the past 2 hours…
Rush to South San Jose from office team photo op –
Get my Kid L (age 16) from high school and rush her to the dentist in mid-Campbell: needs a bunch of cavities filled.
While sitting in the dentist’s office, Kid M (also mine, age 14) calls from Mom’s car – needs a book from Mom’s house; he’s staying away from J (18; 3rd kid) who is quarantined with a 103 degree fever. June (GF) calls – has a prescription for her rash – and a cause figured out finally; K (age 20, 4th kid) calls – she has J’s flu but isn’t feeling so bad, so she’s going to class in Berkeley. June wants to hear how K’s doing so I call June back.
M calls again – needs another book.
M calls a 3rd time – and this time Mom is explaining that she’s figured out what M is up to (and is apologizing for letting him call me so much). June calls – she has a flat in San Mateo, and has never dealt with a flat tire before in her life. I tell her to call AAA since I got kids & need to be home to cook dinner.  She canceled AAA last month.
I’m trying to ask her “how flat is flat”, but if June is noticing it, it’s pretty bad. Some stranger in the background is saying “no you can’t drive it to a station, you’ll ruin the rim” (so yeah, really bad). Two minutes later same stranger is offering to change her spare.
June calls back in 10mins; she has a spare, has standard Toyota gear for tire changing, but one of the lug nuts is rounded off & stuck.  10mins more the stranger gets the nut off, and I’m trying to find a place for June to take her car & get the tire repaired, while sitting in the dentist’s office.
In 30mins L comes out of the DDS chair with a numb mouth & fewer cavities,
then we get to run to Mom’s house & get her & M’s books & guitar, then home so I can cook dinner.
I think I actually wrote some code in the middle of all that, but I’m not really sure.

Feb 4: I’m pondering how to “grow” an internal engineer to work on our internal R-like programming language.  I figure we need a short intro-language to dev/make to get the basic language & tool-building mindset across.  I recall a Byte magazine article about a language called “Mouse”.  Google & the Internet come through: there’s a Wiki article on Mouse

and there at the bottom is the link to the Byte magazine, now fully scanned:

Scary stuff: 35 years later and I can fully recall some obscure programming language, down to all kinds of implementation details.  And ads for “fast 4K static RAMs” and “fully loaded TRS-80’s”.  But not my neighbor of 15 years kids’ names.


A K/V Store for In-Memory Analytics

0xdata is building in-memory analytics (no surprise).  What may be a surprise, though, is that there’s a full-fledged high-performance Key/Value store built into H2O and that is central to both our data management and our control logic.

We use the K/V store in two main ways:

  • All the Big Data is stored striped across the cluster with pseudo-random Keys.  For any given Key we can find the node which “homes” it from the Key itself, and cache it locally as needed.  Pseudo-random striping in practice means the data is typically well distributed around the cluster.  Caching (as opposed to a fixed placement) means we do not need to be perfect about which CPU works on which chunk of data – as long as *most* of the work is CPU-local.
  • About 1/2 the control logic goes through the K/V store (and the other half uses the RPC mechanism).  Anything which needs cross-node visibility and persistence uses the store, including progress counters, meta-data for temporary distributed vectors, built models, loaded datasets, and results from all kinds of work we wish to cache to avoid doing it again later.


First some simple facts:

  • The H2O K/V store supports the full Java Memory Model – lazy consistency can be asked for in places but typically only increases performance under certain use-cases (a mix of high volume reads & writes to the same keys).  Note that the JMM does not order writes to non-volatile variables, so high volume writes to unrelated Keys (as is common on temp data in the middle of some big calculation) all run in parallel with both the network traffic and the actual Value production.  I.e., we don’t typically stall-on-write on any common use-case, and we still maintain exact JMM semantics.
  • All Keys can be cached locally – meaning a typical hot Key ‘get’ is cached, and costs no more than a hashtable ‘get’ – about 150ns.  Same for a ‘put’ – the write is cached locally, then forwarded to another node (AFAIK, this is the fastest K/V get/put on the planet, but see below about not persistent).  The forwarding happens in the background by default (unless you’ve specified volatile-like behavior).  Meanwhile, local readers will see the write immediately, and the writer is stalled for no more time than the hashtable ‘put’ and a single UDP-packet send.
  • H2O also supports transactional K/V updates – the transaction function is forwarded to the Key’s home, where it is run in a loop until the transaction succeeds or is aborted.  We often use this for e.g. asynchronous updates to progress bars.
  • The H2O Cloud is peer-to-peer.  There is no “name-node” nor central Key dictionary.  Each Key has a home-node, but the homes are picked pseudo-randomly per-key.
  • H2O’s store is not persistent, nor is it an HA solution.  We looked at those (and even at one point had a fully functioning Key auto-replicate & repair) and decided that the market was well served by existing technologies.  We opted to put our energies into the Math side of things.
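The transactional update in the list above (“run in a loop until the transaction succeeds or is aborted”) can be sketched as a compare-and-swap retry loop.  This is a single-JVM sketch under assumptions, not H2O’s code: `TinyStore` and `atomic` are hypothetical names, the forwarding of the function to the Key’s home node is not modeled, and returning null from the function is used here as the “abort” signal.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for the home node's view of its Keys.
final class TinyStore<V> {
  private final ConcurrentHashMap<String, AtomicReference<V>> map = new ConcurrentHashMap<>();

  V get(String key) {
    AtomicReference<V> ref = map.get(key);
    return ref == null ? null : ref.get();
  }

  void put(String key, V val) {
    map.computeIfAbsent(key, k -> new AtomicReference<>()).set(val);
  }

  // Run txn against the current value until the swap succeeds.
  // Returns the installed value, or null if txn aborted by returning null.
  V atomic(String key, UnaryOperator<V> txn) {
    AtomicReference<V> ref = map.computeIfAbsent(key, k -> new AtomicReference<>());
    while (true) {
      V old = ref.get();
      V updated = txn.apply(old);          // sees null if the key has no value yet
      if (updated == null) return null;    // transaction chose to abort
      if (ref.compareAndSet(old, updated)) return updated;
      // Another writer got in first: loop and re-run txn on the fresh value.
    }
  }
}
```

A progress counter would then be bumped with something like `store.atomic("progress", v -> v + 1)`; racing callers never lose an increment because a failed swap simply re-runs the function.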


Some Big Picture Details:

H2O’s K/V store is a classic peer-to-peer Distributed Hash Table, with the Keys distributed around the cluster via a pseudo-random hash function.  Pseudo-random because we can (and frequently do) muck with it, to force Keys to ‘home’ to different nodes (usually for load-balance reasons).  A Key’s ‘home’ is solely responsible for breaking ties in racing writes and is the “source of truth” for that Key.  To repeat: Keys can be cached anywhere, and both reads & writes can be cached (although a write is not complete until it reaches ‘home’, gets ordered with other writes, and an ACK is returned).  The ‘home’ is only consulted when breaking ties on conflicting writes or to fetch the value on a Key miss.

Keys are not much more than a blob of unique bytes, often char arrays from Strings or random UUID’s.
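To make “find the home from the Key itself” concrete, here is a minimal sketch.  It assumes a plain hash of the Key bytes taken modulo the cluster size; the real hash function and the override used to force Keys onto particular nodes are not shown in this post, so `KeyHome` and `home` are illustrative names only.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper: every node runs the same pure function on the Key's
// bytes, so no name-node or central dictionary is needed to locate a Key.
final class KeyHome {
  static int home(byte[] key, int clusterSize) {
    // floorMod keeps the result in [0, clusterSize) even for negative hashes.
    return Math.floorMod(Arrays.hashCode(key), clusterSize);
  }

  static int home(String key, int clusterSize) {
    return home(key.getBytes(StandardCharsets.UTF_8), clusterSize);
  }
}
```

Because the function depends only on the Key bytes and the cluster size, any node can compute the home locally and go straight there on a cache miss.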

Values hold bits for maintaining consistency, plus status bits for being on some backing store (for the user-mode swap-to-disk), plus a big blob of bytes.  The blob of bytes is typically a serialized POJO, and if so an inflated copy of the POJO is kept around also.  We use generics to auto-inflate the POJO form:

MyPOJO pojo = DKV.get(key).get();

This code will set the variable “pojo” to a POJO pulled from the K/V store.  If all caches hit, this will take about 150ns.  There is a lot of raw compressed data (the Big Data part of big data), so Big Data is read directly from the bytes and its “POJO” form is the byte array itself – i.e., we don’t keep two copies of Big Data (both a serialized and deserialized form).

Inflated POJOs (and their backing Values) are “immutable”.  While we cannot enforce this at the Java level, updates to the POJOs will not stick in the K/V store unless a ‘put’ is done.  More on this later, but mostly it turns into a coding style issue: if you need to update a POJO AND make the changes globally visible, you need to do a “DKV.put(key,pojo)” at some point.

The further restriction on POJOs is that they inherit from the class “Iced”.  The bytecode weaver will then inject all the code needed to serialize & deserialize (and a JSON pretty-printer, and a bunch of other code).  This rules out the default Java collections (although we have some equivalents – that can run *distributed*, because the collection can get larger than what a single node can hold).  In practice this hasn’t been an issue.  We serialize Iced & primitives, arrays of Iced & primitives and recursive subclasses of Iced.


Reliable Remote Procedure Call

H2O is a clustered solution which requires network communication to JVMs in unrelated process or machine memory spaces.  That network communication can be fast or slow, or may drop packets & sockets (even TCP can silently fail), and may need to be retried.  We have implemented a reliable RPC mechanism which retries failed communications at the RPC level.  An RPC contains a command (or call) to execute on the remote, plus the call arguments; there is a return value.  Both args & returns may be void, or small or may contain gigabytes of data.

Our mechanism has all the obvious optimizations: message data is compressed in a variety of ways (because CPU is cheaper than network).  Short messages are sent via 1 or 2 UDP packets; larger messages use TCP for congestion control. RPCalls are retried periodically until we get an ACK back; the ACK also contains the call’s return value.  The ACK itself is also retried until the called node gets an ACKACK back (and this ends the cycle of Call/ACK/ACKACK).  We handle all the problems with double-sending of tasks & replies.  The end experience is the client makes a blocking call, sending the ‘this’ POJO over the wire – and gets back a modified ‘this’ POJO with results filled in.
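The Call/ACK/ACKACK cycle can be sketched as a small single-threaded simulation.  Everything here is an assumption made for illustration: `ReliableRpcSketch`, `Server`, `LossyLink` and `call` are hypothetical names, the “network” is a fixed drop pattern so runs are repeatable, and timeouts are collapsed into an immediate resend.  It also simplifies the real protocol by letting the caller’s resend stand in for the remote re-sending its ACK.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntUnaryOperator;

final class ReliableRpcSketch {
  // Remote side: remembers replies by task id, so a re-sent call is answered
  // from the cache instead of being executed a second time.
  static final class Server {
    final Map<Integer, Integer> replies = new HashMap<>();
    final IntUnaryOperator fn;
    int executions = 0;
    Server(IntUnaryOperator fn) { this.fn = fn; }

    int onCall(int taskId, int arg) {
      Integer cached = replies.get(taskId);
      if (cached != null) return cached;     // duplicate call: replay the ACK
      executions++;
      int r = fn.applyAsInt(arg);
      replies.put(taskId, r);
      return r;
    }
    void onAckAck(int taskId) { replies.remove(taskId); }  // safe to forget now
  }

  // A link that drops packets following a fixed, repeating pattern.
  // The pattern must deliver at least one packet or call() never returns.
  static final class LossyLink {
    final boolean[] drop; int sent = 0;
    LossyLink(boolean... drop) {
      if (drop.length == 0) throw new IllegalArgumentException("empty drop pattern");
      this.drop = drop;
    }
    boolean delivers() { return !drop[sent++ % drop.length]; }
  }

  // Blocking call: resend until an ACK (carrying the result) arrives, then
  // resend the ACKACK until it gets through.
  static int call(Server server, LossyLink link, int taskId, int arg) {
    while (true) {
      if (!link.delivers()) continue;          // call packet lost: resend
      int reply = server.onCall(taskId, arg);
      if (!link.delivers()) continue;          // ACK lost: resend the call
      while (!link.delivers()) { }             // ACKACK lost: resend it
      server.onAckAck(taskId);
      return reply;
    }
  }
}
```

The piece that matters is the reply cache keyed by task id: it is what makes a retried call harmless, and the ACKACK is what lets the remote side finally drop the cached reply.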

In practice, we can pull cables from a running cluster, and plug them back in, and the cluster will recover – or drop >50% of all UDP packets and still have the system work (albeit more slowly with lots of retries).


And For Next Time, The Gory Details


Been too long since I’ve blogged, but this blog has become quite long already!  And I think I’m going to need pictures also… so, next time the gory details (and yes, building on the above parts).


Building a Distributed GBM on H2O

At 0xdata we build state-of-the-art distributed algorithms – and recently we embarked on building GBM, an algorithm notorious for being impossible to parallelize much less distribute.  We built the algorithm shown in Elements of Statistical Learning II, Trevor Hastie, Robert Tibshirani, and Jerome Friedman on page 387 (shown at the bottom of this post).  Most of the algorithm is straightforward “small” math, but step 2.b.ii says “Fit a regression tree to the targets….”, i.e. fit a regression tree in the middle of the inner loop, for targets that change with each outer loop.  This is where we decided to distribute/parallelize.

The platform we build on is H2O, and as talked about in the prior blog has an API focused on doing big parallel vector operations – and for GBM (and also Random Forest) we need to do big parallel tree operations.  But not really any tree operation; GBM (and RF) constantly build trees – and the work is always at the leaves of a tree, and is about finding the next best split point for the subset of training data that falls into a particular leaf.

The code can be found on our git:

For GBM, look in:

src/main/java/hex/tree/gbm/ - driver, main loop from ESL2
src/main/java/hex/tree/ - shared with RF, score & build 1 layer
src/main/java/hex/tree/ - collect & split histograms per leaf


The High Level Bits

We do a straightforward mapping from vectors to leaves of a tree: we assign a “leaf id” or “node id” to each data row in a new temp vector.  We also choose split points (based on the last pass) before making a pass over the data – split-point decisions are made on summaries from the previous pass and are always “small data”, and typically aren’t useful to parallelize.  Splitting a leaf makes it an internal node, and we add 2 new leaves below it.

On each pass through the data, each observation R starts at an internal tree node; we get the node id (nid) from our temp vector.  The nid is a direct mapping into a POJO (Plain Old Java Object) for that node in a tree.  The node contains the split decision (e.g., “if age<45 go left, else go right”).  We move the observation R down the decision tree, left or right as appropriate, into a new leaf and record the new nid for next pass in our temp vector.

We then accumulate enough stats to make split decisions for the new leaves at the end of this pass.  Since this is a regression tree, we collect a histogram for each column – the mean & variance (using the recursive-mean technique).  Later we’ll pick the split point with the least variance, and the prediction if we don’t split will be the computed mean.
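A minimal sketch of accumulating mean & variance in one pass, assuming “recursive-mean technique” refers to the usual running-mean update (often credited to Welford); `RunningStats` is a hypothetical name and this is not the H2O histogram code.  The `merge` method is the standard way to combine two partial summaries, which is what a Reduce step would need.

```java
// Hypothetical single-bin accumulator: count, running mean, and the running
// sum of squared deviations from the mean (m2).
final class RunningStats {
  long n;
  double mean;
  double m2;

  // Fold in one observation without keeping the raw data around.
  void add(double x) {
    n++;
    double delta = x - mean;
    mean += delta / n;
    m2 += delta * (x - mean);
  }

  // Combine two partial summaries, e.g. from two chunks of rows.
  void merge(RunningStats o) {
    if (o.n == 0) return;
    if (n == 0) { n = o.n; mean = o.mean; m2 = o.m2; return; }
    long total = n + o.n;
    double delta = o.mean - mean;
    m2 += o.m2 + delta * delta * ((double) n * o.n / total);
    mean += delta * o.n / total;
    n = total;
  }

  // Population variance; 0 when nothing has been added.
  double variance() { return n > 0 ? m2 / n : 0; }
}
```

Updating the mean incrementally avoids the large intermediate sums that the naive sum-of-squares formula needs, which matters when the pass covers billions of rows.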

We repeat this process of moving a row R left or right, assigning a new nid, and summing into the histogram, for all rows.  At the end of this pass we can split each current leaf of the tree, effectively building the tree one layer in a breadth-first fashion.  We repeat for each layer, and after one tree is built we start in on the next.
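One pass of this loop can be sketched on a single machine as follows.  This is a simplification under assumptions, not the code in the repository: `LevelPass`, `Split` and `pass` are hypothetical names, the data is plain arrays instead of distributed vectors, and the per-column histograms are reduced to a count and a response sum per leaf.

```java
final class LevelPass {
  // A split decided after the previous pass: rows with x[col] < value move
  // to node `left`, all others to node `right`.
  static final class Split {
    final int col; final double value; final int left, right;
    Split(int col, double value, int left, int right) {
      this.col = col; this.value = value; this.left = left; this.right = right;
    }
  }

  // One pass over all rows.  nids[r] is the node id of row r and is updated
  // in place; splits[nid] is null for nodes that are not being split.
  // Returns, per node id, {row count, sum of responses} for the next decision.
  static double[][] pass(double[][] x, double[] y, int[] nids, Split[] splits) {
    double[][] stats = new double[splits.length][2];
    for (int r = 0; r < y.length; r++) {
      Split s = splits[nids[r]];
      if (s != null)                                  // move the row down one level
        nids[r] = x[r][s.col] < s.value ? s.left : s.right;
      stats[nids[r]][0] += 1;                         // accumulate into the new leaf
      stats[nids[r]][1] += y[r];
    }
    return stats;
  }
}
```

Each row touches only its own entry in `nids` and a small per-leaf summary, which is why the pass parallelizes so cleanly: separate workers can keep private `stats` arrays and add them together at the end.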

Effectively then, we’re entirely parallel & distributed within a single tree level but not across trees (GBM trees have sequential dependencies) nor within levels of the same tree.  This parallel-within-level aspect means there’s an upper bound on tree building speed: the latency to build a single level.  However, since we’re completely parallel within a level we can make a single level faster with more CPUs (or conversely build a tree with more data in the same time).  In practice, we’re the fastest single-node GBM we’ve ever seen and we can usefully scale-out with datasets as small as a few 10’s of megs.


Finding an optimal split point is done by choosing a column/predictor/feature and identifying the value within that feature to be the point at which the data are split so as to minimize some loss function such as Mean Squared Error (or maximize information gain).  The problem devolves to finding the optimal split value for a particular feature (and taking the max across all features). The optimal split is often found (with small data) by sorting the feature values and inspecting the induced split. For big data, this can be a problem (e.g. a distributed sort required for each tree leaf). We choose to approximate sorting with binning instead.

Binning (i.e. a radix sort) is fast & simple, can be done in the same pass as our other tree work, and can get arbitrarily close to the sorted solution with enough bins.  However, there’s always the question of unequal binning and  outliers that skew the bin limits.  We solve these problems with dynamic binning – we choose bin limits anew at each tree level – based on the split bins’ min and max values discovered during the last pass.

Choosing bin limits anew means that outliers won’t be present at the next bin split, and it means dense bins will split again with much tighter bounds.  i.e. at each split point we “drill in” on the interesting data, and after a logarithmic number of splits we will find the optimal split point.

For example: suppose we are binning into 3 bins (our default is 1024 bins) and the predictor values are:

{-600e3, -10, -1, -0.8, -0.4, 0.1, 0.3, 0.6, 2.0, 5.0, +600e3}

Notice the extreme outliers, and the dense data around -1.0 to 2.0.  Then the initial bin split points are found by interpolation over the min and max to be -200e3 & +200e3 (three equal-width bins over a range of 1200e3), inducing the split:

{-600e3} {-10, -1, -0.8, -0.4, 0.1, 0.3, 0.6, 2.0, 5.0}  {+600e3}

The outliers are removed into their own splits, and we’re left with a dense middle bin.  If on the next round we decide to split the middle bin, we’ll bin over the min & max of that bin, i.e. from -10 to +5 with split points -5 and 0:

{-10} { -1, -0.8, -0.4}  {0.1, 0.3, 0.6, 2.0, 5.0}

In two passes we’re already splitting around the dense region.  A further pass on the 2 fuller bins will yield:

{ -1 } {-0.8} {-0.4}
{0.1, 0.3, 0.6} { 2.0} { 5.0}

We have fine-grained binning around the dense clusters of data.
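The first two rounds of this example can be reproduced with a few lines of Java.  This is a sketch assuming equal-width bins interpolated between the min and max seen on the previous pass; `DynamicBins`, `bin` and `histogram` are illustrative names, not the H2O histogram classes.

```java
final class DynamicBins {
  // Equal-width bin index for v, assuming min <= v <= max.
  static int bin(double v, double min, double max, int nbins) {
    if (max == min) return 0;                      // degenerate range: one bin
    int b = (int) ((v - min) / (max - min) * nbins);
    return Math.min(b, nbins - 1);                 // v == max lands in the last bin
  }

  // Count how many values fall into each bin.
  static int[] histogram(double[] vals, double min, double max, int nbins) {
    int[] counts = new int[nbins];
    for (double v : vals) counts[bin(v, min, max, nbins)]++;
    return counts;
  }
}
```

Binning the full list over [-600e3, +600e3] gives counts of 1, 9 and 1; re-binning only the nine middle values over their own range [-10, 5] gives 1, 3 and 5, matching the splits shown above.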

Multinomial (multi-class) Trees

The algorithm on ESL2, pg 387 directly supports multinomial trees – we end up building a tree-per-class.  These trees can be built in parallel with each other, and the predictive results from each tree are run together in step 2.b.iii. Note that the trees do not have to make the same split decisions, and typically do not, especially for extreme minority classes.  Basically, the tree targeting some minority class is free to make decisions that optimize for that class.


For factor or categorical columns, we bin on the categorical levels (up to our bin limits in one pass).  We also use an equality-split instead of a relational-split, e.g.

{if feature==BLUE then/else}

instead of

{if feature < BLUE then/else}

We've been debating splitting on subsets of categories, to get more aggressive splitting when the data have many levels.  i.e., a 30,000 level categorical column will have a hard time exploring all possible categories in isolation without a lot of split levels.

As with all H2O algorithms, GBM can be accessed from our REST+JSON API, from the browser, from R, Python, Scala, Excel & Mathematica.  Enjoy!



An API For Distributed Analytics

There are so many APIs to choose from!

Features of the space:

  • Lots of data – which I’ll qualify as “bigger than 1 machine” and thus needing parallel I/O, parallel memory, & parallel compute – and distributed algorithms.
  • Ease of programming; hide details (but expose them when you want to).  High level for ease-of-use, but “under the covers” has to be easy to understand – because no tool solves all problems – so expect extensions & frequent 1-off hacks.
  • Speed: In-memory by default, where memory can range from 2G to 2T and beyond.  Data placement is required (do not schlop data about unless needed, move code to data, no disk i/o by default).
  • Speed By Default: the normal/average/typical programming style will be fast.  You can “trip over yourself” and go slow, but normal usage is fast.  It should be obvious when you’re moving away from “go fast” to “unknown speed”.
  • Correct By Default: the normal/average/typical programming style will not be exposed to weird corner cases.  No data-races (unless you ask for them), no weird ordering rules, job-scheduling, nor counting mappers & reducers, no figuring out sharding or data placement, nor other low-level easy-to-get-wrong details.  Resource management simple by design.
  • Accessible for non-expert programmers, scientists, engineers, managers – looking for a tool, not wanting the tool to be more complicated than the problem


Design decisions:

Automatic data placement: It’s a hard problem, and it’s been hard for a long time – but technology is changing: networks are fast relative to the size of memory.  You can move a significant fraction of memory in a cluster in relatively little time.  “Disk is the new tape”:  we want to do work in-memory for that 1000x speedup over disk, but this requires loading memory into one of many little slices in the cluster – which implies data-placement.  Start with random placement – while it’s never perfect, it’s also rarely “perfectly wrong” – you get consistent decent access patterns. Then add local caching to catch the hot common read blocks, and local caching of hot or streaming writes.  For H2O, this is one of the reasons for our K/V store; we get full JMM semantics, exact consistency, but also raw speed: 150ns for a cache-hitting read or write.  Typically a cache miss is a few msec (1 network hop there and back).

Map/Reduce: It’s a simple paradigm shown to scale well.  Much of Big Data involves some kind of structure (log files, bit/byte streams, up to well organized SQL/Hive/DB files).  Map is very generic, and allows an arbitrary function on a unit of work/data to be easily scaled.  Reduce brings Big down to Small in a logarithmic number of steps.  Together, they handle a lot of problems.  Key to making this work: simplicity in the API.  Map converts an A to a B, and Reduce combines two B’s into one B – for any arbitrary A and B.  No resource management, no counting Map or Reduce slots, no shuffle, no keys.  Just a nice clean Map and Reduce.  For H2O this means: Map reads your data directly (type A) and produces results in a Plain Olde Java Object, a POJO (type B) – which is also the Map’s “this” pointer.  Results are returned directly in “this”.  “This is Not Your Father’s Map Reduce”
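The “Map converts an A to a B, Reduce combines two B’s” shape can be sketched on one machine with the JDK’s fork/join framework.  This is an analogy under assumptions, not H2O’s MRTask: `SumsTask`, its fields and `doAll` are hypothetical names, the columns are plain `double[]` arrays, and nothing here is distributed across nodes.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Type A: a range of rows over two columns.  Type B: the task object itself,
// which carries the sums in its own fields (results are returned in "this").
final class SumsTask extends RecursiveTask<SumsTask> {
  static final int LEAF = 1024;          // rows handled by one leaf-level Map
  final double[] c0, c1;
  final int lo, hi;
  double sum0, sum1, sqSum0;

  SumsTask(double[] c0, double[] c1, int lo, int hi) {
    this.c0 = c0; this.c1 = c1; this.lo = lo; this.hi = hi;
  }

  void map(double d0, double d1) { sum0 += d0; sum1 += d1; sqSum0 += d0 * d0; }

  void reduce(SumsTask o) { sum0 += o.sum0; sum1 += o.sum1; sqSum0 += o.sqSum0; }

  @Override protected SumsTask compute() {
    if (hi - lo <= LEAF) {
      for (int i = lo; i < hi; i++) map(c0[i], c1[i]);
    } else {
      int mid = (lo + hi) >>> 1;
      SumsTask left = new SumsTask(c0, c1, lo, mid);
      SumsTask right = new SumsTask(c0, c1, mid, hi);
      left.fork();                       // run the left half in parallel
      right.compute();                   // run the right half on this thread
      left.join();
      reduce(left);                      // log-depth tree of Reduces
      reduce(right);
    }
    return this;
  }

  static SumsTask doAll(double[] c0, double[] c1) {
    return ForkJoinPool.commonPool().invoke(new SumsTask(c0, c1, 0, c0.length));
  }
}
```

Note what the caller does not write: no thread counts, no slots, no shuffle and no keys.  The user code is just the bodies of `map` and `reduce`.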

Correct By Default: No multi-threaded access questions.  Synchronization, if needed, is provided already.  No figuring out sharding or data placement; replication (caching) will happen as-needed.  NO resource management, other than Xmx (Java heap size).  Like sync, resource management is notoriously hard to get right so don’t require people to do it.  For 0xdata, this means we use very fine grained parallelism (no Map is too small, Doug Lea Fork/Join), and very fine-grained Reduces (so all Big Data shrinks as rapidly as possible).

Fast By Default: Use of the default Map/Reduce API will produce programs that run in parallel & distributed across the cluster, at memory bandwidth speeds for both reads and writes.  Other clustered / parallel paradigms are available but are not guaranteed to go fast.  The API has a simple obvious design, and all calls have a “cost model” associated with them (these calls are guaranteed fast, these calls are only fast in these situations, these calls will work but may be slow, etc.).  For 0xdata, code that accesses any number of columns at once (e.g. a single row) – but independent rows – will run at memory bandwidth speeds.  Same for writing any number of columns, including writing subsets (filtering) on rows.  Reductions will happen every so many Maps in a Log-tree fashion.  All filter results are guaranteed to be strongly ordered as well (despite the distributed & parallel execution).

Easy / Multiple APIs – Not all APIs are for all users!  Java & Map/Reduce are good for Java programmers – but people do math in R, Python and a host of other tools.  For 0xdata, we are a team of language implementers as well as mathematicians and systems’ engineers.  We have implemented a generic REST/JSON API and can drive this API from R, python, bash, and Excel – with the ability to add more clients easily.  From inside the JVM, we can drive the system using Scala, or a simple REPL with an R-like syntax.


Let’s get a little more concrete here, and bring out the jargon –

An H2O Data Taxonomy

Primitives – at the bottom level, the data are a Java primitive – be it a byte, char, long or double.  Or at least that’s the presentation.  Under the hood we compress aggressively, often seeing 2x to 4x more compression than the GZIP’d file on disk – and we can do math on this compressed form typically at memory bandwidth speeds (i.e., the work to decompress is hidden in the overhead of pulling the data in from memory).  We also support the notion of “missing data” – a crucial notion for data scientists.  It’s similar to Double.NaN, but for all data types.

A Chunk – The basic unit of parallel work, typically holding  1e3 to 1e6 (compressed) primitives.  This data unit is completely hidden, unless you are writing batch-Map calls (where the batching is for efficiency).  It’s big enough to hide control overheads when launching parallel tasks, and small enough to be the unit of caching.  We promise that Chunks from Vecs being worked on together will be co-located on the same machine.

A Vec – A Distributed Vector.  Just like a Java array, but it can hold more than 2^31 elements – limited only by available memory.  Usually used to hold data of a single conceptual type, such as a person’s age or IP address or name or last-click-time or balance, etc.  This is the main conceptual holder of Big Data, and collections of these will typically make up all your data.  Access will be parallel & distributed, and at memory-bandwidth speeds.

A Frame – A Collection of Vecs.  Patterned after R’s data.frame (it’s worked very well for more than 20 years!).  While Vecs might be Big Data (and thus can be expensive to touch all of), Frames are mere pointers to Vecs.  We add & drop columns and reorganize them willy-nilly.  The data munging side of things has a lot of convenience functions here.
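
To make the taxonomy a bit more tangible, here is a toy single-machine sketch of how a Vec-relative “long” index can be split into a chunk number plus an “int” offset, and how a Frame can be nothing more than named pointers to Vecs.  The names ToyVec and ToyFrame are made up for illustration; this is not H2O’s code, and it leaves out compression, missing data and distribution entirely.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy stand-in for a Vec: one logical column split into fixed-size chunks. */
final class ToyVec {
  final double[][] chunks;  // each inner array plays the role of a Chunk
  final int chunkLen;       // elements per chunk (the last chunk may be shorter)
  final long length;        // total element count, deliberately a long

  ToyVec(double[] data, int chunkLen) {
    this.chunkLen = chunkLen;
    this.length = data.length;
    int n = (data.length + chunkLen - 1) / chunkLen;  // round up
    chunks = new double[n][];
    for (int c = 0; c < n; c++) {
      int start = c * chunkLen;
      int len = Math.min(chunkLen, data.length - start);
      chunks[c] = Arrays.copyOfRange(data, start, start + len);
    }
  }

  /** Vec-relative access: long index -> (chunk number, int offset in chunk). */
  double at(long idx) {
    int c = (int) (idx / chunkLen);
    int off = (int) (idx % chunkLen);
    return chunks[c][off];
  }
}

/** Toy stand-in for a Frame: named pointers to Vecs, no data is copied. */
final class ToyFrame {
  final Map<String, ToyVec> cols = new LinkedHashMap<>();
  ToyFrame add(String name, ToyVec v) { cols.put(name, v); return this; }
  ToyVec remove(String name) { return cols.remove(name); }
}
```

Adding or dropping a column in ToyFrame only touches the map, which is the point of “Frames are mere pointers to Vecs”: the big arrays never move.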


The Map/Reduce API

(1) Make a subclass of MRTask2, with POJO Java fields that inherit from Iced, or are primitives, or arrays of either.  Why subclassed from Iced?  Because the bytecode weaver will inject code to do a number of things, including serialization & JSON display, and code & loop optimizations.

class Calc extends MRTask2 {

(2) Break out the small-data Inputs to the Map, and initialize them in an instance of your subclass.  “Small data” will be replicated across the cluster, and available as read-only copies everywhere.  Inputs need to be read-only as they will be shared on each node in the cluster.  “Small” needs to fit in memory and my example is with doubles, but mega-byte sized data is cheap & commonly done.

  final double mean;   // Read-only, shared, distributed
  final int maxHisto;  // Read-only, shared, distributed
  Calc(double meanX, int maxHisto) { this.mean = meanX;  this.maxHisto = maxHisto; }

(3) Break out the small-data Outputs from your map, and initialize them in the Map call.  Because they are initialized in the Map, they are guaranteed thread-local.  Because you get a new one for every Map call, they need to be rolled-up in a matching Reduce.

  long histogram[];
  double sumError;
  void map( ... ) {
    histogram = new long[maxHisto]; // New private histogram[] object

(4) Break out the Big Data (inputs or outputs).  This will be passed to a doAll() call, and every Chunk of the Big Data will get a private cloned instance of the Calc object, distributed across the cluster:

new Calc(mean,vec.max()).doAll(myBigVector /*or Frame or Vec[] or ....*/);

(5) Implement your Map.  Here we show a Batching-Map, which typically does a plain “for” loop over all the elements in the Chunk.  Access your data with the “at0” (Chunk-relative addressing) call – which is the fastest accessor but requires a chunk (and takes an “int” index).  Plain Vec-relative addressing is a little slower, but takes a full “long” index: “ idx)”.

  void map( Chunk chk ) {
     histogram = new long[maxHisto];
     for( int i=0; i<chk.len; i++ ) {
       double err = chk.at0(i)-mean;
       sumError += err*err;

(6) Implement a Reduce for any output fields.  Note that Reduce has a “type B” in the “this” pointer, and is passed a 2nd one:

  void reduce( Calc that ) {
     sumError += that.sumError;
     // Add the array elements with a simple for-loop... we use this
     // simple utility.
     histogram = ArrayUtils.add(histogram,that.histogram);

(7) That’s it!  Results are in your original Calc object:

Calc results = new Calc(mean,vec.max()).doAll(myBigVector);
System.out.println(results.sumError+" "+Arrays.toString(results.histogram));
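
The fragments above don’t compile on their own, so here is a hedged, self-contained imitation of the same seven steps in plain Java.  It is a sketch only: ToyCalc is a made-up name, a “chunk” is just a double[], everything runs sequentially in one JVM, and the histogram binning rule (bucket by integer part, assuming non-negative values) is my assumption since the Map body above is cut short.  What it does show is the shape of the thing: read-only inputs, outputs created inside map, a private instance per chunk, and a pairwise log-tree reduce.

```java
/** Toy single-JVM imitation of the Map/Reduce flow; not H2O code. */
final class ToyCalc {
  final double mean;    // read-only input, copied into every clone
  final int maxHisto;   // read-only input, copied into every clone
  long[] histogram;     // output, created fresh inside each map call
  double sumError;      // output, rolled up in reduce

  ToyCalc(double mean, int maxHisto) { this.mean = mean; this.maxHisto = maxHisto; }

  /** Batching map: a plain for-loop over one chunk. */
  void map(double[] chk) {
    histogram = new long[maxHisto];
    for (int i = 0; i < chk.length; i++) {
      double err = chk[i] - mean;
      sumError += err * err;
      // Assumption: values are non-negative; bucket by integer part, clamped.
      int bin = Math.min((int) chk[i], maxHisto - 1);
      histogram[bin]++;
    }
  }

  /** Fold another instance's outputs into this one. */
  void reduce(ToyCalc that) {
    sumError += that.sumError;
    for (int i = 0; i < histogram.length; i++) histogram[i] += that.histogram[i];
  }

  /** Map each chunk on a private instance, reduce pairwise. Needs >= 1 chunk. */
  ToyCalc doAll(double[][] chunks) {
    ToyCalc r = doRange(chunks, 0, chunks.length);
    histogram = r.histogram;
    sumError = r.sumError;
    return this;
  }

  // Recursive halving gives a reduction tree of logarithmic depth.
  private ToyCalc doRange(double[][] chunks, int lo, int hi) {
    if (hi - lo == 1) {
      ToyCalc leaf = new ToyCalc(mean, maxHisto);  // private clone per chunk
      leaf.map(chunks[lo]);
      return leaf;
    }
    int mid = (lo + hi) >>> 1;
    ToyCalc left = doRange(chunks, lo, mid);
    ToyCalc right = doRange(chunks, mid, hi);
    left.reduce(right);
    return left;
  }
}
```

Usage mirrors step (7): `ToyCalc results = new ToyCalc(mean, maxHisto).doAll(chunks);` and then read `results.sumError` and `results.histogram`.  In a real cluster the two halves of each split would run in parallel on different cores and nodes; here they simply run one after the other.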

The Rest of the Story

You have to get the data in there – and we’ll import from HDFS, S3, NFS, local disk, or through your browser’s upload.  You can drive data upload from Java, but more typically from R, python, REST/JSON, or Excel.  Same for outputting Big Data results: we’ll write back Big Data to any store, while being driven by any of the above languages. If you build a predictive model, you’ll want to eventually use the model in production.  You can use it in-memory as-is, scoring new datasets on the model – and for example constantly streaming new data through the model while at the same time constantly churning out new models to be streamed through.  Or you can get a Java version of any model suitable for dropping into your production environment.

And that’s the end of my whirlwind tour of the H2O Distributed Computing API.  Hope you like it!

Comments & suggestions welcome.


TCP is UNreliable

Been too long between blogs…

“TCP Is Not Reliable” – what’s THAT mean?

Means: I can cause TCP to reliably fail in under 5 mins, on at least 2 different modern Linux variants and on modern hardware, both in our datacenter (no hypervisor) and on EC2.

What does “fail” mean?  Means the client will open a socket to the server, write a bunch of stuff and close the socket – with no errors of any sort.  All standard blocking calls.  The server will get no information of any sort that a connection was attempted.  Let me repeat that: neither client nor server get ANY errors of any kind, the client gets told he opened/wrote/closed a connection, and the server gets no connection attempt, nor any data, nor any errors.  It’s exactly “as if” the client’s open/write/close was thrown in the bit-bucket.

We’d been having these rare failures under heavy load where it was looking like a dropped RPC call.  H2O has its own RPC mechanism, built over the RUDP layer (see all the task-tracking code in the H2ONode class).  Integrating the two layers gives a lot of savings in network traffic: most small-data remote calls (e.g. nearly all the control logic) require exactly 1 UDP packet to start the call, and 1 UDP packet with response.  For large-data calls (i.e., moving a 4Meg “chunk” of data between nodes) we use TCP – mostly for its flow-control & congestion-control.  Since TCP is also reliable, we bypassed the Reliability part of the RUDP.  If you look in the code, the AutoBuffer class lazily decides between UDP or TCP send styles, based on the amount of data to send.  The TCP stuff used to just open a socket, send the data & close.
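
For readers who want a picture of “lazily decides between UDP or TCP”, here is a minimal sketch of the idea: keep buffering, and only commit to TCP once the payload no longer fits in one datagram.  LazySendBuffer and the 1400-byte limit are my own illustrative assumptions (the limit is chosen to stay under a typical 1500-byte Ethernet MTU); the real AutoBuffer class is more involved and is not reproduced here.

```java
import java.io.ByteArrayOutputStream;

/** Hypothetical buffer that picks a send style from the amount of data written. */
final class LazySendBuffer {
  enum Mode { UDP, TCP }

  // Assumption: one datagram should stay below a typical 1500-byte MTU.
  static final int MAX_UDP_PAYLOAD = 1400;

  private final ByteArrayOutputStream buf = new ByteArrayOutputStream();
  private Mode mode = Mode.UDP;  // start optimistic: small control messages

  /** Append bytes; switch to TCP the moment one packet is no longer enough. */
  LazySendBuffer put(byte[] bytes) {
    buf.write(bytes, 0, bytes.length);
    if (buf.size() > MAX_UDP_PAYLOAD) mode = Mode.TCP;
    return this;
  }

  Mode mode() { return mode; }
  int size() { return buf.size(); }
}
```

The caller never chooses a transport up front: a short control message goes out as a single UDP packet, and a multi-megabyte chunk ends up on TCP simply because it grew past the limit.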

So as I was saying, we’d have these rare failures under heavy load that looked like a dropped TCP connection (was hitting the same asserts as dropping a UDP packet, except we had dropped-UDP-packet recovery code in there and working forever).  Finally Kevin, our systems hacker, got a reliable setup (reliably failing?) – it was an H2O parse of a large CSV dataset into a 5-node cluster… then a 4-node cluster, then a 3-node cluster.  I kept adding asserts, and he kept shrinking the test setup, but still nothing seemed obvious – except that obviously during the parse we’d inhale a lot of data, ship it around our 3-node clusters with lots of TCP connections, and then *bang*, an assert would trip about missing some data.

Occam’s Razor dictated we look at the layers below the Java code – the JVM, the native, the OS layers – but these are typically very opaque.  The network packets, however, are easily visible with wireshark tools.  So we logged every packet.  It took another few days of hard work, but Kevin triumphantly presented me with a wireshark log bracketing the Java failure… and there it was in the log: a broken TCP connection.  We stared harder.

In all these failures the common theme is that the receiver is very heavily loaded, with many hundreds of short-lived TCP connections being opened/read/closed every second from many other machines.  The sender sends a ‘SYN’ packet, requesting a connection. The sender (optimistically) sends 1 data packet; optimistic because the receiver has yet to acknowledge the SYN packet.  The receiver, being much overloaded, is very slow.  Eventually the receiver returns a ‘SYN-ACK’ packet, acknowledging both the open and the data packet.  At this point the receiver’s JVM has not been told about the open connection; this work is all happening at the OS layer alone.  The sender, being done, sends a ‘FIN’, and does NOT wait for it to be acknowledged (all data has already been acknowledged).  The receiver, being heavily overloaded, eventually times out internally (probably waiting for the JVM to accept the open-call, and the JVM being overloaded is too slow to get around to it) – and sends a RST (reset) packet back… wiping out the connection and the data.  The sender, however, has moved on – it already sent a FIN & closed the socket, so the RST is for a closed connection.  Net result: sender sent, but the receiver reset the connection without informing either the JVM process or the sender.
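
For reference, the sender side of that sequence looks roughly like the sketch below: plain blocking open/write/close, and nothing else.  FireAndForgetSender is a made-up name and this is not the H2O code; it only illustrates that a normal return from these calls means the local kernel took the bytes, not that the receiving application ever saw them.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;

/** Hypothetical open/write/close sender using only standard blocking calls. */
final class FireAndForgetSender {
  /** Returns normally if no LOCAL error was reported; says nothing about delivery. */
  static void send(String host, int port, byte[] payload) throws IOException {
    try (Socket s = new Socket(host, port)) {  // connect: SYN / SYN-ACK
      OutputStream out = s.getOutputStream();
      out.write(payload);                      // hands bytes to the local kernel
      out.flush();
    }                                          // close: sends FIN and moves on
    // Nothing here waits for the remote application to accept() or read().
  }
}
```

On a lightly loaded receiver this works every time, which is exactly why the failure mode is so surprising when it finally shows up.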

Kevin crawled the Linux kernel code, looking at places where connections get reset.  There are too many to tell which exact path we triggered, but it is *possible* (not confirmed) that Linux decided it was the subject of a DDOS attack and started closing open-but-not-accepted TCP connections.  There are knobs in Linux you can tweak here, and we did – and could make the problem go away, or be much harder to reproduce.

With the bug root-caused in the OS, we started looking at our options for fixing the situation.  Asking our clients to either upgrade their kernels, or use kernel-level network tweaks was not in the cards.  We ended up implementing two fixes: (1) we moved the TCP connection parts into the existing Reliability layer built over UDP.  Basically, we have an application-level timeout and acknowledgement for TCP connections, and will retry TCP connections as needed.  With this in place, the H2O crash goes away (although if the code triggers, we log it and use app-level congestion delay logic).  And (2) we multiplex our TCP connections, so the rate of “open TCPs/sec” has dropped to 1 or 2 – and with this 2nd fix in place we never see the first issue.
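
A minimal sketch of fix (1), assuming a simple wire format of my own invention: a 4-byte length prefix, the payload, and a one-byte application-level ACK sent back once the receiver has read everything.  AckedSender, the ACK byte value and the method signatures are all hypothetical; H2O’s real version lives inside its existing Reliability layer and is not shown here.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical sender/receiver pair with an app-level ACK, timeout and retry. */
final class AckedSender {
  static final int ACK = 0x06;  // made-up one-byte acknowledgement

  /** Returns the 1-based attempt number that was acknowledged. */
  static int send(String host, int port, byte[] payload, int timeoutMs, int maxTries)
      throws IOException {
    IOException last = null;
    for (int attempt = 1; attempt <= maxTries; attempt++) {
      try (Socket s = new Socket()) {
        s.connect(new InetSocketAddress(host, port), timeoutMs);
        s.setSoTimeout(timeoutMs);          // bound the wait for the ACK
        DataOutputStream out = new DataOutputStream(s.getOutputStream());
        out.writeInt(payload.length);       // tell the receiver what to expect
        out.write(payload);
        out.flush();
        if (s.getInputStream().read() == ACK) return attempt;
        last = new IOException("connection closed before ACK");
      } catch (IOException e) {             // timeout, reset, refused, ...
        last = e;                           // fall through and retry
      }
    }
    throw last != null ? last : new IOException("maxTries must be >= 1");
  }

  /** Receiver side: read exactly the announced bytes, then acknowledge. */
  static byte[] receive(Socket s) throws IOException {
    DataInputStream in = new DataInputStream(s.getInputStream());
    byte[] data = new byte[in.readInt()];
    in.readFully(data);
    s.getOutputStream().write(ACK);
    s.getOutputStream().flush();
    return data;
  }
}
```

One caveat with any retry scheme like this: if the data arrives but the ACK is lost, the sender retries and the receiver sees the same payload twice, so the receiving side needs some way to recognize duplicates (a task or message id, for example).  A production version would also back off between attempts rather than retrying immediately.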

At this point H2O’s RPC calls are rock-solid, even under extreme loads.


Found this decent article:

  • It’s a well known problem, in that many people trip over it, and get confused by it
  • The recommended solution is app-level protocol changes (send expected length with data, receiver sends back app-level ACK after reading all expected data). This is frequently not possible (i.e., legacy receiver).
  • Note that setting flags like SO_LINGER is not sufficient
  • There is a Linux-specific workaround (SIOCOUTQ)
  • The “Principle of Least Surprise” is violated: I, at least, am surprised when ‘write / close’ does not block on the ‘close’ until the kernel at the other end promises it can deliver the data to the app.  Probably the remote kernel would need to block the ‘close’ on this side until all the data has been moved into the user-space on that side – which might in turn be blocked by the receiver app’s slow read rate.