Sparkling Water: H2O + Scala + Spark

Spark is an up-and-coming big data technology; it’s a whole lot faster and easier to use than existing Hadoop-based solutions. H2O does state-of-the-art Machine Learning algorithms over Big Data, and does them Fast. We are happy to announce that H2O now has a basic integration with Spark: Sparkling Water!

This is a “deep” integration: H2O can convert Spark RDDs to H2O DataFrames and vice versa directly. The conversion is fully in-memory and in-process, as well as distributed and parallel. This also means H2O has a (nearly) full integration with Scala: H2O DataFrames are full Scala Collection objects, and basic collection calls such as foreach naturally run distributed and parallel too.

A few months back we announced the start of this initiative: Sparkling Water = H2O + Spark

In that post we went for the quickest integration solution possible: separate processes for Spark and H2O, with a Tachyon bridge. While this worked, it required three full copies of all data (a Spark copy, a Tachyon bridge copy, and an H2O copy), and the data passed through process boundaries repeatedly. With this work we need only the Spark and H2O copies; the data is copied only once and does not cross a process boundary. It’s both faster and uses less memory.

We cover the Scala integration first because H2O’s Spark integration builds on it: we need to understand H2O’s Scala integration before we can understand its Spark integration.

H2O’s Scala API

As always, the code is open source and available on GitHub. This code is all in 0xdata’s “h2o-dev” repo, a clean-slate, dev-friendly version of H2O. As of this writing, h2o-dev has all the core H2O logic and about 50% of the machine learning algorithms (the rest should appear within a month).

The basic Scala wrappers are here:

And some simple examples here:

The idea is really simple: a Scala DataFrame extends an H2O water.fvec.Frame object.

class DataFrame private ( key : Key, names : Array[String], vecs : Array[Vec] ) extends Frame(key,names,vecs) with Map[Long,Array[Option[Any]]] {
  // ... constructors and the Map/Collection methods (such as foreach, below) ...
}

The Scala type is a Map[Long, Array[Option[Any]]]: a map from Long row numbers to rows of data, i.e. single data observations. Each observation is presented as an Array of Option[Any], whose elements are the features of the observation. H2O fully supports the notion of missing elements or data, and this is presented as a Scala Option type. Once you know a value is not “missing”, what exactly is its type? H2O Vecs (columns) are typed to hold one of these types:

  • a Number (conceptually a Java double, but any of boolean, byte, char, int, float, long, double)
  • a String
  • a Factor/Enum (similar to interned Strings that are mapped to small dense integer values)
  • a Timestamp (stored internally as milliseconds since the Unix Epoch)
  • a UUID
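To make the Factor/Enum type concrete, here is a minimal Java sketch of the “interned Strings mapped to small dense integers” idea. The names (FactorColumn, add, at, NA) are hypothetical. It illustrates the encoding only and is not H2O’s implementation. A missing value is stored as a sentinel code, which plays the role of None above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: each distinct string is stored once in a "domain",
// and every row stores only a small dense integer index into that domain.
class FactorColumn {
    static final int NA = -1;                               // sentinel code for a missing value
    private final Map<String, Integer> index = new HashMap<>();
    private final List<String> domain = new ArrayList<>();  // dense id -> level string
    private final List<Integer> codes = new ArrayList<>();  // one code per row

    // Append one row; null is treated as missing.
    void add(String level) {
        if (level == null) { codes.add(NA); return; }
        Integer id = index.get(level);
        if (id == null) {                                   // first time this level is seen
            id = domain.size();
            index.put(level, id);
            domain.add(level);
        }
        codes.add(id);
    }

    int code(int row) { return codes.get(row); }
    int cardinality() { return domain.size(); }

    // Decode a row back to its string, or null if it was missing.
    String at(int row) {
        int c = codes.get(row);
        return c == NA ? null : domain.get(c);
    }
}
```

Adding "M", "F", null, "M" gives a two-level domain. The rows are stored as codes 0, 1, NA, 0.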

A DataFrame can be made from a Spark RDD or an H2O Frame directly or from a CSV file:

val df1 = new DataFrame(new File("some.csv"))

Column subsets (a smaller dataframe) can be selected via the column name:

val df2 : DataFrame = df1('a_column_header,'age,'id)

You can do a basic foreach:

df2.foreach { case (rowNum, Array(x, age, id)) => /* ...use x and age and id... */ }

And this will run distributed and parallel across your cluster!

The code for foreach is very simple:

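override def foreach[U](f: ((Long, T)) => U): Unit = {
  new MRTask {
    // map() runs once per set of row-aligned Chunks, in parallel on every node
    override def map( chks : Array[Chunk] ) = {
      val start = chks(0)._start       // global row number of this Chunk's first row
      val row = new T(chks.length)     // one slot per column, reused for each row
      val len = chks(0).len            // number of rows in this Chunk
      (0 until len).foreach{ i =>
        // Build row i from the same offset in every column's Chunk; NA becomes None
        (0 until chks.length).foreach{ col => row(col) = if( chks(col).isNA0(i) ) None else Some(chks(col).at0(i)) }
        f((start+i, row))              // hand (row number, row) to the user's function
      }
    }
  }.doAll(this)                        // run over every Chunk of this DataFrame
}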
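Here is a single-JVM Java sketch of what that foreach does. The data lives in columns that are split into row-aligned chunks. Each chunk is processed in parallel, and each row is rebuilt from the same offset in every column. ColumnarFrame is a hypothetical stand-in for an H2O Frame, and using NaN as the missing-value marker is an assumption. Real H2O spreads the chunks across JVMs with MRTask rather than using a parallel stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.stream.IntStream;

// Hypothetical sketch: column-major data, split into chunks that cover the
// same row range in every column, iterated row-by-row inside each chunk.
class ColumnarFrame {
    final double[][] cols;   // cols[c][r]; NaN marks a missing value (an assumption)
    final int chunkRows;     // rows per chunk, identical for every column so chunks align

    ColumnarFrame(double[][] cols, int chunkRows) { this.cols = cols; this.chunkRows = chunkRows; }

    int nRows() { return cols[0].length; }

    // Calls f(rowNumber, row) for every row. Chunks run in parallel, so f must be thread-safe.
    void foreach(BiConsumer<Long, List<Optional<Double>>> f) {
        int nChunks = (nRows() + chunkRows - 1) / chunkRows;
        IntStream.range(0, nChunks).parallel().forEach(chunk -> {
            int start = chunk * chunkRows;                  // like chks(0)._start
            int len = Math.min(chunkRows, nRows() - start); // like chks(0).len
            for (int i = 0; i < len; i++) {
                List<Optional<Double>> row = new ArrayList<>(cols.length);
                for (double[] col : cols) {
                    double v = col[start + i];
                    row.add(Double.isNaN(v) ? Optional.empty() : Optional.of(v));
                }
                f.accept((long) (start + i), row);
            }
        });
    }
}
```

Unlike the Scala version, this sketch allocates a fresh row for every row. That costs some garbage, but it lets the caller safely keep a reference to a row.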

More complicated examples, showing e.g. a Scala map/reduce paradigm, will follow later.

WaterWorks – Flowing Data between Spark and H2O

Let’s turn to Spark now, and show H2O and Spark working together. Spark RDDs and H2O Frames are similar (but not equal) ways to manage large distributed data. Basically, RDDs are a collection of blocks (Partitions) of rows, while Frames are a collection of columns, aligned in rows and broken up into Chunks. Sort of a blocks-vs-stripes thing.

H2O represents data in columns (Vecs in H2O lingo) vertically striped across the cluster; a Frame is a collection of columns/Vecs. Vecs, in turn, are broken up into Chunks, and the Chunks are carefully aligned in the JVM heaps such that whole rows of data (all the columns in a Frame) align.
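The alignment rule can be sketched as a small, hypothetical Java model. Every Vec is cut at the same row boundaries, and Chunk c of every Vec is placed on the same node, so all cells of a row live in one JVM. The contiguous placement formula in nodeOf is an assumption for illustration. It is not H2O’s actual placement logic.

```java
// Hypothetical sketch of aligned Chunk placement for a columnar Frame.
class ChunkLayout {
    final long nRows;
    final int rowsPerChunk, nNodes;

    ChunkLayout(long nRows, int rowsPerChunk, int nNodes) {
        this.nRows = nRows; this.rowsPerChunk = rowsPerChunk; this.nNodes = nNodes;
    }

    // Every Vec is cut at the same row boundaries, so all Vecs have this many Chunks.
    int chunksPerVec() { return (int) ((nRows + rowsPerChunk - 1) / rowsPerChunk); }

    // Which Chunk index holds a given row; the same for every column.
    int chunkOf(long row) { return (int) (row / rowsPerChunk); }

    // Assumed placement: consecutive Chunk indices are spread evenly over the nodes.
    int nodeOf(int chunk) { return (int) ((long) chunk * nNodes / chunksPerVec()); }

    // The column plays no part in placement, which keeps whole rows on one node.
    int nodeOfCell(long row, int column) { return nodeOf(chunkOf(row)); }
}
```

Consider 32 rows, 4 rows per Chunk, and 4 nodes. That gives 8 Chunks per Vec, two per node, so 4 Vecs hold 32 Chunks in total.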

Spark represents data in RDDs (Resilient Distributed Datasets), a Collection of elements. Each RDD is in turn broken up into Partitions blocked across the cluster; each Partition holds a subset of the Collection as complete rows (or observations, in data-science parlance).

H2O Frames, Vecs and Chunks can represent any primitive numeric type, Strings, timestamps, and UUIDs, and support the notion of a missing value. RDDs can represent a collection of any Scala or Java object. If we limit the objects to fields that H2O can represent, then we can build a simple mapping between the two formats.
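Here is a minimal Java sketch of that mapping, using the same four User fields as the example that follows (id, age, sex, salary). Rows become one array per column, and the reverse pass rebuilds one object per row. User and UserColumns are hypothetical names. Storing sex as 1/0 and a missing value as NaN are assumptions for illustration, not H2O’s documented layout.

```java
import java.util.ArrayList;
import java.util.List;

// A row object, like one element of an RDD[User]; null plays the role of None.
class User {
    final long id;
    final Short age;      // may be null (missing)
    final Boolean sex;    // may be null (missing)
    final int salary;

    User(long id, Short age, Boolean sex, int salary) {
        this.id = id; this.age = age; this.sex = sex; this.salary = salary;
    }
}

// Hypothetical column layout: one array per column; NaN marks a missing value.
class UserColumns {
    final double[] id, age, sex, salary;   // ids stay exact below 2^53

    private UserColumns(int n) {
        id = new double[n]; age = new double[n]; sex = new double[n]; salary = new double[n];
    }

    // Rows -> columns: one pass, writing each field into its own column.
    static UserColumns fromRows(List<User> rows) {
        UserColumns c = new UserColumns(rows.size());
        for (int r = 0; r < rows.size(); r++) {
            User u = rows.get(r);
            c.id[r] = u.id;
            c.age[r] = u.age == null ? Double.NaN : u.age;
            c.sex[r] = u.sex == null ? Double.NaN : (u.sex ? 1 : 0);
            c.salary[r] = u.salary;
        }
        return c;
    }

    // Columns -> rows: rebuild one User per row index, mapping NaN back to null.
    List<User> toRows() {
        List<User> out = new ArrayList<>(id.length);
        for (int r = 0; r < id.length; r++) {
            Short a = Double.isNaN(age[r]) ? null : (short) age[r];
            Boolean s = Double.isNaN(sex[r]) ? null : sex[r] != 0;
            out.add(new User((long) id[r], a, s, (int) salary[r]));
        }
        return out;
    }
}
```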

For our example we’ll look at a collection of Users:

case class User(
  id: Long,
  age: Option[Short],
  sex: Option[Boolean],
  salary: Int
)

In Scala, this is typed as RDD[User], and we’ll have 1 User object per real user, blocked in Partitions around the cluster. In H2O, DataFrames naturally hold all numeric types and missing values in a column format; we’ll have 4 columns, each holding a value for each user, again distributed in Chunks around the cluster. Here’s a single Partition holding 4 users, and showing 4 Chunks (one per column):


Here are 4 JVM heaps holding all 32 users. There are 8 Partitions (two per JVM) and 32 Chunks in 4 Vecs striped around the cluster:


An Example: Back and Forth between H2O and Spark

Here’s a simple Scala example; we’ll assume we have the 4-node Sparkling Water cluster above and a CSV (or Hive) file on disk. We’ll use a simple User data file featuring just a unique id, the user’s age, sex and salary. From just the age and sex we’ll try to predict the typical salary using DeepLearning.

First we load the CSV file into H2O. H2O features a fast, robust CSV parser which is able to ingest CSV data at full disk bandwidth across a cluster. We can handle a wide variety of CSV formats (e.g. Hive is a CSV with ^A/0x01 field separators) and typically infer all column types automatically. Unlike Spark, operations in H2O are eager, not lazy, so the file is loaded and parsed immediately:

// Load an H2O DataFrame from CSV file
val frameFromCSV = new DataFrame(new File("h2o-examples/smalldata/small_users.csv"))

Then we’ll convert to a Spark RDD. Like all RDD operations, this conversion is lazy; the conversion is defined but not executed yet:

val sc = createSparkContext()
val table : RDD[User] = H2OContext.toRDD(sc,frameFromCSV)

And then run an SQL query! SQL is a great way to subselect and munge data, and is one of Spark’s many strengths.

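val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._ // brings sql(...) and the implicit RDD-to-SchemaRDD conversion into scope
table.registerTempTable("user_table") // The RDD is now an SQL table
val query = "SELECT * FROM user_table WHERE age>=18" // Ignore underage users
val result = sql(query) // Run SQL query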

Then we’ll convert back to an H2O DataFrame and run H2O’s Deep Learning (Neural Nets) on the result. Since H2O operations are eager, this conversion triggers the actual SQL query to run just before converting.

// Convert back to H2O DataFrame
val frameFromQuery = H2OContext.toDataFrame(sc,result)
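The lazy-then-eager hand-off in this round trip can be illustrated with plain Java. The sketch below is hypothetical and does not use Spark’s or H2O’s API. A lazy step only records a recipe, much as the RDD conversion and SQL query do. Nothing runs until an eager consumer, playing the role of toDataFrame, forces it. The counter shows exactly when the “query” executes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical sketch of lazy vs eager evaluation.
class LazyVsEager {
    static int queryRuns = 0;   // how many times the "query" has actually executed

    // Lazy: returns a recipe and does no work yet.
    static Supplier<List<Integer>> lazyFilter(List<Integer> ages, Predicate<Integer> keep) {
        return () -> {
            queryRuns++;
            List<Integer> out = new ArrayList<>();
            for (Integer a : ages) if (keep.test(a)) out.add(a);
            return out;
        };
    }

    // Eager: forces the recipe immediately and materializes the result.
    static int[] eagerMaterialize(Supplier<List<Integer>> recipe) {
        List<Integer> rows = recipe.get();
        int[] out = new int[rows.size()];
        for (int i = 0; i < out.length; i++) out[i] = rows.get(i);
        return out;
    }
}
```

Building the recipe leaves queryRuns at 0. It becomes 1 only when eagerMaterialize is called.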

Now it’s time for some Deep Learning. We don’t want to train a Deep Learning model using the ID field. Since it is unique per user, the model might learn to spot interesting user features from the ID alone, which obviously does not work on the next new user that comes along! This problem is called overfitting, and H2O’s algorithms all have robust features to avoid it. In this case, though, we’ll just cut out the id column by selecting a subset of features to train on by name:

val trainFrame = frameFromQuery('age,'sex,'salary)

Deep Learning has about 60 parameters, but here we’ll set just a few and take all the defaults. Also note that sometimes the age or sex information is missing; this is a common problem in Data Science, and you can see it in the example data. DataFrames naturally support this notion of missing data. However, each of the Machine Learning algorithms in H2O handles missing data differently; the exact behavior depends on the deep mathematics in the algorithms. See this video for an excellent discussion of H2O’s Deep Learning.

// Deep Learning!
// Configure parameters
val dlParams = new DeepLearningParameters()
dlParams.source = trainFrame
dlParams.response = trainFrame('salary)
// Run Deep Learning. The train() call starts the learning process
// and returns a Future; the training runs in the background. Calling
// get() blocks until the training completes.
val dlModel = new DeepLearning(dlParams).train.get
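The train-then-get pattern described in those comments is the standard Future idiom. Here is a hedged Java sketch of it. BackgroundTrainer and its least-squares “model” are hypothetical stand-ins for a long-running fit, not H2O’s DeepLearning API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical sketch: training starts in the background and a Future is returned at once.
class BackgroundTrainer {
    static Future<double[]> train(double[] x, double[] y) {
        return CompletableFuture.supplyAsync(() -> {
            // Stand-in "training": least-squares slope and intercept of y on x.
            int n = x.length;
            double mx = 0, my = 0;
            for (int i = 0; i < n; i++) { mx += x[i]; my += y[i]; }
            mx /= n; my /= n;
            double sxy = 0, sxx = 0;
            for (int i = 0; i < n; i++) { sxy += (x[i] - mx) * (y[i] - my); sxx += (x[i] - mx) * (x[i] - mx); }
            double slope = sxy / sxx;
            return new double[] { slope, my - slope * mx };
        });
    }

    // Blocks until training completes, like .get in the Scala snippet.
    static double[] await(Future<double[]> pending) {
        try {
            return pending.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

The caller can do other work between train(...) and await(...). Only the await call blocks.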

At this point we have a Deep Learning model that we can use to make predictions about new users who have not shared their salary information with us. Those predictions are another H2O DataFrame and can be moved back and forth between H2O and Spark like any other DataFrame. Spark and Scala logic can be used to drive actions from the predictions, or the predictions can simply feed further stages of a deeper RDD and H2O pipeline.


In just a few lines of Scala code we now blend H2O’s Machine Learning and Spark’s data-munging and SQL to form a complete ML solution:

  • Ingest data, via CSV, existing Spark RDD or existing H2O Frame
  • Manipulate the data, via Spark RDDs or as a Scala Collection
  • Model in H2O with cutting-edge algorithms
  • Use the Model to make Predictions
  • Use the Predictions in Spark RDDs and Scala to solve our problems

In the months ahead we will be rounding out this integration story – turning brand-new functionality into a production-ready solution.

I hope you like H2O’s new Spark and Scala integration. As always, we are very excited to hear your feedback, and especially to receive code to help further this good cause! If you would like to contribute, please download the code from Git and submit your pull requests, or post suggestions and questions on h2ostream.


A K/V Store For In-Memory Analytics, Part 2

This is a continuation of a prior blog on the H2O K/V Store, Part 1.

A quick review on key bits going into this next blog:

  • H2O supports a very high performance in-memory Distributed K/V store
  • The store honors the full Java Memory Model with exact consistency by default
  • Keys can be cached locally for both reads & writes
  • A typical cached Key Get or Put takes 150ns
  • A cache-missing operation requires network traffic; 1 or 2 UDP packets, or a TCP connection for large payloads.
  • Single-key transactional updates are supported
  • The Keys are pseudo-randomly distributed, removing a “hot-blocks” problem
  • There is no “master” Node; the cluster is fully peer-to-peer
  • Individual Keys have a “Home Node”, responsible for ordering conflicting writes to the same Key; the Home Node varies from Key to Key
  • Network operations are built on top of a reliable RPC mechanism built into H2O

The code for all of this can be found in the H2O GIT repo:

The Distributed K/V Store (hereafter, the DKV store) is similar to the MESI hardware cache-coherency protocol, but altered to better match the software layer. In this hardware analogy, a Key is akin to an address in memory, a Value’s payload is akin to the data stored at that address, and the Value itself is more like the hardware cache-coherency bits. These bits are not directly visible to the x86 programmer, but they have a profound impact on the (apparent) speed of memory, i.e. the cache behavior. As is typical for this kind of thing, the actual implementation is basically a distributed Finite State Machine.

Keys have a “Home Node” (pseudo-randomly assigned – the Home Node is different for each Key).  Keys keep a little bit of state cached (mostly Home Node and a proper hash) but do not directly track coherency – that honor is given to the Values.

On the Home Node, then, a Value tracks which other Nodes have replicas of this Value, and is responsible for invalidating those replicas when the Key is updated to a new Value. Values are also used to force ordering of writes to the same Key from the same non-home writer: a JVM can only have a single outstanding write to the same Key in flight at once (although there’s no limit to the number of pending writes to unrelated Keys). Hence parallel writes to unrelated Keys all proceed in parallel, limited by network bandwidth rather than network latency.
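The one-write-in-flight-per-Key rule can be sketched with one permit per Key. The class and method names below are hypothetical, and java.util.concurrent.Semaphore is an illustrative choice rather than what H2O uses. For brevity, the per-Key permits are never cleaned up.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

// Hypothetical sketch: at most one in-flight write per Key on a non-home node;
// writes to other Keys are never blocked by it.
class WriteGate {
    private final ConcurrentHashMap<String, Semaphore> inFlight = new ConcurrentHashMap<>();

    private Semaphore gate(String key) {
        return inFlight.computeIfAbsent(key, k -> new Semaphore(1));
    }

    // Non-blocking attempt; false means an earlier write to this Key is still pending.
    boolean tryBeginWrite(String key) { return gate(key).tryAcquire(); }

    // Blocking version: waits until the previous write to this Key completes.
    void beginWrite(String key) throws InterruptedException { gate(key).acquire(); }

    // Called when the Home Node's ACK for this Key arrives.
    void writeAcked(String key) { gate(key).release(); }
}
```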

Each Value holds a single AtomicInteger for a read/write lock, a NonBlockingSetInt (really just a fast concurrent bitset), and a pointer to the user’s payload. The R/W lock holds the count of pending Gets in flight, or -1 if the Value is write-locked. The pending-Gets count is typically zero, only going up when a new cached replica is being set up on some Node; once the remote Node has the replica (completes its Get), the pending-Gets count falls again. The bitset holds the set of Nodes replicating this Value. Gets are implemented in, and are an instance of H2O’s reliable RPC mechanism in
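The Home Node’s per-Value bookkeeping can be sketched as below. HomeValue and its methods are hypothetical names. A synchronized java.util.BitSet stands in for NonBlockingSetInt, and the replica bit is recorded before the pending count drops, so a writer that sees zero pending Gets also sees every replica.

```java
import java.util.BitSet;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: an AtomicInteger R/W lock (pending Gets, or -1 when
// write-locked) plus the set of Nodes holding replicas.
class HomeValue {
    final Object payload;
    private final AtomicInteger rwlock = new AtomicInteger(0);
    private final BitSet replicas = new BitSet();   // stand-in for NonBlockingSetInt

    HomeValue(Object payload) { this.payload = payload; }

    // A remote Get starts: bump the pending count unless the Value is write-locked.
    boolean beginGet() {
        while (true) {
            int n = rwlock.get();
            if (n < 0) return false;                  // locked: the reader retries and finds the newer Value
            if (rwlock.compareAndSet(n, n + 1)) return true;
        }
    }

    // The remote Node now holds its replica: record it, then drop the pending count.
    void endGet(int node) {
        synchronized (replicas) { replicas.set(node); }
        rwlock.decrementAndGet();
    }

    // A writer may lock this Value only when no Gets are pending.
    boolean tryWriteLock() { return rwlock.compareAndSet(0, -1); }

    int pendingGets() { return rwlock.get(); }
    boolean isReplicatedOn(int node) { synchronized (replicas) { return replicas.get(node); } }
}
```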

On a non-home Node, the Value’s R/W lock field only holds a flag indicating whether or not the Value is being overwritten by a fresh local write.

For a running example, let’s assume a 4-node cluster. The cluster members are called A, B, C, and D. We have a single Key, “K1”, and start with a single Value, “V1”. When writing a Value, we’ll follow it with the count in the R/W lock and a list of Nodes in the replica list. Example: “V1[2,A]” means Value V1 has 2 pending Gets and is replicated on Node A.

A quiescent Value, with no Gets-in-flight and no replicas:


Actually, this is a 4-node cluster with K1 & V1 on Node A and not on the other nodes.  Hence a better picture is:


Node B has K1 and does a Get. This misses in B’s local K/V store, so B does a remote Get. K1’s hash (available directly from K1 without talking to any other cluster member) tells us that K1’s home is Node A. Hence B fires off a TaskGetKey from B to A requesting a replica of K1. Here’s the flow:
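Computing a Home Node locally from a Key’s hash can be sketched as follows. Every node evaluates the same pure function, so no lookup table or network hop is needed. Scrambling String.hashCode() with MurmurHash3’s 32-bit finalizer and reducing it modulo the cluster size is an assumption for illustration. H2O’s actual hash is not shown here.

```java
// Hypothetical sketch: any node maps a Key to its Home Node without communication.
class HomeNode {
    // MurmurHash3's 32-bit finalizer, used to scramble String.hashCode().
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Home Node index in [0, clusterSize); identical on every node.
    static int home(String key, int clusterSize) {
        return Math.floorMod(mix(key.hashCode()), clusterSize);
    }
}
```

Because the mixed hash is close to uniform, Keys spread roughly evenly across the cluster. This is the property behind the claim that pseudo-random Keys avoid hot blocks.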


Note that once Node B receives V1, the thread in B waiting for it can immediately begin processing V1.  The ACKACK is sent back asynchronously.  The request for K1 fits in a UDP packet.  Assuming reply V1 does as well, B only has to wait for a round-trip UDP packet send & receive before getting his data.  Assuming Nodes C & D make the same request:


At this point any Node in the cluster can retrieve V1 from K1 at no more cost than a hashtable lookup.

What happens when C wants to write a new Value V2 to Key K1? C writes V2 locally, then sends V2 to A. A then resolves any conflicting writes (none here) and invalidates the copies B & D have. Future reads by B & D will then reload the updated value from A. Finally, C gets notification that his write has completed. Here’s the flow:


Let’s work through this diagram a little.

  • Node C writes V2 locally.  This means parallel threads within C can see the update immediately.  Also the writing thread can choose to do either a volatile write or not.  For a volatile write, the writing thread will block until it receives an ACK from Node A that the invalidates are complete.  For non-volatile writes, this is a fire-and-forget operation, although a future unrelated volatile write might need to block until C receives the ACK from A.
  • Node C writes V2 with an “X” in the R/W-lock – a flag indicating that the write of V2 is in-progress.  Node C is not allowed to start ANOTHER write to K1 until this write completes; further writes (of K1 from C) will block.  Hence Node A does not have to handle UDP out-of-order writes from C.  Writes to other Keys are not blocked.
  • Unlike a remote Get, the payload goes out in the initial send from C to A; the payload size determines if this is via UDP or TCP.  The ACKs are all small, and always go via UDP.
  • Value V1 in Node A is tracking the replicas, very akin to what a hardware cache mechanism would do. Upon receiving the write from C several things happen in short order (but distinctly spaced in time): (1) A single atomic replace operation in A’s local K/V store dictates which of several possible racing writes “wins” (at this point in time racing unrelated readers in A can see either V1 or V2; both are valid responses since the readers are racing). (2) V1 is locked against future reads (at this point in time, any racing readers finding V1 will fail and upon retrying find V2); locking is done by setting the R/W-lock to -1. (3) V2’s replicas are set to the sender C (who must have had a copy to send it!) and the receiver A, and these Nodes are removed from V1’s replica list.
  • Now A can start invalidating replicas around the cluster. Note that if there are none, this operation takes no time! Otherwise A sends out invalidate commands (to B & D here) and waits for the responses, thus causing C to wait until the cluster-wide invalidates are done.
  • As soon as the last invalidate is ACK’d back to A, A will ACK back to C.
  • The ACKACKs here can happen in any order and are not required for proper coherency & ordering; ACKACKs are used by the reliable RPC mechanism to know that the ACK made it.
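Steps (1) through (3) on the Home Node can be modeled in a single process. HomeStore and Val are hypothetical names, and ConcurrentHashMap.put stands in for the atomic replace. The model does not wait for pending Gets before locking, and the invalidates are returned rather than sent. The new Value’s replica set is filled in before it is published, so a racing reader never sees it half-built.

```java
import java.util.BitSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical single-process model of the Home Node's side of a remote Put.
class HomeStore {
    static final class Val {
        final String payload;
        final BitSet replicas = new BitSet();
        volatile boolean writeLocked;
        Val(String payload) { this.payload = payload; }
    }

    final int homeNode;
    final ConcurrentHashMap<String, Val> store = new ConcurrentHashMap<>();

    HomeStore(int homeNode) { this.homeNode = homeNode; }

    // Handles a Put from node 'sender'; returns the Nodes to invalidate before ACKing the sender.
    BitSet put(String key, String newPayload, int sender) {
        Val fresh = new Val(newPayload);
        fresh.replicas.set(sender);          // (3) the sender and the Home Node hold the new Value
        fresh.replicas.set(homeNode);
        Val old = store.put(key, fresh);     // (1) one atomic replace decides which racing write wins
        BitSet toInvalidate = new BitSet();
        if (old != null) {
            old.writeLocked = true;          // (2) the old Value refuses further reads
            toInvalidate.or(old.replicas);
            toInvalidate.clear(sender);      // the sender and home already have the new Value
            toInvalidate.clear(homeNode);
        }
        return toInvalidate;
    }
}
```

Take the running example with A=0, B=1, C=2, and D=3. After B and D have fetched V1, a write of V2 from C returns {1, 3}, which are the invalidates sent to B and D.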

Let’s talk a little about the performance here.  What operations are fast, or slow – and why?

  • All Things Cache Locally.  In the absence of conflicts, everything caches locally and no network traffic is required to make forward progress.  Cached Readers take about 150ns a pop.  Cached writes can all go fire-and-forget style (bound by network bandwidth and not latency), and a Reader following a Writer goes fast, and “sees” the write.
  • Readers Never Block.  If you look in the above flow, the only time a Reader blocks is on a cache-miss: the reader simply has to wait for the network to deliver the bytes.  Note that a Reader who misses locally and is blocked on the Home Node delivering a result, does NOT in turn wait for the Home Node to finish any other network operation.  In particular, the Home Node can deliver a valid result even while Invalidates are in-flight.
  • Unrelated Writes do not wait on each other.  Parallel unrelated writes go as fast as the network allows (bandwidth, not latency).
  • Writes can (rarely) Block on Readers.  Taking the write-lock requires all pending-Gets to complete.  Max of 1 pending-Get per Node, and they all complete in parallel – so this cannot take longer than the time to send the old Value around the cluster.
  • Back-to-back Writes to the same key Wait.  Each Node can only have 1 pending write to the same Key at a time.  Back-to-back updates will block until the first one fully completes.
  • Distributed writes to the same key are ordered, by the Home Node’s atomic replace operation.  However, they will typically be handled very fast after the first one – once the invalidates are done, a series of Puts will be ordered by the replace operation as fast as the memory system allows, and the ACKACKs can complete in any order (again, network bandwidth not latency).

These common use cases do not block, and go at either network speeds or memory speeds:

  • Parallel Reading Local Big Data.  Cached locally as-needed, reads at memory speeds
  • Parallel Reading Remote Big Data.  Uses full network bandwidth, and then cached thereafter
  • Parallel Writing Local Big Data.  Streaming writes use full memory bandwidth, no network
  • Parallel Writing Remote Big Data.  Streaming writes use full network bandwidth, not any round-trip latency

We have yet to talk about:

  • Resolving racing writes
  • Resolving racing readers & writers
  • Slow network reordering events (reliable RPC turns unreliable network into slow network w/reordering)

But… I think I’m out of space!

Comments welcome as always,


H2O Architecture


This is a top-level overview of the H2O architecture.  H2O does in-memory analytics on clusters with distributed parallelized state-of-the-art Machine Learning algorithms.  However, the platform is very generic, and very very fast.  We’re building Machine Learning tools with it, because we think they’re cool and interesting, but the platform can do much more.

H2O is based on a number of layers, and is coded at different layers to best approach different tasks and objectives. We talk about the MapReduce execution flavor a lot in public, because it’s easy to explain, because it covers a lot of ground, and because we’ve implemented a bunch of dense linear-algebra style algorithms with it. But that’s not the only thing we can do with H2O, nor is it the only coding “style”. Here’s a rundown of the layers in H2O:

In-memory distributed K/V store layer: H2O sports an in-memory (not persistent) K/V store, with *exact* (not lazy) consistency semantics and transactions. The memory model is exactly the Java Memory Model, but distributed. Both reads and writes are fully cacheable, and typical cache-hit latencies are around 150ns (that’s 150 nanoseconds) from a NonBlockingHashMap. Let me repeat that: reads and writes go through a non-blocking hash table, so we do NOT suffer from a classic hot-blocks problem. Cache-misses obviously require a network hop, and the execution times are totally driven by the size of data moved divided by available bandwidth… and of course the results are cached. The K/V store is currently used to hold control state, all results, and the Big Data itself. You can certainly build a dandy graph-based algorithm directly over the K/V store, or integrate H2O’s model scoring into a low-latency production pipeline.

      * A note on cluster size: H2O clusters are peer-to-peer and have no upper bound on their size. We have built clusters with over 100 nodes and multiple TBs of DRAM. Every node “knows” where every Key is, without actually having to hold onto the Key. Hence asking for a particular Key’s Value is no more expensive than 1 network hop from the Key requester to some Key holder, and back.

      * A note on compression: Big Data is heavily (and losslessly) compressed, typically 2x to 4x better than GZIP on disk (YMMV), and can be accessed like a Java array (a giant, more-than-4-billion-element, distributed Java array). H2O guarantees that if the data is accessed linearly then the access time will match what you can get out of C or Fortran, i.e., be memory-bandwidth bound, not CPU bound. You can access the array (for both reads and writes) in any order, of course, but you get strong speed guarantees for accessing in order. You can do pretty much anything to an H2O array that you can do with a Java array, although due to size/scale you’ll probably want to access the array in a blatantly parallel style.

      * A note on decompression: The data is decompressed Just-In-Time strictly in CPU registers in the hot inner loops, and THIS IS FASTER than decompressing beforehand because most algorithms are memory-bandwidth bound. Moving a 32-byte cache line of compressed data into CPU registers gets more data per cache-miss than moving 4 8-byte doubles. Decompression typically takes 2-4 instructions of shift/scale/add per element, and is well covered by the cache-miss costs. As an example, adding a billion compressed doubles (e.g., to compute the average) takes 12ms on a 32-core (dual-socket) Intel E5-2670, an achieved bandwidth of over 600GB/sec measured as uncompressed doubles, hugely faster than all other Big Data solutions. Accounting for compression, H2O achieved 83GB/sec out of the theoretical maximum of 100GB/sec on this hardware.

      * A note on Big Data and GC: H2O keeps all our data in heap, but in large arrays of Java primitives. Our experience shows that we run without GC issues, even on very large heaps with the default collector. We routinely test with heaps from e.g. 2G to 200G, and never see FullGC costs exceed a few seconds every now and then (depending on the rate of Big Data writing going on). The normal Java object allocation used to drive the system internally has a negligible GC load. We keep our data in-heap because it’s as fast as possible (memory-bandwidth limited), easy to code (pure Java), and has no interesting GC costs. Our GC tuning policy is: “only use the -Xmx flag, set to the largest you can allow given the machine resources”. Take all the other GC defaults; they will work fine.

      * A note on Bigger Data and GC: We do a user-mode swap-to-disk when the Java heap gets too full, i.e., when you’re using more Big Data than physical DRAM. We won’t die with a GC death-spiral, but we will degrade to out-of-core speeds. We’ll go as fast as the disk will allow. I’ve personally tested loading a 12GB dataset into a 2GB (32-bit) JVM; it took about 5 minutes to load the data, and another 5 minutes to run a Logistic Regression.

      * A note on data ingest: We read data fully parallelized from S3, HDFS, NFS, URIs, browser uploads, etc. We can typically drive HDFS disk spindles to an interesting fraction of what you can get from e.g. HDFS file-copy. We parse & compress (in parallel) a very generous notion of a CSV file (for instance, Hive files are directly ingestible), and SVMLight files. We are planning on an RDD ingester; interactivity with other frameworks is in everybody’s interest.

      * A note on sparse data: H2O sports about 15 different compression schemes under the hood, including ones designed to compress sparse data.  We happily import SVMLight without ever having the data “blow up” and still fully supporting the array-access API, including speed guarantees.

      * A note on missing data: Most data sets have missing elements, and most math algorithms deal with missing data specially.  H2O fully supports a notion of “NA” for all data, including setting, testing, selecting in (or out), etc, and this notion is woven through the data presentation layer.

      * A note on streaming data: H2O vectors can have data inserted & removed (anywhere, in any order) continuously. In particular, it’s easy to add new data at the end and remove it from the start, i.e., build a large rolling dataset holding all the elements that fit given a memory budget and a data flow-rate. This has been on our roadmap for a while, and needs only a little more work to be fully functional.
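The compression and decompression notes above can be made concrete with one very simple scheme. Values with a small range are stored as one byte each, plus a shared bias and scale. Each value is decompressed on access with a mask, an add, and a multiply. ByteScaledChunk is a hypothetical name, and this is only one illustrative encoding, not a claim about which of H2O’s roughly 15 schemes looks like this.

```java
// Hypothetical sketch: 1 byte per element plus a shared bias and scale,
// decompressed on access inside the loop rather than ahead of time.
class ByteScaledChunk {
    private final byte[] data;
    private final long bias;
    private final double scale;

    // Accepts values that are exact multiples of 'scale' and span at most 256 steps.
    ByteScaledChunk(double[] values, double scale) {
        this.scale = scale;
        long min = Long.MAX_VALUE, max = Long.MIN_VALUE;
        long[] q = new long[values.length];
        for (int i = 0; i < values.length; i++) {
            q[i] = Math.round(values[i] / scale);
            if (q[i] * scale != values[i]) throw new IllegalArgumentException("not a multiple of scale: " + values[i]);
            min = Math.min(min, q[i]);
            max = Math.max(max, q[i]);
        }
        if (max - min > 255) throw new IllegalArgumentException("range too large for 1 byte");
        bias = min;
        data = new byte[values.length];
        for (int i = 0; i < values.length; i++) data[i] = (byte) (q[i] - bias);   // 0..255 kept in a signed byte
    }

    int length() { return data.length; }

    // Array-style random access: decompress a single element.
    double at(int i) { return ((data[i] & 0xFF) + bias) * scale; }

    // Linear scan: the loop reads 1 byte per element instead of 8.
    double sum() {
        double s = 0;
        for (int i = 0; i < data.length; i++) s += at(i);
        return s;
    }
}
```

For scale, take the decompression note’s figures as gigabytes per second. Summing 10^9 doubles of 8 bytes each in 12 ms is about 667 GB/s of uncompressed data. Meanwhile, 83 GB/s for 12 ms is about 1 GB actually read from memory. That works out to roughly one byte per value, which is consistent with a byte-per-element encoding like this one.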

 Light-weight Map/Reduce layer: Map/Reduce is a nice way to write blatantly parallel code (although not the only way), and we support a particularly fast and efficient flavor.  A Map maps Type A to Type B, and a Reduce combines two Type B’s into one Type B.  Both Types A & B can be a combination of small-data (described as a Plain Old Java Object, a POJO) and big-data (described as another Giant H2O distributed array).  Here’s an example map from a Type A (a pair of columns), to a Type B (a POJO of Class MyMR holding various sums):
   class MyMR extends MRTask<MyMR> {
     double sum0, sum1, sq_sum0;           // Most things are allowed here
     @Override public void map( double d0, double d1 ) {
       sum0 += d0;  sum1 += d1;  sq_sum0 += d0*d0;  // Again most any Java code here
     }
     @Override public void reduce( MyMR my ) {   // Combine two MyMRs together
       sum0 += my.sum0; sum1 += my.sum1; sq_sum0 += my.sq_sum0;
     }
   }
   MyMR mr = new MyMR().doAll( v0, v1 );  // Invoke in-parallel distributed over Vecs v0 and v1

This code will be distributed ’round the cluster, and run at memory-bandwidth speeds (on compressed data!) with no further ado. There’s a lot of mileage possible here that I’m only touching on lightly. Filtering, subsetting, writing results into temp arrays that are used on later passes, uniques on billions of rows, and ddply-style group-by operations all work in this Map/Reduce framework, and all work by writing plain old Java.
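To show the same map/reduce shape in plain, runnable Java on one JVM, the sketch below uses the JDK’s fork/join framework in place of H2O’s MRTask. The rows are split recursively. Each small range is “mapped” into partial sums, and pairs of partials are “reduced” by adding them. MRSums and SumsTask are hypothetical names, and the leaf size is an arbitrary choice.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// The small-data "Type B": partial sums, with map and reduce steps like MyMR's.
class MRSums {
    double sum0, sum1, sq_sum0;
    void map(double d0, double d1) { sum0 += d0; sum1 += d1; sq_sum0 += d0 * d0; }
    void reduce(MRSums o) { sum0 += o.sum0; sum1 += o.sum1; sq_sum0 += o.sq_sum0; }
}

// Hypothetical single-JVM stand-in for MRTask: split, map the leaves, reduce pairs.
class SumsTask extends RecursiveTask<MRSums> {
    private static final int LEAF = 1 << 12;   // rows per leaf task (arbitrary)
    private final double[] v0, v1;
    private final int lo, hi;

    SumsTask(double[] v0, double[] v1, int lo, int hi) { this.v0 = v0; this.v1 = v1; this.lo = lo; this.hi = hi; }

    @Override protected MRSums compute() {
        if (hi - lo <= LEAF) {                   // map: a sequential pass over one range
            MRSums s = new MRSums();
            for (int i = lo; i < hi; i++) s.map(v0[i], v1[i]);
            return s;
        }
        int mid = (lo + hi) >>> 1;
        SumsTask left = new SumsTask(v0, v1, lo, mid);
        left.fork();                             // left half runs in parallel
        MRSums right = new SumsTask(v0, v1, mid, hi).compute();
        MRSums result = left.join();
        result.reduce(right);                    // reduce: combine two partial results
        return result;
    }

    static MRSums doAll(double[] v0, double[] v1) {
        return ForkJoinPool.commonPool().invoke(new SumsTask(v0, v1, 0, v0.length));
    }
}
```

The reduce step must be associative, because the framework is free to combine partial results in any tree shape.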

     * A note on Group-By: We have a GroupBy operator running at scale (called ddply in the R community), built using this Map/Reduce framework. The GroupBy can handle millions of groups on billions of rows, and runs Map/Reduce tasks on the group members (so a few large groups run as fast as many small groups).

     * Scala, and a note on API cleanliness: We fully acknowledge Java’s weaknesses here: this is the Java 6 flavor of coding style; Java 7 style is nicer, but still not as nice as some other languages. We fully embrace & support alternative syntax(es) over our engine. In particular, we have an engineer working on in-process Scala (amongst other) interfaces. We are now shifting our focus from the excellent backend to the API side of things. This is a work-in-progress for us, and we are looking forward to much improvement over the next year.

     * A note on R: H2O supports an R-like language – not full R semantics – but the obviously data-parallel data-munging aspects of R, and of course all the operators run fully parallel and distributed.  There is a REPL.  You can use it to add or drop columns or rows, manufacture features, impute missing values, or drop-in many R-expressions and have them run at-scale.
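The Group-By note can be illustrated with the simplest map/reduce form of a group-by. Each chunk of rows is “mapped” into a small per-group summary table, and the tables are “reduced” by merging them. GroupBy, Stats, and the count-and-sum summary are hypothetical choices. This sketch does not reproduce H2O’s scheme of running further Map/Reduce tasks over each group’s members.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.IntStream;

// Hypothetical sketch: group-by as per-chunk summary tables merged together.
class GroupBy {
    static final class Stats { long count; double sum; }

    // "map": summarize one chunk of rows into a table keyed by group.
    static Map<Integer, Stats> mapChunk(int[] keys, double[] vals, int lo, int hi) {
        Map<Integer, Stats> table = new HashMap<>();
        for (int i = lo; i < hi; i++) {
            Stats s = table.computeIfAbsent(keys[i], k -> new Stats());
            s.count++;
            s.sum += vals[i];
        }
        return table;
    }

    // "reduce": fold table b into table a.
    static void mergeInto(Map<Integer, Stats> a, Map<Integer, Stats> b) {
        for (Map.Entry<Integer, Stats> e : b.entrySet()) {
            Stats s = a.computeIfAbsent(e.getKey(), k -> new Stats());
            s.count += e.getValue().count;
            s.sum += e.getValue().sum;
        }
    }

    // Chunks are summarized in parallel; collect() merges the partial tables safely.
    static Map<Integer, Stats> groupBy(int[] keys, double[] vals, int chunkRows) {
        int nChunks = (keys.length + chunkRows - 1) / chunkRows;
        return IntStream.range(0, nChunks).parallel()
                .mapToObj(c -> mapChunk(keys, vals, c * chunkRows, Math.min(keys.length, (c + 1) * chunkRows)))
                .collect(HashMap::new, GroupBy::mergeInto, GroupBy::mergeInto);
    }
}
```

The partial tables are merged with collect rather than reduce because merging mutates a container, which is exactly the mutable-reduction case collect is designed for.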

 Pre-Baked Algorithms Layer: We have the following algorithms pre-baked, fully optimized and full-featured: Generalized Linear Modeling, including Logistic Binomial Regression plus Gaussian, Gamma, Poisson, and Tweedie distributions; Deep Learning; Random Forest (that scales *out* to all the data in the cluster); Gradient Boosted Machine that provides both multinomial classification and regression boosted models (again, in-parallel and fully distributed); PCA; KMeans (and variants); and a full suite of data characterization, for example Quantiles (any quantile, computed exactly in milliseconds). All these algorithms support Confusion Matrices (with adjustable thresholds), AUC & ROC metrics, and incremental test data-set results on partially trained models during the build process. Within each algorithm, we support a full range of tuning parameters and options that you’d find in the similar R or SAS package.

* A note on some Mahout algorithms: We’re clearly well suited to e.g. SSVD and Co-occurrence and have talked with Ted Dunning at length on how they would be implemented in H2O.

 REST / JSON / R / Python / Excel / REPL: The system is externally drivable via URLs/REST API calls, with JSON responses. We use REST/JSON from Python to drive our entire testing harness. We have a very nice R package with H2O integrated behind R: you can issue R commands to an H2O-backed R “data.frame” and have all the Big Math work on the Big Data in a cluster, including 90% of the typical “data munging” workflow. This same REST/JSON interface also works with e.g. Excel (yes, we have a demo) or shell scripts. We have a pretty web GUI over the REST/JSON layer that is suitable for lightweight modeling tasks.


A K/V Store For In-Memory Analytics: Part 1

First, some non-technical madness; fun technical bits below:

Jan 22: My life in the past 2 hours…
Rush to South San Jose from office team photo op –
Get my Kid L (age 16) from high school and rush her to the dentist in mid-Campbell: needs a bunch of cavities filled.
While sitting in the dentist’s office, Kid M (also mine, age 14) calls from Mom’s car – needs a book from Mom’s house; he’s staying away from J (18; 3rd kid) who is quarantined with a 103 degree fever. June (GF) calls – has a prescription for her rash – and a cause figured out finally; K (age 20, 4th kid) calls – she has J’s flu but isn’t feeling so bad, so she’s going to class in Berkeley. June wants to hear how K’s doing so I call June back.
M calls again – needs another book.
M calls a 3rd time, and this time Mom is explaining to me that she’s figured out what M is up to (and is apologizing for letting him call me so much). June calls: she has a flat in San Mateo, and has never dealt with a flat tire before in her life. I tell her to call AAA since I got kids & need to be home to cook dinner. She canceled AAA last month.
I’m trying to ask her “how flat is flat”, but if June is noticing it, it’s pretty bad. Some stranger in the background is saying “no you can’t drive it to a station, you’ll ruin the rim” (so yeah, really bad). Two minutes later same stranger is offering to change her spare.
June calls back in 10mins; she has a spare, has standard Toyota gear for tire changing, but one of the lug nuts is rounded off & stuck.  10mins more the stranger gets the nut off, and I’m trying to find a place for June to take her car & get the tire repaired, while sitting in the dentist’s office.
In 30mins L comes out of the DDS chair with a numb mouth & fewer cavities,
then we get to run to Mom’s house & get her & M’s books & guitar, then home so I can cook dinner.
I think I actually wrote some code in the middle of all that, but I’m not really sure.

Feb 4: I’m pondering how to “grow” an internal engineer to work on our internal R-like programming language.  I figure we need a short intro-language to dev/make to get the basic language & tool-building mindset across.  I recall a Byte magazine article about a language called “Mouse”.  Google & the Internet come through: there’s a Wiki article on Mouse

and there at the bottom is the link to the Byte magazine, now fully scanned:

Scary stuff: 35 years later and I can fully recall some obscure programming language, down to all kinds of implementation details.  And ads for “fast 4K static RAMs” and “fully loaded TRS-80’s”.  But not the names of the kids of my neighbor of 15 years.


At 0xdata we’re building in-memory analytics (no surprise there).  What may be a surprise, though, is that there’s a full-fledged high-performance Key/Value store built into H2O, and that it is central to both our data management and our control logic.

We use the K/V store in two main ways:

  • All the Big Data is stored striped across the cluster with pseudo-random Keys.  For any given Key we can find the node which “homes” it from the Key itself, and cache it locally as needed.  Pseudo-random striping in practice means the data is typically well distributed around the cluster.  Caching (as opposed to a fixed placement) means we do not need to be perfect about which CPU works on which chunk of data – as long as *most* of the work is CPU-local.
  • About 1/2 the control logic goes through the K/V store (and the other half uses the RPC mechanism).  Anything which needs cross-node visibility and persistence uses the store, including progress counters, meta-data for temporary distributed vectors, built models, loaded datasets, and results from all kinds of work we wish to cache to avoid doing it again later.


First some simple facts:

  • The H2O K/V store supports the full Java Memory Model – lazy consistency can be asked for in places but typically only increases performance under certain use-cases (a mix of high volume reads & writes to the same keys).  Note that the JMM does not order writes to non-volatile variables, so high volume writes to unrelated Keys (as is common on temp data in the middle of some big calculation) all run in parallel with both the network traffic and the actual Value production.  i.e., we don’t typically stall-on-write on any common use-case, and we still maintain exact JMM semantics.
  • All Keys can be cached locally – meaning a typical hot Key ‘get’ is cached, and costs no more than a hashtable ‘get’ – about 150ns.  Same for a ‘put’ – the write is cached locally, then forwarded to the Key’s home node (AFAIK, this is the fastest K/V get/put on the planet, but see below: the store is not persistent).  The forwarding happens in the background by default (unless you’ve specified volatile-like behavior).  Meanwhile, local readers will see the write immediately, and the writer is stalled for no more time than the hashtable ‘put’ and a single UDP-packet send.
  • H2O also supports transactional K/V updates – the transaction function is forwarded to the Key’s home, where it is run in a loop until the transaction succeeds or is aborted.  We often use this for e.g. asynchronous updates to progress bars.
  • The H2O Cloud is peer-to-peer.  There is no “name-node” nor central Key dictionary.  Each Key has a home-node, but the homes are picked pseudo-randomly per-key.
  • H2O’s store is not persistent, nor is it an HA solution.  We looked at those (and even at one point had a fully functioning Key auto-replicate & repair) and decided that the market was well served by existing technologies.  We opted to put our energies into the Math side of things.
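The transactional update described above amounts to a compare-and-set retry loop run at the Key’s home. Below is a minimal single-JVM sketch, assuming a `ConcurrentHashMap` stands in for the home node’s slice of the store; `HomeStore`, `atomicUpdate` and `ProgressDemo` are hypothetical names, not H2O’s API.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for one home node's slice of the store (not H2O's API).
// The transaction function is re-run until its result is installed atomically, or it aborts.
final class HomeStore<K, V> {
  private final ConcurrentHashMap<K, V> map = new ConcurrentHashMap<>();

  V get(K key) { return map.get(key); }

  // Apply 'txn' to the current value. Returning null from 'txn' aborts the transaction.
  // Returns the value installed, or null if aborted.
  V atomicUpdate(K key, UnaryOperator<V> txn) {
    while (true) {
      V old = map.get(key);
      V neu = txn.apply(old);
      if (neu == null) return null;                 // aborted: store unchanged
      boolean won = (old == null)
          ? map.putIfAbsent(key, neu) == null       // nobody else created it first
          : map.replace(key, old, neu);             // nobody else changed it meanwhile
      if (won) return neu;                          // lost a race: loop and retry
    }
  }
}

// Usage: several threads bump a shared progress counter, as a progress bar might.
final class ProgressDemo {
  static int run(int threads, int bumpsPerThread) {
    HomeStore<String, Integer> home = new HomeStore<>();
    Thread[] workers = new Thread[threads];
    for (int t = 0; t < threads; t++) {
      workers[t] = new Thread(() -> {
        for (int i = 0; i < bumpsPerThread; i++)
          home.atomicUpdate("progress", v -> v == null ? 1 : v + 1);
      });
      workers[t].start();
    }
    for (Thread w : workers) {
      try { w.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }
    return home.get("progress");
  }
}
```

`replace(key, old, new)` compares with `equals()`, which is fine for a monotonic counter like this one; values that can return to an earlier state would want a version number instead.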


Some Big Picture Details:

H2O’s K/V store is a classic peer-to-peer Distributed Hash Table, with the Keys distributed around the cluster via a pseudo-random hash function.  Pseudo-random because we can (and frequently do) muck with it, to force Keys to ‘home’ to different nodes (usually for load-balance reasons).  A Key’s ‘home’ is solely responsible for breaking ties in racing writes and is the “source of truth” for that Key.  To repeat: Keys can be cached anywhere, and both reads & writes can be cached (although a write is not complete until it reaches ‘home’, gets ordered with other writes, and an ACK is returned).  The ‘home’ is only consulted when breaking ties on conflicting writes or to fetch the value on a Key miss.
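As an illustration of pseudo-random homing, here is a hypothetical sketch (not H2O’s actual hash function): scramble the Key’s bytes into a 32-bit hash with MurmurHash3’s `fmix32` finalizer and take it modulo the cluster size. The `forcedHome` parameter is an assumed stand-in for the load-balancing overrides mentioned above. Because every node runs the same function, every node agrees on a Key’s home without consulting a central directory.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of pseudo-random Key homing (not H2O's actual hash).
final class KeyHome {
  // Scramble the key bytes into a well-mixed 32-bit hash (MurmurHash3's fmix32 finalizer).
  static int hash(byte[] key) {
    int h = Arrays.hashCode(key);
    h ^= h >>> 16;
    h *= 0x85ebca6b;
    h ^= h >>> 13;
    h *= 0xc2b2ae35;
    h ^= h >>> 16;
    return h;
  }

  // Home node index in a cluster of numNodes; forcedHome >= 0 overrides the hash (assumed hook).
  static int home(byte[] key, int numNodes, int forcedHome) {
    if (forcedHome >= 0) return forcedHome % numNodes;
    return Math.floorMod(hash(key), numNodes);
  }

  static int home(String key, int numNodes) {
    return home(key.getBytes(StandardCharsets.UTF_8), numNodes, -1);
  }
}
```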

Keys are not much more than a blob of unique bytes, often char arrays from Strings or random UUID’s.

Values hold bits for maintaining consistency, plus status bits for being on some backing store (for the user-mode swap-to-disk), plus a big blob of bytes.  The blob of bytes is typically a serialized POJO, and if so an inflated copy of the POJO is kept around also.  We use generics to auto-inflate the POJO form:

MyPOJO pojo = DKV.get(key).get();

This code will set the variable “pojo” to a POJO pulled from the K/V store.  If all caches hit, this will take about 150ns.  There is a lot of raw compressed data (the Big Data part of big data), so Big Data is read directly from the bytes and its “POJO” form is the byte array itself – i.e., we don’t keep two copies of Big Data (both a serialized and deserialized form).

Inflated POJOs (and their backing Values) are “immutable”.  While we cannot enforce this at the Java level, updates to the POJOs will not stick in the K/V store unless a ‘put’ is done.  More on this later, but mostly it turns into a coding style issue: if you need to update a POJO AND make the changes globally visible, you need to do a “DKV.put(key,pojo)” at some point.
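The bytes-plus-inflated-POJO arrangement, and why an edit needs a ‘put’ to stick, can be sketched with a toy `Value` class (not H2O’s own). It assumes plain Java serialization in place of the Iced bytecode weaving; `Value.of`, `shipToRemote` and `TreeModel` are hypothetical names. Mutating the locally inflated POJO leaves the serialized bytes alone, so another node only sees the change once a new Value is made from the edited POJO.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Toy sketch, not H2O's Value class: serialized bytes are the shareable form,
// and an inflated POJO is cached next to them for fast local reads.
final class Value {
  private final byte[] bytes;   // what gets shipped to (and cached on) other nodes
  private Object pojo;          // lazily inflated local copy

  private Value(byte[] bytes, Object pojo) { this.bytes = bytes; this.pojo = pojo; }

  // A "put": serialize now, so the bytes capture the POJO's state at this moment.
  static Value of(Serializable pojo) { return new Value(serialize(pojo), pojo); }

  // What another node would hold: only the bytes, inflated on first use.
  Value shipToRemote() { return new Value(bytes.clone(), null); }

  // Generic accessor: the caller's assignment picks T, so no cast at the call site.
  @SuppressWarnings("unchecked")
  <T> T get() {
    if (pojo == null) pojo = deserialize(bytes);
    return (T) pojo;
  }

  private static byte[] serialize(Serializable o) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(o);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bos.toByteArray();
  }

  private static Object deserialize(byte[] b) {
    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(b))) {
      return ois.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}

// A hypothetical POJO to store.
class TreeModel implements Serializable {
  private static final long serialVersionUID = 1L;
  int ntrees;
  TreeModel(int ntrees) { this.ntrees = ntrees; }
}
```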

The further restriction on POJOs is that they inherit from the class “Iced”.  The bytecode weaver will then inject all the code needed to serialize & deserialize (and a JSON pretty-printer, and a bunch of other code).  This rules out the default Java collections (although we have some equivalents – that can run *distributed*, because the collection can get larger than what a single node can hold).  In practice this hasn’t been an issue.  We serialize Iced & primitives, arrays of Iced & primitives and recursive subclasses of Iced.


Reliable Remote Procedure Call

H2O is a clustered solution which requires network communication to JVMs in unrelated process or machine memory spaces.  That network communication can be fast or slow, or may drop packets & sockets (even TCP can silently fail), and may need to be retried.  We have implemented a reliable RPC mechanism which retries failed communications at the RPC level.  An RPC contains a command (or call) to execute on the remote, plus the call arguments; there is a return value.  Both args & returns may be void, or small or may contain gigabytes of data.

Our mechanism has all the obvious optimizations: message data is compressed in a variety of ways (because CPU is cheaper than network).  Short messages are sent via 1 or 2 UDP packets; larger messages use TCP for congestion control.  RPC calls are retried periodically until we get an ACK back; the ACK also contains the call’s return value.  The ACK itself is also retried until the called node gets an ACKACK back (and this ends the cycle of Call/ACK/ACKACK).  We handle all the problems with double-sending of tasks & replies.  The end experience is the client makes a blocking call, sending the ‘this’ POJO over the wire – and gets back a modified ‘this’ POJO with results filled in.
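The Call / ACK / ACKACK cycle can be simulated in a few lines. This is a hypothetical, single-threaded sketch over a link that randomly drops packets (the `dropRate` and the task itself, squaring its argument, are made up): the callee caches its reply per task number so a resent Call never re-runs the task, and forgets the reply only once the ACKACK arrives. A real implementation would also expire the `finished` set rather than let it grow.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;

// Hypothetical simulation of Call / ACK / ACKACK over a lossy link; not H2O's RPC classes.
final class RpcSim {
  private final Random rnd;
  private final double dropRate;
  int remoteExecutions = 0;                               // times the callee ran a task
  final Map<Integer, Long> replyCache = new HashMap<>();  // callee: task# -> result, until ACKACK
  private final Set<Integer> finished = new HashSet<>();  // callee: task#s already ACKACKed

  RpcSim(long seed, double dropRate) { this.rnd = new Random(seed); this.dropRate = dropRate; }

  private boolean delivered() { return rnd.nextDouble() >= dropRate; }

  // Callee: a duplicate Call must not re-run the task; it just gets the cached reply.
  private Long onCall(int taskNum, long arg) {
    if (finished.contains(taskNum)) return null;          // late duplicate: ignore
    return replyCache.computeIfAbsent(taskNum, t -> { remoteExecutions++; return arg * arg; });
  }

  // Callee: the ACKACK ends the cycle, so the cached reply can be dropped.
  private void onAckAck(int taskNum) { replyCache.remove(taskNum); finished.add(taskNum); }

  // Caller: a blocking call. Resend the Call until an ACK (carrying the result) gets back,
  // then resend the ACKACK until it lands (each lost ACKACK means the callee resends its ACK).
  long call(int taskNum, long arg) {
    Long result = null;
    while (result == null) {
      if (!delivered()) continue;                         // Call lost: retry
      Long ack = onCall(taskNum, arg);
      if (ack != null && delivered()) result = ack;      // ACK lost: retry the Call
    }
    while (!delivered()) { }                              // ACKACK lost: send it again
    onAckAck(taskNum);
    return result;
  }
}
```

Even at a 50% drop rate every call returns the right answer and each task runs on the callee exactly once.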

In practice, we can pull cables from a running cluster, and plug them back in, and the cluster will recover – or drop >50% of all UDP packets and still have the system work (albeit more slowly with lots of retries).


And For Next Time, The Gory Details


Been too long since I’ve blogged, but this blog has become quite long already!  And I think I’m going to need pictures also… so, next time the gory details (and yes, building on the above parts).


Building a Distributed GBM on H2O

At 0xdata we build state-of-the-art distributed algorithms – and recently we embarked on building GBM, an algorithm notorious for being impossible to parallelize, much less distribute.  We built the algorithm shown in The Elements of Statistical Learning (2nd edition) by Trevor Hastie, Robert Tibshirani, and Jerome Friedman, on page 387 (shown at the bottom of this post).  Most of the algorithm is straightforward “small” math, but step 2.b.ii says “Fit a regression tree to the targets….”, i.e. fit a regression tree in the middle of the inner loop, for targets that change with each outer loop.  This is where we decided to distribute/parallelize.

The platform we build on is H2O, and as discussed in the prior blog, it has an API focused on doing big parallel vector operations – and for GBM (and also Random Forest) we need to do big parallel tree operations.  But not just any tree operation; GBM (and RF) constantly build trees – and the work is always at the leaves of a tree, and is about finding the next best split point for the subset of training data that falls into a particular leaf.

The code can be found on our git:

For GBM, look in:

src/main/java/hex/tree/gbm/ - driver, main loop from ESL2
src/main/java/hex/tree/ - shared with RF, score & build 1 layer
src/main/java/hex/tree/ - collect & split histograms per leaf


The High Level Bits

We do a straightforward mapping from vectors to leaves of a tree: we assign a “leaf id” or “node id” to each data row in a new temp vector.  We also choose split points (based on the last pass) before making a pass over the data – split-point decisions are made on summaries from the previous pass and are always “small data”, and typically aren’t useful to parallelize.  Splitting a leaf makes it an internal node, and we add 2 new leaves below it.

On each pass through the data, each observation R starts at an internal tree node; we get the node id (nid) from our temp vector.  The nid is a direct mapping into a POJO (Plain Old Java Object) for that node in a tree.  The node contains the split decision (e.g., “if age<45 go left, else go right”).  We move the observation R down the decision tree, left or right as appropriate, into a new leaf and record the new nid for next pass in our temp vector.
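A minimal sketch of one such pass, assuming rows are plain `double[]` arrays and the temp vector is an `int[]` of node ids; `SplitNode` and `TreePass` are hypothetical names, and the histogram accumulation that rides along in the same loop is left out.

```java
// Hypothetical decided node: "if row[col] < splitVal go left, else go right".
final class SplitNode {
  final int col; final double splitVal; final int leftNid, rightNid;
  SplitNode(int col, double splitVal, int leftNid, int rightNid) {
    this.col = col; this.splitVal = splitVal; this.leftNid = leftNid; this.rightNid = rightNid;
  }
}

final class TreePass {
  // One pass over the data: each row looks up its node via nids[r] and moves one level down.
  // nodes[nid] is the decision made at the end of the previous pass; null means "not split".
  static void moveRows(double[][] rows, int[] nids, SplitNode[] nodes) {
    for (int r = 0; r < rows.length; r++) {
      SplitNode s = nodes[nids[r]];
      if (s == null) continue;                 // leaf was not split: row stays put
      nids[r] = rows[r][s.col] < s.splitVal ? s.leftNid : s.rightNid;
      // (the real pass would also add row r's target into the histogram for nids[r] here)
    }
  }
}
```

For example, a root node `new SplitNode(0, 45.0, 1, 2)` sends rows with age 30 and 10 to node 1, and rows with age 50 and 45 to node 2.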

We then accumulate enough stats to make split decisions for the new leaves at the end of this pass.  Since this is a regression tree, we collect a histogram for each column, keeping the mean & variance of the targets in each bin (using the recursive-mean technique).  Later we’ll pick the split point with the least variance, and the prediction if we don’t split will be the computed mean.
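One common form of the recursive-mean technique is Welford’s update, which keeps a running mean and sum of squared deviations; the matching pairwise merge (Chan et al.) lets per-chunk partial histograms be reduced. A sketch of one bin’s accumulator, with `BinStats` a hypothetical name:

```java
// Hypothetical per-bin accumulator: running (recursive) mean and variance of the targets,
// plus a merge so partial results from different chunks can be reduced together.
final class BinStats {
  long n;
  double mean;
  double m2;    // sum of squared deviations from the mean

  void add(double y) {                  // Welford's recursive update
    n++;
    double delta = y - mean;
    mean += delta / n;
    m2 += delta * (y - mean);
  }

  void merge(BinStats o) {              // Chan et al. pairwise combination
    if (o.n == 0) return;
    if (n == 0) { n = o.n; mean = o.mean; m2 = o.m2; return; }
    long total = n + o.n;
    double delta = o.mean - mean;
    mean += delta * o.n / total;
    m2 += o.m2 + delta * delta * ((double) n * o.n / total);
    n = total;
  }

  double variance() { return n > 0 ? m2 / n : 0.0; }   // population variance
}
```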

We repeat this process of moving a row R left or right, assigning a new nid, and summing into the histogram, for all rows.  At the end of this pass we can split each current leaf of the tree, effectively building the tree one layer in a breadth-first fashion.  We repeat for each layer, and after one tree is built we start in on the next.

Effectively then, we’re entirely parallel & distributed within a single tree level, but not across trees (GBM trees have sequential dependencies) nor across levels of the same tree.  This parallel-within-level aspect means there’s an upper bound on tree building speed: the latency to build a single level.  However, since we’re completely parallel within a level we can make a single level faster with more CPUs (or conversely build a tree with more data in the same time).  In practice, we’re the fastest single-node GBM we’ve ever seen and we can usefully scale-out with datasets as small as a few 10’s of megs.


Finding an optimal split point is done by choosing a column/predictor/feature and identifying the value within that feature to be the point at which the data are split so as to minimize some loss function such as Mean Squared Error (or maximize information gain).  The problem devolves to finding the optimal split value for a particular feature (and taking the best across all features).  The optimal split is often found (with small data) by sorting the feature values and inspecting each induced split.  For big data, this can be a problem (e.g. a distributed sort required for each tree leaf).  We choose to approximate sorting with binning instead.
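Given per-bin summaries, the best split for one column can be found in a single scan of prefix sums. This hypothetical sketch assumes each bin holds the count, sum and sum of squares of the targets, and uses squared error around the mean (n times the variance) as the loss; it is not H2O’s code, and `BestSplit` is an assumed name.

```java
// Hypothetical sketch of choosing one column's split from its bin summaries.
final class BestSplit {
  // Squared error around the mean for n targets with the given sum and sum of squares.
  static double sse(long n, double sum, double sumSq) {
    return n == 0 ? 0.0 : sumSq - sum * sum / n;
  }

  // Returns b such that bins 0..b go left and the rest go right, or -1 if no split
  // lowers the total squared error.
  static int bestSplit(long[] cnt, double[] sum, double[] sumSq) {
    long nAll = 0; double sAll = 0, qAll = 0;
    for (int b = 0; b < cnt.length; b++) { nAll += cnt[b]; sAll += sum[b]; qAll += sumSq[b]; }
    double best = sse(nAll, sAll, qAll);             // the "don't split" error
    int bestBin = -1;
    long nL = 0; double sL = 0, qL = 0;
    for (int b = 0; b < cnt.length - 1; b++) {       // left side = bins 0..b
      nL += cnt[b]; sL += sum[b]; qL += sumSq[b];
      double err = sse(nL, sL, qL) + sse(nAll - nL, sAll - sL, qAll - qL);
      if (err < best - 1e-9) { best = err; bestBin = b; }
    }
    return bestBin;
  }
}
```

With two bins of targets {1, 1} followed by two bins of {5, 5}, the scan picks the boundary after the second bin, where both sides have zero error.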

Binning (i.e. a radix sort) is fast & simple, can be done in the same pass as our other tree work, and can get arbitrarily close to the sorted solution with enough bins.  However, there’s always the question of unequal binning and outliers that skew the bin limits.  We solve these problems with dynamic binning – we choose bin limits anew at each tree level – based on the split bins’ min and max values discovered during the last pass.

Choosing bin limits anew means that outliers won’t be present at the next bin split, and it means dense bins will split again with much tighter bounds.  i.e. at each split point we “drill in” on the interesting data, and after a logarithmic number of splits we will find the optimal split point.

For example: suppose we are binning into 3 bins (our default is 1024 bins) and the predictor values are:

{-600e3, -10, -1, -0.8, -0.4, 0.1, 0.3, 0.6, 2.0, 5.0, +600e3}

Notice the extreme outliers, and the dense data around -1.0 to 2.0.  Then the initial bin split points are found by interpolation over the min and max to be -200e3 & +200e3, inducing the split:

{-600e3} {-10, -1, -0.8, -0.4, 0.1, 0.3, 0.6, 2.0, 5.0}  {+600e3}

The outliers are removed into their own splits, and we’re left with a dense middle bin.  If on the next round we decide to split the middle bin, we’ll bin over the min & max of that bin, i.e. from -10 to +5 with split points -5 and 0:

{-10} { -1, -0.8, -0.4}  {0.1, 0.3, 0.6, 2.0, 5.0}

In two passes we’re already splitting around the dense region.  A further pass on the 2 fuller bins will yield:

{ -1 } {-0.8} {-0.4}
{0.1, 0.3, 0.6} { 2.0} { 5.0}

We have fine-grained binning around the dense clusters of data.
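The worked example can be reproduced with a short sketch. It assumes equal-width bins interpolated between the min and max of the values being re-binned, and that a value exactly on a split point goes right; `DynamicBins` is a hypothetical name. Running three passes on the example data gives the bins shown above, and `splitPoints(-600e3, 600e3, 3)` gives the first-pass split points.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of dynamic binning: each pass re-derives equal-width bin limits
// from the min and max of only the values in the bin being split.
final class DynamicBins {
  // nbins - 1 split points interpolated evenly between min and max.
  static double[] splitPoints(double min, double max, int nbins) {
    double[] s = new double[nbins - 1];
    for (int i = 1; i < nbins; i++) s[i - 1] = min + (max - min) * i / nbins;
    return s;
  }

  // One pass: outliers seen at earlier levels no longer stretch these limits.
  static List<List<Double>> bin(List<Double> vals, int nbins) {
    double[] splits = splitPoints(Collections.min(vals), Collections.max(vals), nbins);
    List<List<Double>> bins = new ArrayList<>();
    for (int i = 0; i < nbins; i++) bins.add(new ArrayList<>());
    for (double v : vals) {
      int b = 0;
      while (b < splits.length && v >= splits[b]) b++;   // "if v < split go left"
      bins.get(b).add(v);
    }
    return bins;
  }
}
```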

Multinomial (multi-class) Trees

The algorithm in ESL2, pg 387 directly supports multinomial trees – we end up building a tree-per-class.  These trees can be built in parallel with each other, and the predictive results from each tree are run together in step 2.b.iii.  Note that the trees do not have to make the same split decisions, and typically do not, especially for extreme minority classes.  Basically, the tree targeting some minority class is free to make decisions that optimize for that class.
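For reference, the per-class targets in ESL’s K-class gradient boosting are residuals r_k = y_k − p_k(x), where y_k is 1 for the row’s true class and 0 otherwise, and p_k is a softmax over the K current scores f_k(x). A small sketch for one row (`MultinomialTargets` is a hypothetical name); each class’s tree is then fit to its own column of residuals, which is why the trees are free to split differently.

```java
// Hypothetical sketch of the K-class boosting targets for one row.
final class MultinomialTargets {
  // Softmax of the current per-class scores; subtracting the max avoids overflow in exp().
  static double[] softmax(double[] f) {
    double max = Double.NEGATIVE_INFINITY;
    for (double v : f) max = Math.max(max, v);
    double[] p = new double[f.length];
    double sum = 0;
    for (int k = 0; k < f.length; k++) { p[k] = Math.exp(f[k] - max); sum += p[k]; }
    for (int k = 0; k < f.length; k++) p[k] /= sum;
    return p;
  }

  // r_k = y_k - p_k(x): the target that class k's tree is fit to for this row.
  static double[] residuals(double[] scores, int label) {
    double[] p = softmax(scores);
    double[] r = new double[p.length];
    for (int k = 0; k < p.length; k++) r[k] = (k == label ? 1.0 : 0.0) - p[k];
    return r;
  }
}
```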


For factor or categorical columns, we bin on the categorical levels (up to our bin limits in one pass).  We also use an equality-split instead of a relational-split, e.g.

{if feature==BLUE then/else}

instead of

{if feature < BLUE then/else}

We've been debating splitting on subsets of categories, to get more aggressive splitting when the data have many levels.  i.e., a 30,000 level categorical column will have a hard time exploring all possible categories in isolation without a lot of split levels.

As with all H2O algorithms, GBM can be accessed from our REST+JSON API, from the browser, from R, Python, Scala, Excel & Mathematica.  Enjoy!



An API For Distributed Analytics

There are so many APIs to choose from!

Features of the space:

  • Lots of data – which I’ll qualify as “bigger than 1 machine” and thus needing parallel i/o, parallel memory, & parallel compute – and distributed algorithms.
  • Ease of programming; hide details (but expose them when wanted).  High level for ease-of-use, but “under the covers” has to be easy to understand, because no tool solves all problems – so expect extensions & frequent 1-off hacks.
  • Speed: In-memory by default, where memory can range from 2G to 2T and beyond.  Data placement is required (do not schlop data about unless needed, move code to data, no disk i/o by default).
  • Speed By Default: the normal/average/typical programming style will be fast.  You can “trip over yourself” and go slow, but normal usage is fast.  It should be obvious when you’re moving away from “go fast” to “unknown speed”.
  • Correct By Default: the normal/average/typical programming style will not be exposed to weird corner cases.  No data-races (unless you ask for them), no weird ordering rules, job-scheduling, nor counting mappers & reducers, no figuring out sharding or data placement, nor other low-level easy-to-get-wrong details.  Resource management simple by design.
  • Accessible for non-expert programmers, scientists, engineers, managers – looking for a tool, not wanting the tool to be more complicated than the problem


Design decisions:

Automatic data placement: It’s a hard problem, and it’s been hard for a long time – but technology is changing: networks are fast relative to the size of memory.  You can move a significant fraction of memory in a cluster in relatively little time.  “Disk is the new tape”:  we want to do work in-memory for that 1000x speedup over disk, but this requires loading data into one of many little slices in the cluster – which implies data-placement.  Start with random placement – while it’s never perfect, it’s also rarely “perfectly wrong” – you get consistently decent access patterns.  Then add local caching to catch the hot common read blocks, and local caching of hot or streaming writes.  For H2O, this is one of the reasons for our K/V store; we get full JMM semantics, exact consistency, but also raw speed: 150ns for a cache-hitting read or write.  Typically a cache miss is a few msec (1 network hop there and back).

Map/Reduce: It’s a simple paradigm shown to scale well.  Much of Big Data involves some kind of structure (log files, bit/byte streams, up to well organized SQL/Hive/DB files).  Map is very generic, and allows an arbitrary function on a unit of work/data to be easily scaled.  Reduce brings Big down to Small in a logarithmic number of steps.  Together, they handle a lot of problems.  Key to making this work: simplicity in the API.  Map converts an A to a B, and Reduce combines two B’s into one B – for any arbitrary A and B.  No resource management, no counting Map or Reduce slots, no shuffle, no keys.  Just a nice clean Map and Reduce.  For H2O this means: Map reads your data directly (type A) and produces results in a Plain Old Java Object, or POJO (type B) – which is also the Map’s “this” pointer.  Results are returned directly in “this”.  “This is Not Your Father’s Map Reduce”

Correct By Default: No multi-threaded access questions.  Synchronization, if needed, is provided already.  No figuring out sharding or data placement; replication (caching) will happen as-needed.  NO resource management, other than Xmx (Java heap size).  Like sync, resource management is notoriously hard to get right so don’t require people to do it.  For 0xdata, this means we use very fine grained parallelism (no Map is too small, Doug Lea Fork/Join), and very fine-grained Reduces (so all Big Data shrinks as rapidly as possible).

Fast By Default: Use of the default Map/Reduce API will produce programs that run in parallel & distributed across the cluster, at memory bandwidth speeds for both reads and writes.  Other clustered / parallel paradigms are available but are not guaranteed to go fast.  The API has a simple obvious design, and all calls have a “cost model” associated with them (these calls are guaranteed fast, these calls are only fast in these situations, these calls will work but may be slow, etc.).  For 0xdata, code that accesses any number of columns at once (e.g. a single row) – but independent rows – will run at memory bandwidth speeds.  Same for writing any number of columns, including writing subsets (filtering) on rows.  Reductions will happen every so many Maps in a Log-tree fashion.  All filter results are guaranteed to be strongly ordered as well (despite the distributed & parallel execution).

Easy / Multiple APIs – Not all APIs are for all users!  Java & Map/Reduce are good for Java programmers – but people do math in R, Python and a host of other tools.  For 0xdata, we are a team of language implementers as well as mathematicians and systems engineers.  We have implemented a generic REST/JSON API and can drive this API from R, python, bash, and Excel – with the ability to add more clients easily.  From inside the JVM, we can drive the system using Scala, or a simple REPL with an R-like syntax.


Let’s get a little more concrete here, and bring out the jargon –

An H2O Data Taxonomy

Primitives – at the bottom level, the data are a Java primitive – be it a byte, char, long or double.  Or at least that’s the presentation.  Under the hood we compress aggressively, often seeing 2x to 4x more compression than the GZIP’d file on disk – and we can do math on this compressed form typically at memory bandwidth speeds (i.e., the work to decompress is hidden in the overhead of pulling the data in from memory).  We also support the notion of “missing data” – a crucial notion for data scientists.  It’s similar to Double.NaN, but for all data types.
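As a concrete (and hypothetical) example of doing math on a compressed form: store each value as a one-byte offset from a per-chunk bias and reserve one byte value as the missing-data marker. A sum can then run over the bytes and add the bias back once at the end. This is one simple scheme for illustration, not H2O’s actual chunk format; `ByteChunk` is an assumed name.

```java
// Hypothetical sketch: 1-byte offsets from a per-chunk bias, with one byte value
// reserved as the "missing" (NA) marker. Not H2O's actual chunk format.
final class ByteChunk {
  static final byte NA = Byte.MIN_VALUE;   // reserved: never a real stored offset
  final long bias;
  final byte[] mem;                        // 1 byte per value instead of 8

  private ByteChunk(long bias, byte[] mem) { this.bias = bias; this.mem = mem; }

  // Compress; a null element means "missing". Returns null if the range is too wide for 1 byte.
  static ByteChunk compress(Long[] vals) {
    long min = Long.MAX_VALUE, max = Long.MIN_VALUE;
    for (Long v : vals) if (v != null) { min = Math.min(min, v); max = Math.max(max, v); }
    if (min > max) { min = 0; max = 0; }             // all values missing
    long range = max - min;                          // negative if it overflowed
    if (range < 0 || range > 254) return null;       // offsets must fit in [-127, 127]
    long bias = min + 127;
    byte[] mem = new byte[vals.length];
    for (int i = 0; i < vals.length; i++)
      mem[i] = vals[i] == null ? NA : (byte) (vals[i] - bias);
    return new ByteChunk(bias, mem);
  }

  boolean isNA(int i) { return mem[i] == NA; }

  long at(int i) {                                   // decompress one value on the fly
    if (isNA(i)) throw new IllegalStateException("missing value at row " + i);
    return mem[i] + bias;
  }

  // Sum of the non-missing values straight off the compressed bytes: sum(offsets) + n * bias.
  long sum() {
    long s = 0, n = 0;
    for (byte b : mem) if (b != NA) { s += b; n++; }
    return s + n * bias;
  }
}
```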

A Chunk – The basic unit of parallel work, typically holding 1e3 to 1e6 (compressed) primitives.  This data unit is completely hidden, unless you are writing batch-Map calls (where the batching is for efficiency).  It’s big enough to hide control overheads when launching parallel tasks, and small enough to be the unit of caching.  We promise that Chunks from Vecs being worked on together will be co-located on the same machine.

A Vec – A Distributed Vector.  Just like a Java array, but it can hold more than 2^31 elements – limited only by available memory.  Usually used to hold data of a single conceptual type, such as a person’s age or IP address or name or last-click-time or balance, etc.  This is the main conceptual holder of Big Data, and collections of these will typically make up all your data.  Access will be parallel & distributed, and at memory-bandwidth speeds.
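A sketch of how a long row index might be mapped onto Chunks, assuming the Vec keeps an array of each chunk’s starting row and binary-searches it; `ChunkedVec` is a hypothetical single-JVM stand-in, with chunks held as local arrays rather than spread across nodes.

```java
// Hypothetical sketch of a Vec's index math: a list of chunks, with a long row index
// mapped to (chunk, offset) by binary search over the chunk start rows.
final class ChunkedVec {
  final long[] chunkStart;    // chunkStart[c] = first row of chunk c; last entry = total rows
  final double[][] chunks;

  ChunkedVec(double[][] chunks) {
    this.chunks = chunks;
    chunkStart = new long[chunks.length + 1];
    for (int c = 0; c < chunks.length; c++) chunkStart[c + 1] = chunkStart[c] + chunks[c].length;
  }

  long length() { return chunkStart[chunks.length]; }

  // Largest chunk index whose start row is <= row (skips over empty chunks).
  int chunkIdx(long row) {
    if (row < 0 || row >= length()) throw new IndexOutOfBoundsException(Long.toString(row));
    int lo = 0, hi = chunks.length - 1;
    while (lo < hi) {
      int mid = (lo + hi + 1) >>> 1;
      if (chunkStart[mid] <= row) lo = mid; else hi = mid - 1;
    }
    return lo;
  }

  // Vec-relative access with a long index: one search, then a chunk-relative int offset.
  double at(long row) {
    int c = chunkIdx(row);
    return chunks[c][(int) (row - chunkStart[c])];
  }
}
```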

A Frame – A Collection of Vecs.  Patterned after R’s data.frame (it’s worked very well for more than 20 years!).  While Vecs might be Big Data (and thus can be expensive to touch all of), Frames are mere pointers to Vecs.  We add & drop columns and reorganize them willy-nilly.  The data munging side of things has a lot of convenience functions here.


The Map/Reduce API

(1) Make a subclass of MRTask2, with POJO Java fields that inherit from Iced, or are primitives, or arrays of either.  Why subclassed from Iced?  Because the bytecode weaver will inject code to do a number of things, including serialization & JSON display, and code & loop optimizations.

class Calc extends MRTask<Calc> {

(2) Break out the small-data Inputs to the Map, and initialize them in an instance of your subclass.  “Small data” will be replicated across the cluster, and available as read-only copies everywhere.  Inputs need to be read-only as they will be shared on each node in the cluster.  “Small” needs to fit in memory and my example is with doubles, but mega-byte sized data is cheap & commonly done.

  final double mean;   // Read-only, shared, distributed
  final int maxHisto;  // Read-only, shared, distributed
  Calc(double meanX, int maxHisto) { this.mean = meanX;  this.maxHisto = maxHisto; }

(3) Break out the small-data Outputs from your map, and initialize them in the Map call.  Because they are initialized in the Map, they are guaranteed thread-local.  Because you get a new one for every Map call, they need to be rolled-up in a matching Reduce.

  long histogram[];
  double sumError;
  public void map( ... ) {
    histogram = new long[maxHisto]; // New private histogram[] object

(4) Break out the Big Data (inputs or outputs).  This will be passed to a doAll() call, and every Chunk of the Big Data will get a private cloned instance of the Calc object, distributed across the cluster:

new Calc(mean,(int)vec.max()).doAll(myBigVector /*or Frame or Vec[] or ....*/);

(5) Implement your Map.  Here we show a Batching-Map, which typically does a plain “for” loop over all the elements in the Chunk.  Access your data with the “at0” (Chunk-relative addressing) call – which is the fastest accessor but requires a chunk (and takes an “int” index).  Plain Vec-relative addressing is a little slower, but takes a full “long” index: “ idx)”.

  @Override public void map( Chunk chk ) {
     histogram = new long[maxHisto];
     for( int i=0; i<chk.len; i++ ) {
       double err = chk.at0(i)-mean;
       sumError += err*err;
     }
  }

(6) Implement a Reduce for any output fields.  Note that Reduce has a “type B” in the “this” pointer, and is passed a 2nd one:

  @Override public void reduce( Calc that ) {
     sumError += that.sumError;
     // Add the array elements with a simple for-loop... we use this
     // simple utility.
     histogram = ArrayUtils.add(histogram,that.histogram);
  }

(7) That’s it!  Results are in your original Calc object:

Calc results = new Calc(mean,(int)vec.max()).doAll(myBigVector);
System.out.println(results.sumError+" "+Arrays.toString(results.histogram));
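For readers without an H2O install, here is a hypothetical plain-Java analog of the `Calc` task above, built on Doug Lea’s Fork/Join framework (`java.util.concurrent`): each leaf task maps one slice of rows into a fresh histogram, and results are reduced pairwise as the recursion unwinds, giving a log-tree of reductions. The 1000-row slice size and the binning rule (integer part of the value, clamped to the histogram) are assumptions, since the fragments above leave binning unspecified. It runs in a single JVM only.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical single-JVM analog of Calc: map over slices, reduce pairwise. Not H2O code.
final class LocalCalc extends RecursiveTask<LocalCalc> {
  static final int CHUNK = 1000;          // rows per leaf task (assumed)
  final double[] data; final int lo, hi;  // Big Data slice [lo, hi)
  final double mean; final int maxHisto;  // read-only small-data inputs
  long[] histogram; double sumError;      // small-data outputs

  LocalCalc(double[] data, int lo, int hi, double mean, int maxHisto) {
    this.data = data; this.lo = lo; this.hi = hi; this.mean = mean; this.maxHisto = maxHisto;
  }

  @Override protected LocalCalc compute() {
    if (hi - lo <= CHUNK) { map(); return this; }      // leaf: one "chunk" of rows
    int mid = (lo + hi) >>> 1;
    LocalCalc left = new LocalCalc(data, lo, mid, mean, maxHisto);
    LocalCalc right = new LocalCalc(data, mid, hi, mean, maxHisto);
    left.fork();                                       // left half runs asynchronously
    LocalCalc r = right.compute();                     // right half runs here
    LocalCalc l = left.join();
    l.reduce(r);                                       // pairwise reduce, log-tree fashion
    return l;
  }

  void map() {
    histogram = new long[maxHisto];                    // fresh, task-private output
    for (int i = lo; i < hi; i++) {
      double err = data[i] - mean;
      sumError += err * err;
      int bin = (int) Math.min(maxHisto - 1, Math.max(0, data[i]));  // assumed binning rule
      histogram[bin]++;
    }
  }

  void reduce(LocalCalc that) {
    sumError += that.sumError;
    for (int i = 0; i < histogram.length; i++) histogram[i] += that.histogram[i];
  }

  static LocalCalc run(double[] data, double mean, int maxHisto) {
    return ForkJoinPool.commonPool().invoke(new LocalCalc(data, 0, data.length, mean, maxHisto));
  }
}
```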

The Rest of the Story

You have to get the data in there – and we’ll import from HDFS, S3, NFS, local disk, or through your browser’s upload.  You can drive data upload from Java, but more typically from R, python, REST/JSON, or Excel.  Same for outputting Big Data results: we’ll write back Big Data to any store, while being driven by any of the above languages.  If you build a predictive model, you’ll want to eventually use the model in production.  You can use it in-memory as-is, scoring new datasets on the model – and for example constantly streaming new data through the model while at the same time constantly churning out new models to be streamed through.  Or you can get a Java version of any model suitable for dropping into your production environment.

And that’s the end of my whirl-wind tour of the H2O Distributed Computing API.  Hope you like it!

Comments & suggestions welcome.


TCP is UNreliable

Been too long between blogs…

“TCP Is Not Reliable” – what’s THAT mean?

Means: I can cause TCP to reliably fail in under 5 mins, on at least 2 different modern Linux variants and on modern hardware, both in our datacenter (no hypervisor) and on EC2.

What does “fail” mean?  Means the client will open a socket to the server, write a bunch of stuff and close the socket – with no errors of any sort.  All standard blocking calls.  The server will get no information of any sort that a connection was attempted.  Let me repeat that: neither client nor server get ANY errors of any kind, the client gets told he opened/wrote/closed a connection, and the server gets no connection attempt, nor any data, nor any errors.  It’s exactly “as if” the client’s open/write/close was thrown in the bit-bucket.

We’d been having these rare failures under heavy load where it was looking like a dropped RPC call.  H2O has its own RPC mechanism, built over the RUDP layer (see all the task-tracking code in the H2ONode class).  Integrating the two layers gives a lot of savings in network traffic: most small-data remote calls (e.g. nearly all the control logic) require exactly 1 UDP packet to start the call, and 1 UDP packet with the response.  For large-data calls (i.e., moving a 4Meg “chunk” of data between nodes) we use TCP – mostly for its flow-control & congestion-control.  Since TCP is also reliable, we bypassed the Reliability part of the RUDP.  If you look in the code, the AutoBuffer class lazily decides between UDP or TCP send styles, based on the amount of data to send.  The TCP stuff used to just open a socket, send the data & close.

So as I was saying, we’d have these rare failures under heavy load that looked like a dropped TCP connection (it was hitting the same asserts as a dropped UDP packet, except that our dropped-UDP-packet recovery code had been in there, and working, forever).  Finally Kevin, our systems hacker, got a reliable setup (reliably failing?) – it was an H2O parse of a large CSV dataset into a 5-node cluster… then a 4-node cluster, then a 3-node cluster.  I kept adding asserts, and he kept shrinking the test setup, but still nothing seemed obvious – except that obviously during the parse we’d inhale a lot of data, ship it around our 3-node clusters with lots of TCP connections, and then *bang*, an assert would trip about missing some data.

Occam’s Razor dictated we look at the layers below the Java code – the JVM, the native, the OS layers – but these are typically very opaque.  The network packets, however, are easily visible with wireshark tools.  So we logged every packet.  It took another few days of hard work, but Kevin triumphantly presented me with a wireshark log bracketing the Java failure… and there it was in the log: a broken TCP connection.  We stared harder.

In all these failures the common theme is that the receiver is very heavily loaded, with many hundreds of short-lived TCP connections being opened/read/closed every second from many other machines.  The sender sends a ‘SYN’ packet, requesting a connection.  The sender (optimistically) sends 1 data packet; optimistic because the receiver has yet to acknowledge the SYN packet.  The receiver, being much overloaded, is very slow.  Eventually the receiver returns a ‘SYN-ACK’ packet, acknowledging both the open and the data packet.  At this point the receiver’s JVM has not been told about the open connection; this work is all happening at the OS layer alone.  The sender, being done, sends a ‘FIN’ and does NOT wait for acknowledgement (all data has already been acknowledged).  The receiver, being heavily overloaded, eventually times-out internally (probably waiting for the JVM to accept the open-call, and the JVM being overloaded is too slow to get around to it) – and sends a RST (reset) packet back… wiping out the connection and the data.  The sender, however, has moved on – it already sent a FIN & closed the socket, so the RST is for a closed connection.  Net result: sender sent, but the receiver reset the connection without informing either the JVM process or the sender.

Kevin crawled the Linux kernel code, looking at places where connections get reset.  There are too many to tell which exact path we triggered, but it is *possible* (not confirmed) that Linux decided it was the subject of a DDOS attack and started closing open-but-not-accepted TCP connections.  There are knobs in Linux you can tweak here, and we did – and could make the problem go away, or be much harder to reproduce.

With the bug root-caused in the OS, we started looking at our options for fixing the situation.  Asking our clients to either upgrade their kernels or use kernel-level network tweaks was not in the cards.  We ended up implementing two fixes: (1) we moved the TCP connection parts into the existing Reliability layer built over UDP.  Basically, we have an application-level timeout and acknowledgement for TCP connections, and will retry TCP connections as needed.  With this in place, the H2O crash goes away (although if the code triggers, we log it and use app-level congestion delay logic).  And (2) we multiplex our TCP connections, so the rate of “open TCPs/sec” has dropped to 1 or 2 – and with this 2nd fix in place we never see the first issue.

At this point H2O’s RPC calls are rock-solid, even under extreme loads.


Found this decent article:

  • It’s a well known problem, in that many people trip over it, and get confused by it
  • The recommended solution is app-level protocol changes (send expected length with data, receiver sends back app-level ACK after reading all expected data). This is frequently not possible (i.e., legacy receiver).
  • Note that setting flags like SO_LINGER is not sufficient
  • There is a Linux-specific workaround (SIOCOUTQ)
  • The “Principle of Least Surprise” is violated: I, at least, am surprised when ‘write / close’ does not block on the ‘close’ until the kernel at the other end promises it can deliver the data to the app.  Probably the remote kernel would need to block the ‘close’ on this side until all the data has been moved into the user-space on that side – which might in turn be blocked by the receiver app’s slow read rate.
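The app-level fix recommended above (send the expected length with the data, and have the receiver ACK only after reading all of it) can be sketched in Java over a loopback socket. The one-byte `ACK` value, the class and method names, and the 5-second timeouts are illustrative assumptions, not part of any standard protocol. The point is that the sender calls `close()` only after the ACK arrives, by which time the data is in the receiving application’s hands rather than sitting in a kernel buffer that a RST could wipe out.

```java
import java.io.*;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;

public class LengthAckDemo {
    static final int ACK = 0x06; // hypothetical one-byte app-level ACK (ASCII ACK)

    // Sender: write the expected length, then the payload, then block for the ACK.
    static void sendWithAck(Socket s, byte[] payload, int timeoutMs) throws IOException {
        s.setSoTimeout(timeoutMs);                 // bound the wait for the ACK
        DataOutputStream out = new DataOutputStream(s.getOutputStream());
        out.writeInt(payload.length);              // expected length first
        out.write(payload);
        out.flush();
        int ack = s.getInputStream().read();       // SocketTimeoutException on timeout
        if (ack != ACK) throw new IOException("no app-level ACK (got " + ack + ")");
    }

    // Receiver: read the length, read exactly that many bytes, then send the ACK.
    static byte[] receiveAndAck(Socket s) throws IOException {
        DataInputStream in = new DataInputStream(s.getInputStream());
        byte[] buf = new byte[in.readInt()];
        in.readFully(buf);                         // all expected data is now in user space
        OutputStream out = s.getOutputStream();
        out.write(ACK);
        out.flush();
        return buf;
    }

    // Loopback round trip; returns what the receiving application actually read.
    static String roundTrip(String msg) throws Exception {
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            ExecutorService pool = Executors.newSingleThreadExecutor();
            try {
                Future<byte[]> received = pool.submit(() -> {
                    try (Socket s = server.accept()) { return receiveAndAck(s); }
                });
                try (Socket s = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort())) {
                    sendWithAck(s, msg.getBytes(StandardCharsets.UTF_8), 5000);
                } // closed only after the ACK, so close() cannot race the receiver's read
                return new String(received.get(5, TimeUnit.SECONDS), StandardCharsets.UTF_8);
            } finally {
                pool.shutdownNow();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("hello, H2O"));
    }
}
```

As the article notes, this only works when you control both ends; a legacy receiver that never sends the ACK would leave the sender waiting until its timeout fires.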



Captain’s Log Days 11-19

Captain’s Log Day 11

It’s another long drive day for us; we’re trying to get from Stone Mountain (near Atlanta) to Harrisburg, PA today, and to Chaplin, CT sometime tomorrow.  We’re quite expert at breaking camp by now; it takes maybe an hour to pull up all the sleeping bags and fold all the couches and tables back out, to shower and freshen up, to reload the fresh water tanks and dump the other tanks.  We spend another hour in a local Walmart replacing basic supplies and then we’re on the road.

The kids have figured out how to keep themselves busy on the drive.  We’ve got a TV and a Wii, and some reading material.  There’s singing and tickle fights, and lots of napping.  There’s food-making and grumbling about dish cleanup.  We camp out in the middle of Pennsylvania.  We pass the 3,500-mile mark, the halfway point.

Captain’s Log Day 12

We break camp at daylight without waking the kids, and drive maybe two hours before the kids bother to roll out of bed.  RV “camping” is a real trick.  We make it around New York with only one truly crazy driver incident: a bright red pickup truck came blazing up the left side and was clearly out of room to pass us, but did so anyway.  He sliced across in front of us at a 45-degree angle.  Had I not slammed on the brakes and swerved we would clearly have hit the truck, and such a hit would have rolled him.

We finally pull into my Uncle Bill’s farm in Connecticut around 4pm.  We settle the RV, then meander down to the river behind the farm, where one of my cousins is RV camping.  We swim in the river, cook burgers on the campfire and sit around visiting until way past dark.

Captain’s Log Day 13

We hang out on the farm all day; some of the kids swim in the river or fish or shoot fireworks off after dark.  I mostly hang out and catch up on the family news.  Shelley & I attend the local church wine-tasting, which is basically a chance to drink a bunch of wines that somebody else bought, and do more catching up on family news.


Captain’s Log Day 14

Shelley & I borrow a cousin’s car and drive to Cape Cod for the day.  OMG, a car is SO much nicer to handle than Nessie!  We take the slow route up the Cape, stopping at every tiny town and inlet.  Shelley’s family owned a summer house in Dennis Port 50 or 60 years ago and Shelley was tracing her roots.  We managed to stick our toes in the Atlantic and really unwind.  Shelley & I both like driving, so it’s another really peaceful down day.


Captain’s Log Day 15

Up early, we force all the kids to take showers (and change clothes; 2 weeks into vacation our standards are getting pretty lax) and we hit the road.  Breaking camp is now a pretty standard operation.  By rotating drivers, with Shelley driving until the wee hours, we make it almost to Indiana.


Captain’s Log Day 16

We pull into the University of Illinois at Urbana-Champaign around noon.  I’m giving a talk at 6, and UofI is paying for dinner and 3(!) hotel rooms for us (one for each couple, and one more for the 3 kids).  Real showers for all again!  Yeah!!!  The talk goes really well; it’s my Debugging Data Races talk and it’s a good fit for the summer course on multi-core programming.  Shelley and I manage to sneak a beer afterwards.


Captain’s Log Day 17

Again we manage to break camp in short order and do another long day of driving through Illinois, Iowa, and Nebraska.  By now we’ve got a rhythm going; Shelley takes the early morning driving shift while everybody sleeps in, then Luke and I alternate shifts until evening (while Shelley naps), and Shelley takes the late night shift.  I think we’re covering around 800 miles in a day.


Captain’s Log Day 18

Today it’s the rest of Nebraska and Wyoming, then Utah.  My Dad manages to call me out in the middle of I-80 nowhere-land, to the bemusement of all.  We hit high winds on and off all day.  At least once I was driving with the steering wheel cranked over a full 180 degrees (and was down to 45 mph) just to stay on the road.  18-wheelers would blow by us, knocking us all over the road.  First the bow wave would push us hard to the right, onto the shoulder.  Then the wind-block (and my 180-degree wheel position) would drive us hard back onto the road and into the truck, then the trailing suction would pull us harder into the truck, even as I was cranking the wheel the other way as fast as I could… and then the wind would return.  It was a nerve-wracking drive.  Shelley took over towards evening.  Around 11pm the winds became just undrivable, even for her.  I was dozing when suddenly we got slapped hard over, almost off the shoulder.  Even driving at 40mph wasn’t safe.  An exit appeared in the middle of nowhere, even with an RV park (mind you, it’s typically 30 miles between exits *without services*).  We bailed out.  All night long the RV was rocked by winds, like a Giant’s Hand was grabbing the top of Nessie and shaking her like a terrier does a rat.


Captain’s Log Day 19

Morning dawns clear and still.  We hit the road again early, as we’ve a long drive today.  It’s a quiet drive through to Reno, and then we hit some really crazy drivers again: a combination of a construction zone, short merge lanes and stupidity (outside the RV) nearly crushed a darting micro-car.  The construction on Donner Pass was perhaps even worse; we managed to get forced into clipping a roadside reflector on the right, with less than a foot to spare before the mountain rock, because the alternative was pushing an aggressive SUV into the bare concrete on his left.  Finally past all the madness, we get to the clear road down from Tahoe and through the Bay Area – and it’s all Homeward Bound on the downhill slide through our home turf!

Home At Last!!!


Some parting stats:

We passed through 22 states (24 for Shelley & I, as we also get to count Rhode Island and Massachusetts).
We drove about 6900 miles.
I bought about $3000 in gas, and $1300 in tires.
We saw 4 close family members in Tucson, 7 in Texas, my brother in Atlanta, and at least 16 in Connecticut (I lost the exact count!).
I did about 20 loads of laundry after returning (the washer ran continuously for 2 days).



Captain’s Log Days 8, 9 & 10

Captain’s Log Day 8

We’re on the road by 10am, this time a full day’s drive to Montgomery AL from Katy TX.  I had forgotten how big Houston freeways are; at one point I count 9 lanes *in each direction* (18 total lanes!).  I’ve never seen so much concrete.  It’s otherwise mostly uneventful, though.  Traffic is fair to light and the road is good.  We stop at a random lakeside park by Lake Charles for lunch.  It smells of the ocean and has an alligator pond/cage/viewing area.

While I typically encourage the kids to drink a lot (to survive the desert heat & dry), I don’t check on how much they eat, just that they eat a reasonably balanced diet.  So I missed that Matt hadn’t eaten all day, and had spent it staring heads-down at silly Flash games on his iPod.  Well, towards afternoon he starts feeling sick, and near dinner he barfs and refuses to eat or drink anything.  He cannot even keep down tiny bits of bread or Gatorade; the 2nd barf happens on our bedsheets and pillow.  At this point he decides to camp out by the RV toilet and do any more barfing into that (uggh!!!  poor guy!!!!), and we decide to cut it short and look for camping for the night.  By dinner he’s still unable to keep anything down; we grab to-go food from a collection of fast-food joints and keep rolling to the nearest campsite.

We get halfway between Mobile & Montgomery, AL and pull over into a nice full-service RV park.  Shelley & I decide to camp outside in a tent, so Josh can get off the floor (he’s 17 and 6ft tall, lean and flexible… and does not fit in any of the RV pullout/fold-down beds, so he’s been sleeping in the aisle).  We want Josh off the floor so Matt can make an emergency run from his foldout bed to the bathroom without interference.  It’s beastly hot and humid outside, but I figure it will cool off as the night wears on.  Boy was I wrong!  It stays 80+ degrees & 80% humidity all night long outside, while the kids sleep in air-conditioned luxury.  And we get a late night visit from the camp kitten – he’s adorably cute and caterwauls at us, and starts climbing the tent with his razor claws until Shelley takes him for a walk.  He follows her like a shadow all over the park until she finally has to lock him in the campground bathroom.


Captain’s Log Day 9

Finally dawn breaks and we move back into the cool RV air.  Ahhh, blessed relief.  Also, Matt is much better – it’s a common kid 24-hour tummy bug.  I start him back in on the BRAT diet, with sips of water – and now he’s very hungry, a good sign.  He continues to improve throughout the day and is eating normally by dinner.  We pull up camp (we’re getting quite expert at this) and head for Stone Mountain, GA.

Stone Mountain is a giant mountain-sized chunk of granite outside of Atlanta, with a park and a lake.  It’s been carved with a 50ft-high sculpture and has been slowly improved over the years to include many hiking trails, a sky tram system, lots of outdoor adventure activities and an amusement park.  Apparently the “ducks” (amphibious vehicles) are fantastic.  We are going there for the July 4th extravaganza – and as a sign that I’m on vacation, I barely know that today is the 3rd and I’ve no idea what day of the week it is.  We get there about 3pm and check in to a nice RV camp site.

Shelley cooks a fantastic spaghetti dinner.  My brother Eric drives out to camp with us, bringing his best friends’ two small girls (ages 6 & 7) with him (he’s been watching the girls when the parents are working since they were 2 & 3), and we all enjoy a nice picnic dinner.  As the evening rolls on we’re deciding whether or not to see the laser & fireworks show this evening (there’s a bigger one tomorrow) – when the thunderstorm hits.  It’s a real downpour, big lightning and thunder, blowing wind, the works.  We wait that out, and then try to take a walk about the park.  Eric & I, the two girls and my middle two kids walk over to the clubhouse (to check out the water-taxi ride to the main park area) but the rain has other ideas.  We make it to the clubhouse but we’re fairly wet, so we treat the girls to hot chocolate while we dry out.  We wait for the rain to end but it’s no good – the rain has turned into a steady drizzle; we’re just as wet by the time we make it back and there’s no end in sight.  We give up any idea of tent camping or seeing the laser show and settle for watching a Disney movie (the Sword in the Stone) and having a lazy evening with all 10 of us huddled in the RV.  Sleeping arrangements are “cozy” to say the least!  But at least everybody is dry.


Captain’s Log Day 10

It’s the 4th of July!  We breakfast, clean up & head over to the water taxi.  The rains have stopped and the sun is out.  It’s gonna be a hot & humid day.  The water taxi is nice; it’s cooler near the lake.  We make it to Stone Mountain’s main attraction area and decide to walk to the bell tower.  The park is already busier than Eric has ever seen it before.  There’s a large Indian family set up under the bells already (and I see more people of the same persuasion walking over to the tower all morning – I think they figured out a cool, shady, semi-private place to hang out at all day).

We’ve walked maybe half a mile, it’s not even noon, and we’re already soaked with sweat when we make it back to the Plantation Inn.  The Inn isn’t open for lunch (although the AC is nice), but the helpful counter lady tells us there’s RV parking closer in.  We walk up to Memorial Hall.  Immediately two things strike me as really odd: there are at least 1000 people hanging around looking for food (and more pouring in all the time), yet it’s 11:30 and *none* of the dozen or so restaurants are open yet – and there’s bus & RV parking open right in front of the main Hall.

I hand the kids my credit card (to get lunch at noon when the restaurants open) and Shelley and I hightail it back to the RV: across 1/2 mile of hot trails & roads, ride the water taxi (we miss the one in front of us by literally seconds even with me sprinting across the landing area), and finally the 1/4 mile hike from the taxi dock to the RV.  We pull the hookups as fast as we can and roll out & down the road.  Nessie does NOT sprint, she *proceeds*, but we made her proceed as fast as possible.  We took the short way around the lake, only to discover the road was closed: the attendant at the barricades explains “the road fell in a hole”.  Nothing for it; Shelley makes a 3-point turn on a narrow park road and we go the long way around.  Finally, a full hour later, we make it back to the bus/RV parking in front of Memorial Hall – and Lo! it’s open.  We take the most premier parking spot in all of Stone Mountain, at noon-thirty on the 4th of July.  (A short time later one other RV takes the next spot, then the road is closed behind us.)

The amazing thing about the Stone Mountain concessions was the astronomical price for food; hotdogs: $7-$10, drinks also $7 or so.  (And they kept a hot and hungry horde waiting for at least an hour???)  But finally we all sat down and finished our food and plotted our next move.  Shelley, Eric & I all want a big hike.  Last Christmas Shelley & I hiked the Grand Canyon down to Phantom Ranch and back out in two days, and Eric has hiked both the Pacific Crest Trail and the Appalachian Trail end-to-end.  We head out for the top of Stone Mountain on a hot & muggy day.  There are lots of other folks with the same idea, but it really is a long hot hike.  Most of my kids bail out after a mile or so, voting to go hang out in the AC (which is really a good plan); Eric and his two young charges make it to the path-up cutoff but it’s a killer hike in the heat so they turn around also.

It ends up being Shelley, Laura (age 15) and me heading on, and we decide to head for the bird sanctuary.  It’s another couple of miles and we gave most of the water to Eric & the girls.  The three of us head down the far side of the mountain to a kids’ playground and finally drag ourselves into the park and help ourselves to the water fountain.  We drink a quart each, and fill a couple more quart bottles we’re carrying.  We hike the 1/2 mile more to the bird sanctuary – mostly carrying on now because of what Shelley would call “Mission” – her ex-Marine training to “complete the Mission” no matter the cost.  I.e., we’re all too collectively silly to claim the end goal is ridiculous, so we hike it anyway.  It’s a decent enough little woodsy trail, with plenty of songbirds, but far too beastly hot to really enjoy.  By the time we make it back to the kids’ park we’ve drunk all our water (another 1/2 gal between the 3 of us), so we reload (and re-drink our fill) and head back up the mountain to cross it in reverse.  We make it back in good time, although it was really pushing our limits to hike so far on such a hot day.

There is much lounging around and napping in the RV’s AC to wait out the heat of the day.  Matthew (age 12) introduces the two little girls to the joys of Minecraft.  Eric & Laura nap.  Everybody else surfs the (very very slow) park Internet, eating popcorn & chips.  Finally as the heat starts to fade and twilight sets in we get enough gumption to make & eat hotdogs.  Then we pack it up and prepare to leave the relative safety and peace of the RV for the slowly building horde.

The lawn below Memorial Hall faces the giant sculpture carved into the face of Stone Mountain.  The only open spaces are at the very front, so that’s where we head.  I estimate 100,000 people eventually filled that lawn; in any case it was a colossal crowd.  It was also actually quite a peaceful crowd; no rowdies (no alcohol allowed), zillions of little kids running pell-mell, picnic blankets, soap bubble makers and glowing flashing LED lights.  It’s cooler now, so we settle down on our blankets and chairs, listen to the music and wait for the show.  At various times I let Josh or Karen & Luke wander off for snacks (a little nerve-wracking that; they are out of sight in the crowd within seconds and gone for 30mins or more, but everybody returns fine).

The fireworks show starts promptly at 9:30 and is possibly the best I’ve ever seen.  There’s a laser & light show on the mountain, there’s a Civil War tribute (and there are ads for all of Georgia’s major sports teams), there’s music and of course fireworks.  The actual fireworks were downright amazing; you get a double-echo from the Bang! works, one directly and one bounced off the mountain.  They used plenty of the big fireworks and absolutely tons of the rising-sparks kind; the entire mountain was a sheet of fire for minutes at a time.  The finale left us breathless.

Unwinding back to the camp was a slow but uneventful crawl; I’m sure we beat the campers on foot (who had to wait for the river-taxis; the report was to expect a 2.5 hr wait).  Eric took his two charges home and we collapsed, tired but triumphant, for a full night’s sleep.



Captain’s Log, Daze 5, 6 & 7

Captain’s Log, Day 5

It’s another early morning drive, this time we’re heading to San Antonio and then on to Luling TX for more relatives.  We’re still marching on through the great desert Southwest, but there are more signs of green now.  Some trees mixed in with the sage, and less cactus.

The ride to Luling is long but uneventful.  We give Luke another turn at the wheel.  The road is calm enough that we let Luke chug on for miles, and then we’re heading into San Antonio.  Suddenly the world is full of crazy drivers!  People are cutting in front of us, or darting around, or force-merging (on short merges) and giving us no space.  Luke brakes as he can, but we’re an 8-ton vehicle!  We take at least twice as far to stop as a car!  We finally make it to a parking lot.

We have a great dinner in San Antonio with Grandpa & Grandma Weiner, and then we have to brave rush-hour traffic.  Shelley takes the helm this time, and a good thing too.  I’ve never seen such craziness.  We watched a pickup 4-wheeling it over the berm to cut traffic (and yes, he set the dry grass on fire; we watched the smoke rise for a long time), and we had endless numbers of people fight tooth-and-nail to get in front of us, only to switch lanes back a second later when some other lane had a slight advantage.  We had a little sporty thing flash over from left to right, with us doing 60, with less than a foot to spare across our bumper!  It was all over in an instant, and he missed us, but another foot and we woulda crunched him big.  It was a grueling two hours to get out of S.A.  Luling, where we spent the night at Grandpa’s house, was great.  We yakked all night while the kids worked out their cabin fever.  All in all, another fabulous Grandparent visit.


Captain’s Log, Day 6

Next morning, crack-o-noon, we headed out for my sister’s place in Katy (really far west Houston).  It’s another straight shot down I-10, and I-10 is in pretty good shape even out to Luling; as we approach Houston it widens to 6 lanes.  We start watching real weather appear; there’s a line of heavy thunderclouds forming up to the left and right of us and we’re heading right for them.  The wind starts to pick up and really buffet us; we slow down to 60 and then slower.  People are starting to park on the side of the road, but we want out of the impending storm.  Rain alternates between slashing and nothing.  The clouds get dark, low and ominous.  I start to see green clouds, and clouds moving the wrong direction.  I pull out Shelley’s “smart phone” and look up the local weather.  Sure enough, with vast modern technology, 4G wifi, low-power android-enabled cloud-backed internet weather smart-phone tech we discover what we already know: there are two large thundercells on either side of I-10.  They happen a lot during south Texas summers as warm wet Gulf air meets cooler midwest air.  And these storm cells often spawn tornadoes.  But after 20 mins of staring at awe-inspiring clouds and getting slammed by 40mph cross-winds we manage to roll through the middle of them and out the other side.  The rest of the trip in is entirely uneventful, except for the trip down memory lane for me.

We get to my sister Ruth’s without incident and my kids rush in to play with her kids.  Then we have a comedy of errors trying to get power run to the RV.  First our old power cable gets hot and the RV power cuts off (which means the AC cuts off on a hot humid Houston summer day).  Then we think the outlet is bad, then we try to test the outlet with an old drill (drill not working), my laptop power supply (cannot see the little blue light in the sun), and finally a real tester (outlet is dead).  We switch outlets, then Aunt Ruth tells me the switch for that outlet is flaky, and it surely is; we quick-cycle the RV AC repeatedly without realizing it, and pop a 15amp house breaker.  We change outlets again, we change power cords again, we run the new cord through the garage to an internal 20amp circuit, and finally it holds.  The RV stays well AC’d for the next 2 days.

Grandma’s over (*my* Mom this time), as she lives a few miles from my sister, and we hang out and visit all day.  There’s wine & lasagna for dinner, and hot showers and full beds for all.


Captain’s Log Day 7

We all sleep in late.  We have pancakes & bacon for breakfast.  We run a few errands and then see the movie Brave (which is really good, BTW).  I end up connecting with an old college buddy and her boyfriend (Facebook!) so we invite them over for dinner.  Turns out the boyfriend is also an old college friend, so suddenly it was Texas A&M U reunion night.  They are both divorced with one teenaged daughter each (compared to my 4), and enjoying life again after divorce.  We have a long evening of beer, hotdogs and college memories.  The kids Xbox continuously, or get their internet “fix”, or play on the trampoline, or have drawing contests or otherwise monkey around.  It’s a really great “down time” lazy day.