Debugging Data Races


This is a short talk on debugging one of the most confounding bugs in programming: the Data Race.

 

Java vs C/C++ – The Podcast

A weekly podcast with Cliff Click talking about all things to do with programming, programmers and computer performance.

A podcast of a blog I did here in the past, which turned into a great talk that I’ve given lots of times over a decade.

 

Winds of Change

I’ve left H2O.  I wish them all the best.  I’ve left a longer farewell here.

I’m a loose cannon, at least for a few months, and am looking for (the right kind of) trouble.

So I’m here, posting on my blog again, to see if I can get some suggestions on what to do with my life.  🙂

Here are a few of my crazy Cliff ideas I’m sorta chasing around:

  • Python Go Fast: Do Unto Python as Thou hast Done Unto Java. I hack the guts of Python; add a high power JIT, a full-fledged low-pause GC, true multi-threading support, i.e. make Python as fast and as parallel as Java (about the same speed as C).  This blog is really a request for an open discussion on this topic.  Is the Python community interested?  How does this get funded?  (uber Kickstarter?)  I’ll only go here with the full support of the core Python committers, and general “feel goods” from the general python community – and I’m hoping to start a discussion.  At this point I’m a premier language implementer, and making Python Go Fast is well within my abilities and past experiences. Take about 2 years & $2M for this effort to be self-sustaining (build all the core new tech and hand it off to other contributors).
  • H2O2: I left a lot of unfinished technical work at H2O – and H2O has plenty of technical growing room.  I could continue to contribute to the Open Source side of H2O, with some Big Company footing the dev bill.  Big Company gets kudos for supporting Open Source, H2O gets the next generation of cool new features.
    • Plan B, Big Company funds some new core closed-source innovations to H2O and monetizes that.  H2O still gets some Open Source improvements but not all core tech work is open.
  • Teach: I bow out of the Great Rat Race for a year and go teach Virtual Machines at Stanford or Berkeley.  Fun, and makes for a nice sabbatical.  (as a bonus, I’ll probably have 3 kids in college next year, and the whole Stanford Pays Faculty Kids’ College thing sounds really good).  I might be able to do this while hacking Python guts at the same time.
  • Jetsons: I know how to Do the Flying Car Thing Right.  Million people, million flying cars in the air, all at once, and nobody can crash anything.  Feels like you’re flying but the Autopilot-o-Doom holds it all together.  I’ve figured out how to handle bad weather, ground infrastructure lossage (e.g. the Big Quake wipes out ground support in 10sec, how do you land 1 million cars all at once?), integration into existing transportation fabric, your driveway as a runway, playing nice with the big jets, etc.  Bold statement, needs bold proof.  Lots more stuff here, been thinking on this one for a decade.  Needs 3 or 4 years to put together the eye-popping this-might-actually-work prototype.  Thus needs bigger funding; probably in the $10M range to get serious.
  • Something Random: by which I mean pretty much anything else that’ll pay the bills and is fun to do.  I got some pretty darned broad skillz and interests…

Gonna be a long-needed Summer-o-Crazy-Fun for me.  Heck, maybe I’ll post my 2nd tweet (shudder).  🙂

Cliff

 

How does Java Both Optimize Hot Loops and Allow Debugging

This blog came about because an old friend is trying to figure out how Java can do aggressive loop and inlining optimizations, while allowing the loading of new code and setting breakpoints… in that optimized code.

On 2/21/2015 11:04 AM, IG wrote:

Safepoint. I’m still confused because I don’t understand what is the required state at a safepoint that can’t be ensured at any random point of execution.  What is in-flight that must be permitted to complete? Given a cycle-by-cycle map you can find all the pointers at *any* cycle – is safepointing just a way to reduce the volume of pointer-locating info?

Reducing OOP-map volume happens, is useful – but is not the main thing being achieved.

It’s flipping backwards from the machine state to the language architected state, unwinding from super-aggressive optimization to reach the theoretical Java (or C/C++) language state at some points.  Toooo expensive to preserve that mapping line-by-line, call-by-call, execution-point-by-execution-point. So you keep the mapping at a few places, and optimize the bejeebers out of things in-between.  If somebody wants to:

  • set a breakpoint in optimized code (yes, works for Java all the time)
  • inject new classes, new targets for some virtual-call

then you need to stop the physical machine (actual CPU/Mill) at some place where you can find all the language architected pieces.  This mapping is far more complex than the oop/no-oop mapping for GC… but isn’t why you make it rare.  You make it rare because you have to hang onto all the language state at these safepoints, but don’t need it between safepoints.

An example might help:

  for( int i=0; i<N; i++ )
    sum += A[i]  // set a breakpoint here, for "i>1e6" or maybe "i>1e7"

So the intent is to run awhile… then somebody sets a breakpoint… and wants to inspect “sum”, or change “i” in the debugger, then continue execution.  What’s the machine code look like, with no safepoints?  Something like this (bogus X86-like assembly code, pardon non-perfection):

 ... some setup, zero-trip test, range-check on "N" vs "A.length", etc
 ld  RA=&A        // Classic strength-reduced pointer into array A
 add RC,RA,#N*8   // Stopping value of A
 ld  RB=0         // sum
 loop:
 ld  RD=[RA+off]  // Load from A
 add RB=RB,RD     // Sum into RB
 add RA=RA,8      // Advance our pointer into A
 cmp RA,RC        // Stop when at the end
 blt loop
 // RB contains sum

 

But actually, with some loop unrolling:

... some setup, zero-trip test, range-check on "N" vs "A.length", etc
 ld  RA=&A        // Classic strength-reduced pointer into array A
 add RC,RA,#N*8 & (-1-3)  // mask off low 2 bits of iteration - unroll by 4
 ld  RB=0 // sum
 loop:
 prefetch [RA+off+32]  // Fetch next cache line... overlap memory & compute
 ld  R0=[RA+off+ 0]    // Get 4 values array A
 ld  R1=[RA+off+ 8]
 ld  R2=[RA+off+16]
 ld  R3=[RA+off+24]
 add R4=R0,R1  // add-tree to allow more parallel adders
 add R5=R2,R3  // add-tree to allow more parallel adders
 add RB=RB,R4  // partial sum into RB
 add RB=RB,R5  // full sum into RB
 add RA=RA,8*4 // Skip over 4 elements of array A
 cmp RA,RC     // Stop when at the end
 blt loop
 ... cleanup code for 0-3 more iterations
 // RB contains sum

 

Now I want to break on the “sum += A[i]” line… where is that, exactly, in that unrolled loop?

And which register contains “i”?  (Hint: none of them).

Do I even want to keep a shadow copy of “i” in parallel with my “A” pointer I’m rolling through the loop?  It’s pure overhead, unless I breakpoint and want to have some place to read “i”…. but even then I cannot *change* this shadow “i” because it won’t change my “A” pointer.

What about changing N?  Do I have enough info in the register mapping to describe all possible kinds of updates I might want to do to register RC when I tweak N?  This example is simple, but optimizing compilers can do hella complex code changes.

How does a Safepoint help here?  Well, for starters it points out that I do indeed need some way to find “i” – and, if I change it, some way to propagate those changes back into the loop.  Let’s look at this code with a Safepoint in it.

... some setup, zero-trip test, range-check on "N" vs "A.length", etc
 ld  RA=&A        // Classic strength-reduced pointer into array A
 add RC,RA,#N*8 & (-1-3)  // mask off low 2 bits of iteration - unroll by 4
 ld  RB=0         // sum
 ld  RI=0         // Concession to Safepoint: keep "i" around
 loop:
 prefetch [RA+off+32]  // Fetch next cache line... overlap memory & compute
 ld  R0=[RA+off+ 0]    // Get 4 values array A
 ld  R1=[RA+off+ 8]
 ld  R2=[RA+off+16]
 ld  R3=[RA+off+24]
 add R4=R0,R1  // add-tree to allow more parallel adders
 add R5=R2,R3  // add-tree to allow more parallel adders
 add RB=RB,R4  // partial sum into RB
 add RB=RB,R5  // full sum into RB
 add RA=RA,8*4 // Skip over 4 elements of array A
 add RI=RI,4   // Concession to Safepoint: keep "i" around
 SAFEPOINT: RI contains "i", RB contains "sum", "some place in the stack" contains "N"
 cmp RA,RC     // Stop when at the end
 blt loop
 ... cleanup code for 0-3 more iterations
 // RB contains sum

 

So I added 1 op in my hot loop to compute “i” on the fly.  Not too expensive; 1 clock per cache-line of data.  In general we end up keeping alive what is otherwise programmatically dead “just in case” the user stops the running code and wants to inspect (dead) variables – but this is cheap.  Just spill ’em off to stack somewhere and flag ’em in the Safepoint’s variable map.

How do we trigger the Safepoint?  The easiest way is to have the safepoint code test a thread-local “stop” bit, and if it’s set… stop.  If we have to stop we might have a lot of cleanup to do, but stopping is rare so the cost (of mostly not stopping) is low.  There’s lots of ways to make the safepoint check cheap.  Here’s one that depends on keeping a 2Meg (TLB-sized) window above the stack pointer mapped in or out – perfect for an X86 TLB:

  ld RX,[RSP+2M] // will trap if we map this page out, then catch the SEGV & handle
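
For contrast, here is a minimal Java sketch of the simpler flag-test scheme mentioned above.  This is illustrative only – it is not HotSpot’s actual mechanism, and it uses one global flag rather than a true thread-local bit:

  class SafepointPoll {
    static volatile boolean stopRequested;   // set by the VM when it needs all threads stopped

    static void poll() {                     // compiled into the loop at the SAFEPOINT
      if (stopRequested)                     // almost always false: a cheap test and an untaken branch
        block();                             // rare slow path
    }

    static synchronized void block() {
      // Spill the language-level state named in this safepoint's variable map,
      // then wait here until the VM releases the stopped threads.
    }
  }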

 

How do we use the Safepoint?  Once we’ve stopped the running thread mid-loop, we expect the trap handler to have saved off all the registers.  Later we can inspect the register map to find “i” or “sum”.  Note that a Java-level query cannot ask for our “RA” register – as it’s the sum of “&A” and “i*8” and isn’t visible at the language level – so “RA” does not appear in the Safepoint’s map.

How do we change “i” or “N” and have it All Just Work?  Basically, we re-enter a clone of the loop from above…. a clone specially made to be re-entered at any iteration.  The loop setup code in the clone will rebuild “RA” and “RB” and any other intermediate state as needed, before falling into essentially the same code as the normal loop (and indeed they might be shared here!).  Or in other words – since the optimizer did something complex to build “RA” from “&A” and “i” – have the optimizer do it again for any arbitrary new “i”.
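
Here is a tiny, hand-waved Java rendering of that idea (the real thing happens on compiler IR, with “RA” and “RB” living in machine registers):

  // Illustrative only: re-enter the loop at an arbitrary, possibly debugger-modified "i".
  static long resumeSumLoop( long[] A, int i, long sum, int N ) {
    // The clone's setup code rebuilds whatever the optimizer had strength-reduced
    // (the "RA" pointer is just A plus the index here), then falls into the same loop body.
    for( ; i<N; i++ )
      sum += A[i];
    return sum;
  }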

The general plan is simple (from the optimizing compiler’s point-of-view): Safepoints are treated like a function-call only taken on a very very rare execution path.  Safepoints take as inputs all visible language-level state, including the state of all inlined function calls, and generally are assumed to update all memory state but not any local vars (if you are also hacking local variable state via the debugger interface, then we need to go a step further and deopt/reopt – leading to the loop clone mentioned above).

With this magical “low frequency function call” in hand, the optimizer attempts the best optimizations it can, subject to the normal constraints that the call arguments are available somehow.

Hope this helps,
Cliff

 

Hacking in H2O

In preparation for H2OWorld, I’ve written a couple of blogs to help people quick-start hacking in H2O.  All of these talks blend engineering, math and Big Data – and need nothing more than a really basic (free) Java development environment.

http://blog.h2o.ai/2014/11/hacking-algorithms-into-h2o-kmeans/

http://blog.h2o.ai/2014/11/hacking-algorithms-into-h2o-quantiles/

http://0xdata.com/blog/2014/11/Hacking/Algo/Grep

 

Cliff

Sparkling Water: H2O + Scala + Spark

Spark is an up and coming new big data technology; it’s a whole lot faster and easier than existing Hadoop-based solutions. H2O does state-of-the-art Machine Learning algorithms over Big Data – and does them Fast. We are happy to announce that H2O now has a basic integration with Spark – Sparkling Water!

This is a “deep” integration – H2O can convert Spark RDDs to H2O DataFrames and vice-versa directly. The conversion is fully in-memory and in-process, and distributed and parallel as well. This also means H2O has a (nearly) full integration with Scala as well – H2O DataFrames are full Scala Collection objects – and the basic foreach Collection calls naturally run distributed and parallel.

A few months back we announced the start of this initiative: Sparkling Water = H2O + Spark

In that post we went for the quickest integration solution possible: separate processes for Spark and H2O, with a Tachyon bridge. While this worked, it required fully 3 copies of all data: a Spark copy, a Tachyon bridge copy, and an H2O copy – and the data passed through process boundaries repeatedly. With this work we need only the Spark and H2O copies, the data is only copied once, and does not cross a process boundary. It’s both faster and uses less memory.

Scala integration is being showcased before Spark integration simply because we need to understand H2O’s Scala integration before we can understand H2O’s Spark integration.


H2O’s Scala API

As always, the code is open source and available on github. This code is all in 0xdata’s “h2o-dev” repo – a clean-slate, dev-friendly version of H2O. As of this writing, h2o-dev has all the core h2o logic, and about 50% of the machine learning algos (and the rest should appear in a month).

https://github.com/0xdata/h2o-dev/

The basic Scala wrappers are here:

https://github.com/0xdata/h2o-dev/tree/master/h2o-scala/src/main/scala/water/fvec/DataFrame.scala

https://github.com/0xdata/h2o-dev/tree/master/h2o-scala/src/main/scala/water/fvec/MapReduce.scala

And some simple examples here:

https://github.com/0xdata/h2o-dev/blob/master/h2o-scala/src/test/scala/water/BasicTest.scala

The idea is really simple: a Scala DataFrame extends an H2O water.fvec.Frame object.

class DataFrame private ( key : Key, names : Array[String], vecs : Array[Vec] ) extends Frame(key,names,vecs) with Map[Long,Array[Option[Any]]] {

The Scala type is a Map[Long, Array[Option[Any]]] – a map from Long row numbers to rows of data, i.e. a single data observation. The observation is presented as an Array of Option[Any] – array elements are features in the observation. H2O fully supports the notion of missing elements or data – and this is presented as a Scala Option type. Once you know a value is not “missing” – what exactly is its type? H2O Vecs (columns) are typed to hold one of these types:

  • a Number (Java double conceptually, but any of boolean, byte, char, int, float, long, double)
  • a String
  • a Factor/Enum (similar to interned Strings that are mapped to small dense integer values)
  • a Timestamp (stored internally as milliseconds since the Unix Epoch)
  • a UUID

A DataFrame can be made from a Spark RDD or an H2O Frame directly or from a CSV file:

val df1 = new DataFrame(new File("some.csv"))

Column subsets (a smaller dataframe) can be selected via the column name:

val df2 : DataFrame = df1('a_column_header,'age,'id)

You can do a basic foreach:

df2.foreach( case(x,age,id) => ...use x and age and id... )

And this will run distributed and parallel across your cluster!

The code for foreach is very simple:

override def foreach[U](f: ((Long, T)) => U): Unit = {
  new MRTask {
    override def map( chks : Array[Chunk] ) = {
      val start = chks(0)._start       // Absolute row number of this Chunk's first row
      val row = new T(chks.length)     // Reusable row buffer: one Option[Any] slot per column
      val len = chks(0).len            // Number of rows in this Chunk
      (0 until len).foreach{ i =>      // For each local row...
        (0 until chks.length).foreach{ col => row(col) = if( chks(col).isNA0(i) ) None else Some(chks(col).at0(i)) }
        f(start+i,row)                 // ...apply the user's function to (global row number, row)
      }
    }
  }.doAll(this)                        // Run distributed & parallel over all Chunks of this Frame
}

More complicated examples showing e.g. a scala map/reduce paradigm will follow later.


WaterWorks – Flowing Data between Spark and H2O

Let’s turn to Spark now, and show H2O and Spark working together. Spark RDDs and H2O Frames are similar – but not equal – ways to manage large distributed data. Basically, RDDs are a collection of blocks (Partitions) of rows, and Frames are a collection of columns, aligned in rows and broken up in Chunks.  Sort of a blocks-vs-stripes thing.

H2O represents data in columns (Vecs in H2O lingo) vertically striped across the cluster; a Frame is a collection of columns/Vecs. Vecs, in turn, are broken up into Chunks – and the Chunks are carefully aligned in the JVM heaps such that whole rows of data (all the columns in a Frame) align.

Spark represents data in RDDs (Resilient Distributed Datasets) – a Collection of elements. Each RDD is broken up in turn into Partitions blocked across the cluster; each Partition holds a subset of the Collection as complete rows (or observations in data-science parlance).

H2O Frames, Vecs and Chunks can represent any primitive Numeric type, Strings, timestamps, and UUIDs – and support the notion of a missing value. RDDs can represent a collection of any Scala or Java object. If we limit the objects to containing fields that H2O represents, then we can build a simple mapping between the two formats.

For our example we’ll look at a collection of Users:

class User(
  id: Long,
  age: Option[Short],
  sex: Option[Boolean],
  salary: Int
)

In Scala, this is typed as RDD[User], and we’ll have 1 User object per real User, blocked in Partitions around the cluster. In H2O, DataFrames naturally hold all numeric types and missing values in a column format; we’ll have 4 columns, each holding a value for each user, again distributed in Chunks around the cluster. Here’s a single Partition holding 4 users, and the corresponding 4 Chunks (one per column):

[figure: one Partition of 4 User rows vs. the same 4 users split across 4 Chunks, one per column]

Here’s 4 JVM heaps holding all 32 users. There are 8 partitions (two per JVM)and 32 Chunks in 4 Vecs striped around the cluster:

[figure: 4 JVM heaps holding 8 Partitions (two per JVM) and 32 Chunks in 4 Vecs striped around the cluster]


An Example: Back and Forth between H2O and Spark

Here’s a simple Scala example; we’ll assume we have the 4-node Sparkling Water cluster above and a CSV (or Hive) file on disk. We’ll use a simple User data file featuring just a unique id, the user’s age, sex and salary. From just the age and sex we’ll try to predict the typical salary using DeepLearning.

First we load the CSV file into H2O. H2O features a fast, robust CSV parser which is able to ingest CSV data at full disk bandwidths across a cluster. We can handle a wide variety of CSV formats (e.g. Hive is a CSV with ^A/0x01 field separators) and typically automatically infer all column types. Unlike Spark, operations in H2O are eager, not lazy, so the file is loaded and parsed immediately:

// Load an H2O DataFrame from CSV file
val frameFromCSV = new DataFrame(new File("h2o-examples/smalldata/small_users.csv"))

Then we’ll convert to a Spark RDD – like all RDD operations, this conversion is lazy; the conversion is defined but not executed yet:

val sc = createSparkContext()
val table : RDD[User] = H2OContext.toRDD(sc,frameFromCSV)

And then run an SQL query! SQL is a great way to subselect and munge data, and is one of Spark’s many strengths.

table.registerTempTable("user_table") // The RDD is now an SQL table
val query = "SELECT * FROM user_table WHERE AGE>=18" // Ignore underage users
val result = sql(query) // Run SQL query

Then we’ll convert back to an H2O DataFrame – and run H2O’s Deep Learning (Neural Nets) on the result. Since H2O operations are eager, this conversion triggers the actual SQL query to run just before converting.

// Convert back to H2O DataFrame
val frameFromQuery = H2OContext.toDataFrame(sc,result)

Now it’s time for some Deep Learning. We don’t want to train a Deep Learning model using the ID field – since it is unique per user, it’s possible that the model learns to spot interesting user features from the ID alone – which obviously does not work on the next new user that comes along! This problem is called overfitting, and H2O’s algorithms all have robust features to avoid it – but in this case we’ll just cut out the ID column by selecting a subset of features to train on by name:

val trainFrame = frameFromQuery('age,'sex,'salary)

Deep Learning has about 60 parameters, but here we’ll limit ourselves to setting just a few and take all the defaults. Also note that sometimes the age or sex information is missing; this is a common problem in Data Science – you can see it in the example data. DataFrames naturally support this notion of missing data. However, each of the Machine Learning algorithms in H2O handles missing data differently – the exact behavior depends on the deep mathematics in the algorithms. See this video for an excellent discussion of H2O’s Deep Learning.

// Deep Learning!
// – configure parameters
val dlParams = new DeepLearningParameters()
dlParams.source = trainFrame 
dlParams.response = trainFrame('salary)
// Run Deep Learning. The train() call starts the learning process
// and returns a Future; the training runs in the background. Calling
// get() blocks until the training completes.
val dlModel = new DeepLearning(dlParams).train.get

At this point we have a Deep Learning model that we can use to make predictions about new users – who have not shared their salary information with us. Those predictions are another H2O DataFrame and can be moved back and forth between H2O and Spark like any other DataFrame. Spark and Scala logic can be used to drive actions from the predictions, or the predictions can be used further along in a deeper RDD-and-H2O pipeline.


Summary

In just a few lines of Scala code we now blend H2O’s Machine Learning and Spark’s data-munging and SQL to form a complete ML solution:

  • Ingest data, via CSV, existing Spark RDD or existing H2O Frame
  • Manipulate the data, via Spark RDDs or as a Scala Collection
  • Model in H2O with cutting-edge algorithms
  • Use the Model to make Predictions
  • Use the Predictions in Spark RDDs and Scala to solve our problems

In the months ahead we will be rounding out this integration story – turning brand-new functionality into a production-ready solution.

I hope you like H2O’s new Spark and Scala integration – and as always we are very excited to hear your feedback – and especially code to help further this good cause!  If you would like to contribute, please download the code from Git and submit your pull requests – or post suggestions and questions on h2ostream.

Cliff

A K/V Store For In-Memory Analytics, Part 2

This is a continuation of a prior blog on the H2O K/V Store, Part 1.

A quick review on key bits going into this next blog:

  • H2O supports a very high performance in-memory Distributed K/V store
  • The store honors the full Java Memory Model with exact consistency by default
  • Keys can be cached locally for both reads & writes
  • A typical cached Key Get or Put takes 150ns
  • A cache-missing operation requires network traffic; 1 or 2 UDP packets, or a TCP connection for large payloads.
  • Single-key transactional updates are supported
  • The Keys are pseudo-randomly distributed, removing a “hot-blocks” problem
  • There is no “master” Node; the cluster is fully peer-to-peer
  • Individual Keys have a “Home Node”, responsible for ordering conflicting writes to the same Key; the Home Node varies from Key to Key
  • Network operations are built on top of a reliable RPC mechanism built into H2O

The code for all of this can be found in the H2O GIT repo:
https://github.com/0xdata/h2o/blob/master/src/main/java/water
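
For orientation, here is roughly what the client side of the store looks like in use.  This is a sketch from memory – treat the exact signatures as illustrative and see DKV.java and Key.java for the real entry points:

  Key k = Key.make("K1");    // pseudo-randomly homed on some Node in the cluster
  Value v = DKV.get(k);      // ~150ns when cached locally; else 1-2 UDP packets (TCP for large payloads)
  // Writes go through DKV.put(k, newValue) – single-key updates are transactional,
  // with the Key's Home Node ordering any conflicting writes.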

The Distributed K/V Store (hereafter, the DKV store) is similar to hardware cache-coherency protocols like MESI, but altered to better match the software layer.  In this hardware analogy, a Key is akin to an address in memory, a Value’s payload is akin to the data stored at that address – and the Value itself is more like the hardware cache coherency bits.  These bits are not directly visible to the X86 programmer, but they have a profound impact on the (apparent) speed of memory, i.e. the cache behavior.  As is typical for this kind of thing, the actual implementation is basically a distributed Finite State Machine.

Keys have a “Home Node” (pseudo-randomly assigned – the Home Node is different for each Key).  Keys keep a little bit of state cached (mostly Home Node and a proper hash) but do not directly track coherency – that honor is given to the Values.

On the Home Node then, a Value tracks which other Nodes have replicas of this Value, and is responsible for invalidating those replicas when the Key is updated to a new Value.  Values also are used to force ordering of writes to the same Key from the same non-home writer – 1 JVM can only have a single outstanding write to the same Key in flight at once (although there’s no limit to the number of pending writes to unrelated Keys).  Hence parallel writes to unrelated Keys all work in parallel – the limit being the network bandwidth instead of the network latency.

Each Value holds a single AtomicInteger for a read/writer-lock, and a NonBlockingSetInt (really just a fast concurrent bitset), and a pointer to the user’s payload.  The R/W lock holds the count of pending Gets in-flight, or -1 if the Value is write-locked.  The pending-Gets count is typically zero, only going up when a new cached replica is being set in some Node; once the remote Node has the replica (completes its Get) the pending-Gets count will fall again.  The bitset holds the set of Nodes replicating this Value.  Gets are implemented in TaskGetKey.java, and are an instance of H2O’s reliable RPC mechanism in RPC.java.
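
Here is a stripped-down sketch of that per-Value coherency state.  The class and field names are illustrative stand-ins (and a ConcurrentSkipListSet stands in for the NonBlockingSetInt), not the actual code in H2O’s Value class:

  import java.util.Set;
  import java.util.concurrent.ConcurrentSkipListSet;
  import java.util.concurrent.atomic.AtomicInteger;

  class ValueSketch {
    final byte[] payload;                             // the user's data
    // >0: count of pending Gets filling remote replicas; 0: idle; -1: write-locked
    final AtomicInteger rwLock = new AtomicInteger(0);
    // Home-Node only: which Nodes currently hold a replica of this Value
    final Set<Integer> replicas = new ConcurrentSkipListSet<>();

    ValueSketch( byte[] payload ) { this.payload = payload; }

    boolean tryReadLock() {                 // taken while shipping a replica to another Node
      int r = rwLock.get();
      while( r >= 0 ) {
        if( rwLock.compareAndSet(r, r+1) ) return true;
        r = rwLock.get();
      }
      return false;                         // write-locked; the reader retries and finds the new Value
    }
    void readUnlock() { rwLock.decrementAndGet(); }   // the remote Node now holds its replica
    boolean tryWriteLock() { return rwLock.compareAndSet(0, -1); }  // only when no Gets are in flight
  }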

On a non-home node, the Value’s R/W lock field only holds a flag indicating that the Value is being overridden by a fresh local write, or not.

For a running example, let’s assume a 4-node cluster.  The cluster members are called A, B, C, and D.  We have a single Key, “K1”, and start with a single Value “V1”.  When writing a Value, we’ll follow it with the count in the R/W lock, and a list of Nodes in the replica list.  Example: “V1[2,A]” means Value V1 has 2 pending-Gets, and is replicated on Node A.

A quiescent Value, with no Gets-in-flight and no replicas:

[figure: a quiescent Value V1 with R/W count 0 and an empty replica list]

Actually, this is a 4-node cluster with K1 & V1 on Node A and not on the other nodes.  Hence a better picture is:

[figure: the 4-node cluster, with K1 & V1 present only on Node A]

Node B has K1 and does a Get.  This misses in B‘s local K/V store (see DKV.java), so B does a remote Get (TaskGetKey.java).  K1’s hash (available directly from K1 without talking to any other cluster member) tells us that K1’s home is Node A.  Hence B fires off a TaskGetKey from B to A requesting a replica of K1.  Here’s the flow:

[figure: B's TaskGetKey request to A, the V1 reply to B, and the asynchronous ACKACK back to A]

Note that once Node B receives V1, the thread in B waiting for it can immediately begin processing V1.  The ACKACK is sent back asynchronously.  The request for K1 fits in a UDP packet.  Assuming reply V1 does as well, B only has to wait for a round-trip UDP packet send & receive before getting his data.  Assuming Nodes C & D make the same request:

[figure: V1 now replicated on Nodes A, B, C and D]

At this point any Node in the cluster can retrieve V1 from K1 at no more cost than a hashtable lookup.
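
In code terms, the requester’s side of that flow is roughly the following – stand-in names again; the real path is DKV.get going through TaskGetKey and the reliable RPC layer:

  import java.util.concurrent.ConcurrentHashMap;

  class NodeSketch {
    final ConcurrentHashMap<String,ValueSketch> localStore = new ConcurrentHashMap<>();

    ValueSketch get( String key, int homeNode ) {
      ValueSketch v = localStore.get(key);  // cache hit: a ~150ns hashtable lookup, no network
      if( v != null ) return v;
      v = rpcGetKey(homeNode, key);         // cache miss: 1 UDP round-trip (TCP for big payloads)
      localStore.put(key, v);               // keep the replica; the ACKACK goes back asynchronously
      return v;
    }
    ValueSketch rpcGetKey( int node, String key ) { /* reliable RPC; omitted */ return null; }
  }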

What happens when C wants to write a new Value V2 to Key K1?  C writes V2 locally, then sends V2 to A.  A then resolves any conflicting writes (none here), and invalidates the copies B & D have.  Future reads by B & D will then reload the updated value from A.  Finally, C gets notification that his write has completed.  Here’s the flow:

[figure: C's write of V2 to Home Node A, invalidates from A to B & D, their ACKs back to A, then A's ACK to C]

Let’s work through this diagram a little; a code-level sketch of the Home Node’s steps follows this list.

  • Node C writes V2 locally.  This means parallel threads within C can see the update immediately.  Also the writing thread can choose to do either a volatile write or not.  For a volatile write, the writing thread will block until it receives an ACK from Node A that the invalidates are complete.  For non-volatile writes, this is a fire-and-forget operation, although a future unrelated volatile write might need to block until C receives the ACK from A.
  • Node C writes V2 with an “X” in the R/W-lock – a flag indicating that the write of V2 is in-progress.  Node C is not allowed to start ANOTHER write to K1 until this write completes; further writes (of K1 from C) will block.  Hence Node A does not have to handle UDP out-of-order writes from C.  Writes to other Keys are not blocked.
  • Unlike a remote Get, the payload goes out in the initial send from C to A; the payload size determines if this is via UDP or TCP.  The ACKs are all small, and always go via UDP.
  • Value V1 in Node A is tracking the replicas, very akin to what a hardware cache mechanism would do.  Upon receiving the write from C several things happen in short order (but distinctly spaced in time): (1) A single atomic replace operation in A‘s local K/V store dictates which of several possible racing writes “wins” (at this point in time racing unrelated readers in A can see either V1 or V2; both are valid responses since the readers are racing).  (2) V1 is locked against future reads (at this point in time, any racing readers finding V1 will fail and upon retrying find V2); locking is done by setting the R/W-lock to -1.  (3) V2’s replicas are set to the sender C (who must have had a copy to send it!) and the receiver A, and these Nodes are removed from V1’s replica list.
  • Now A can start invalidating replicas around the cluster.  Note that if there are none, this operation takes no time!  Otherwise A sends out invalidate commands (to B & D here) and waits for the response…. thus causing C to wait until the cluster-wide invalidates are done.
  • As soon as the last invalidate is ACK’d back to A, A will ACK back to C.
  • The ACKACKs here can happen in any order and are not required for proper coherency & ordering; ACKACKs are used by the reliable RPC mechanism to know that the ACK made it.
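
Pulling steps (1) through (3) together, here is a condensed sketch of the Home Node’s side of the Put, continuing the NodeSketch and ValueSketch classes from above – again illustrative stand-ins, not the actual TaskPutKey code:

  // Inside NodeSketch: Home-Node handling of a remote write of V2 over V1 for Key K1.
  void remotePut( String k1, ValueSketch v1, ValueSketch v2, int senderNode, int homeNode ) {
    // (1) A single atomic replace orders racing writes to the same Key
    if( !localStore.replace(k1, v1, v2) ) return;  // lost the race; the winning write proceeds instead
    v1.rwLock.set(-1);                             // (2) lock old V1 against future reads
    v2.replicas.add(senderNode);                   // (3) only the sender...
    v2.replicas.add(homeNode);                     //     ...and the home hold V2 right now
    for( int node : v1.replicas )                  // every other V1 replica must be invalidated
      if( node != senderNode && node != homeNode )
        sendInvalidate(node, k1);                  // ACK back to the writer only after these all ACK
  }
  void sendInvalidate( int node, String key ) { /* reliable RPC; omitted */ }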

Let’s talk a little about the performance here.  What operations are fast, or slow – and why?

  • All Things Cache Locally.  In the absence of conflicts, everything caches locally and no network traffic is required to make forward progress.  Cached Readers take about 150ns a pop.  Cached writes can all go fire-and-forget style (bound by network bandwidth and not latency), and a Reader following a Writer goes fast, and “sees” the write.
  • Readers Never Block.  If you look in the above flow, the only time a Reader blocks is on a cache-miss: the reader simply has to wait for the network to deliver the bytes.  Note that a Reader who misses locally and is blocked on the Home Node delivering a result, does NOT in turn wait for the Home Node to finish any other network operation.  In particular, the Home Node can deliver a valid result even while Invalidates are in-flight.
  • Unrelated Writes do not wait on each other.  Parallel unrelated writes go as fast as the network allows (bandwidth, not latency).
  • Writes can (rarely) Block on Readers.  Taking the write-lock requires all pending-Gets to complete.  Max of 1 pending-Get per Node, and they all complete in parallel – so this cannot take longer than the time to send the old Value around the cluster.
  • Back-to-back Writes to the same key Wait.  Each Node can only have 1 pending write to the same Key at a time.  Back-to-back updates will block until the first one fully completes.
  • Distributed writes to the same key are ordered, by the Home Node’s atomic replace operation.  However, they will typically be handled very fast after the first one – once the invalidates are done, a series of Puts will be ordered by the replace operation as fast as the memory system allows, and the ACKACKs can complete in any order (again, network bandwidth not latency).

These common use cases do not block, and go at either network speeds or memory speeds:

  • Parallel Reading Local Big Data.  Cached locally as-needed, reads at memory speeds
  • Parallel Reading Remote Big Data.  Uses full network bandwidth, and then cached thereafter
  • Parallel Writing Local Big Data.  Streaming writes use full memory bandwidth, no network
  • Parallel Writing Remote Big Data.  Streaming writes use full network bandwidth, not any round-trip latency

We have yet to talk about:

  • Resolving racing writes
  • Resolving racing readers & writers
  • Slow network reordering events (reliable RPC turns unreliable network into slow network w/reordering)

But… I think I’m out of space!

Comments welcome as always,

Cliff

H2O Architecture


This is a top-level overview of the H2O architecture.  H2O does in-memory analytics on clusters with distributed parallelized state-of-the-art Machine Learning algorithms.  However, the platform is very generic, and very very fast.  We’re building Machine Learning tools with it, because we think they’re cool and interesting, but the platform can do much more.

H2O is based on a number of layers, and is coded at different layers to best approach different tasks and objectives.  We talk about the MapReduce execution flavor a lot in public, because it’s easy to explain, because it covers a lot of ground, and because we’ve implemented a bunch of dense linear-algebra style algorithms with it – but that’s not the only thing we can do with H2O, nor is it the only coding “style”.  Here’s a rundown of the layers in H2O:

In-memory distributed K/V store layer: H2O sports an in-memory (not-persistent) K/V store, with *exact* (not lazy) consistency semantics and transactions.  The memory model is exactly the Java Memory Model, but distributed.  Both reads and writes are fully cacheable, and typical cache-hit latencies are around 150ns (that’s 150 nanoseconds) from a NonBlockingHashMap.  Let me repeat that: reads and writes go through a non-blocking hash table – we do NOT suffer from a classic hot-blocks problem.  Cache-misses obviously require a network hop, and the execution times are totally driven by the size of data moved divided by available bandwidth… and of course the results are cached.  The K/V store is currently used to hold control state, all results, and the Big Data itself.  You can certainly build a dandy graph-based algorithm directly over the K/V store, or integrate H2O’s model scoring into a low-latency production pipeline.

  • A note on cluster size: H2O clusters are peer-to-peer, and have no upper bound on the size.  We have built clusters with over 100 nodes and multiple TBs of DRAM.  Every node “knows” where every Key is, without actually having to hold onto the Key.  Hence asking for a particular Key’s Value is no more expensive than 1 network hop from the Key requester to some Key holder, and back.

  • A note on compression: Big Data is heavily (and losslessly) compressed – typically 2x to 4x better than GZIP on disk (YMMV), and can be accessed like a Java Array (a Giant greater-than-4-billion-element distributed Java array).  H2O guarantees that if the data is accessed linearly then the access time will match what you can get out of C or Fortran – i.e., be memory bandwidth bound, not CPU bound.  You can access the array (for both reads and writes) in any order, of course, but you get strong speed guarantees for accessing in-order.  You can do pretty much anything to an H2O array that you can do with a Java array, although due to size/scale you’ll probably want to access the array in a blatantly parallel style.

  • A note on decompression: The data is decompressed Just-In-Time, strictly in CPU registers, in the hot inner loops – and THIS IS FASTER than decompressing beforehand because most algorithms are memory bandwidth bound.  Moving a 32-byte cache line of compressed data into CPU registers gets more data per cache-miss than moving 4 8-byte doubles.  Decompression typically takes 2-4 instructions of shift/scale/add per element, and is well covered by the cache-miss costs (a tiny sketch of this appears after these notes).  As an example, adding a billion compressed doubles (e.g., to compute the average) takes 12ms on a 32-core (dual-socket) E5-2670 Intel – an achieved bandwidth of over 600GB/sec – hugely faster than all other Big Data solutions.  Accounting for compression, H2O achieved 83GB/sec out of the theoretical maximum of 100GB/sec on this hardware.

  • A note on Big Data and GC: H2O keeps all our data in heap, but in large arrays of Java primitives.  Our experience shows that we run without GC issues, even on very large heaps with the default collector.  We routinely test with e.g. heaps from 2G to 200G – and never see FullGC costs exceed a few seconds every now and then (depends on the rate of Big Data writing going on).  The normal Java object allocation used to drive the system internally has a negligible GC load.  We keep our data in-heap because it’s as fast as possible (memory bandwidth limited), and easy to code (pure Java), and has no interesting GC costs.  Our GC tuning policy is: “only use the -Xmx flag, set to the largest you can allow given the machine resources”.  Take all the other GC defaults, they will work fine.

  • A note on Bigger Data and GC: We do a user-mode swap-to-disk when the Java heap gets too full, i.e., you’re using more Big Data than physical DRAM.  We won’t die with a GC death-spiral, but we will degrade to out-of-core speeds.  We’ll go as fast as the disk will allow.  I’ve personally tested loading a 12Gb dataset into a 2Gb (32bit) JVM; it took about 5 minutes to load the data, and another 5 minutes to run a Logistic Regression.

  • A note on data ingest: We read data fully parallelized from S3, HDFS, NFS, URIs, browser uploads, etc.  We can typically drive HDFS disk spindles to an interesting fraction of what you can get from e.g. HDFS file-copy.  We parse & compress (in parallel) a very generous notion of a CSV file (for instance, Hive files are directly ingestable), and SVM Light files.  We are planning on an RDD ingester – interactivity with other frameworks is in everybody’s interest.

  • A note on sparse data: H2O sports about 15 different compression schemes under the hood, including ones designed to compress sparse data.  We happily import SVMLight without ever having the data “blow up”, while still fully supporting the array-access API, including the speed guarantees.

  • A note on missing data: Most data sets have missing elements, and most math algorithms deal with missing data specially.  H2O fully supports a notion of “NA” for all data, including setting, testing, selecting in (or out), etc., and this notion is woven through the data presentation layer.

  • A note on streaming data: H2O vectors can have data inserted & removed (anywhere, in any order) continuously.  In particular, it’s easy to add new data at the end and remove it from the start – i.e., build a large rolling dataset holding all the elements that fit given a memory budget and a data flow-rate.  This has been on our roadmap for a while, and needs only a little more work to be fully functional.
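
Referring back to the decompression note above: here is a minimal sketch of what decoding one element of a scaled-integer compression scheme looks like.  The constants and names are illustrative, not H2O’s actual chunk formats:

   // Illustrative only: decode one compressed element in registers, inside the hot loop.
   // A 1-byte stored value plus a per-chunk bias and scale reconstruct the original double.
   static double at( byte[] mem, int row, long bias, double scale ) {
     int raw = mem[row] & 0xFF;     // load the compressed element (1 byte here)
     return (raw + bias) * scale;   // shift/scale/add back to a double
   }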

Light-weight Map/Reduce layer: Map/Reduce is a nice way to write blatantly parallel code (although not the only way), and we support a particularly fast and efficient flavor.  A Map maps Type A to Type B, and a Reduce combines two Type B’s into one Type B.  Both Types A & B can be a combination of small-data (described as a Plain Old Java Object, a POJO) and big-data (described as another Giant H2O distributed array).  Here’s an example map from a Type A (a pair of columns), to a Type B (a POJO of Class MyMR holding various sums):
   class MyMR extends MRTask<MyMR> {       // Type B: a POJO holding various sums
     double sum0, sum1, sq_sum0;           // Most things are allowed here
     @Override public void map( double d0, double d1 ) {
       sum0 += d0;  sum1 += d1;  sq_sum0 += d0*d0;  // Again most any Java code here
     }
     @Override public void reduce( MyMR my ) {   // Combine two MyMRs together
       sum0 += my.sum0; sum1 += my.sum1; sq_sum0 += my.sq_sum0;
     }
   }
   new MyMR().doAll( v0, v1 );  // Invoke in-parallel distributed, over the two Vec columns v0 and v1

This code will be distributed ’round the cluster, and run at memory-bandwidth speeds (on compressed data!) with no further ado.  There’s a lot of mileage possible here that I’m only touching on lightly.  Filtering, subsetting, writing results into temp arrays that are used on later passes, uniques on billions of rows, ddply-style group-by operations – all work in this Map/Reduce framework, and all by writing plain old Java.

  • A note on Group-By: We have a GroupBy operator running at scale (called ddply in the R community), built using this Map/Reduce framework.  The GroupBy can handle millions of groups on billions of rows, and runs Map/Reduce tasks on the group members (so a few large groups run as fast as many small groups).

  • Scala, and a note on API cleanliness: We fully acknowledge Java’s weaknesses here – this is the Java 6 flavor coding style; Java 7 style is nicer – but still not as nice as some other languages.  We fully embrace & support alternative syntaxes over our engine.  In particular, we have an engineer working on an in-process Scala interface (amongst others).  We are shifting our focus now, from the excellent backend to the API interface side of things.  This is a work-in-progress for us, and we are looking forward to much improvement over the next year.

  • A note on R: H2O supports an R-like language – not full R semantics – but the obviously data-parallel data-munging aspects of R, and of course all the operators run fully parallel and distributed.  There is a REPL.  You can use it to add or drop columns or rows, manufacture features, impute missing values, or drop in many R-expressions and have them run at-scale.

Pre-Baked Algorithms Layer: We have the following algorithms pre-baked, fully optimized and full-featured: Generalized Linear Modeling, including Logistic Binomial Regression plus Gaussian, Gamma, Poisson, and Tweedie distributions; Deep Learning; Random Forest (that scales *out* to all the data in the cluster); Gradient Boosted Machine that provides both multinomial classification and regression boosted models (again, in-parallel and fully distributed); PCA; KMeans (and variants); and a full suite of data characterization – for example Quantiles (any quantile, computed exactly in milliseconds).  All these algorithms support Confusion Matrices (with adjustable thresholds), AUC & ROC metrics, and incremental test data-set results on partially trained models during the build process.  Within each algorithm, we support a full range of tuning parameters and options that you’d find in the similar R or SAS package.

  • A note on some Mahout algorithms: We’re clearly well suited to e.g. SSVD and Co-occurrence and have talked with Ted Dunning at length on how they would be implemented in H2O.

REST / JSON / R / Python / Excel / REPL: The system is externally drivable via URLs/REST API calls, with JSON responses.  We use REST/JSON from Python to drive our entire testing harness.  We have a very nice R package with H2O integrated behind R – you can issue R commands to an H2O-backed R “data.frame” – and have all the Big Math work on the Big Data in a cluster – including 90% of the typical “data munging” workflow.  This same REST/JSON interface also works with e.g. Excel (yes, we have a demo) or shell scripts.  We have a pretty web-GUI over the REST/JSON layer, that is suitable for lightweight modeling tasks.

Cliff