Sparkling Water: H2O + Scala + Spark

Spark is an up-and-coming big data technology; it's a whole lot faster and easier than existing Hadoop-based solutions. H2O does state-of-the-art Machine Learning algorithms over Big Data – and does them Fast. We are happy to announce that H2O now has a basic integration with Spark – Sparkling Water!

This is a “deep” integration – H2O can convert Spark RDDs to H2O DataFrames and vice-versa directly. The conversion is fully in-memory and in-process, as well as distributed and parallel. This also means H2O has a (nearly) full integration with Scala: H2O DataFrames are full Scala Collection objects, and the basic foreach collection calls naturally run distributed and parallel too.

A few months back we announced the start of this initiative: Sparkling Water = H2O + Spark

In that post we went for the quickest integration solution possible: separate processes for Spark and H2O, with a Tachyon bridge. While this worked, it required fully 3 copies of all data: a Spark copy, a Tachyon bridge copy, and an H2O copy – and the data passed through process boundaries repeatedly. With this work we need only the Spark and H2O copies, the data is copied only once, and it does not cross a process boundary. It's both faster and uses less memory.

Scala integration is being showcased before Spark integration simply because we need to understand H2O’s Scala integration before we can understand H2O’s Spark integration.


H2O’s Scala API

As always, the code is open source and available on github. This code is all in 0xdata's "h2o-dev" repo – a clean-slate, dev-friendly version of H2O. As of this writing, h2o-dev has all the core h2o logic and about 50% of the machine learning algos (the rest should appear in a month).

https://github.com/0xdata/h2o-dev/

The basic Scala wrappers are here:

https://github.com/0xdata/h2o-dev/tree/master/h2o-scala/src/main/scala/water/fvec/DataFrame.scala

https://github.com/0xdata/h2o-dev/tree/master/h2o-scala/src/main/scala/water/fvec/MapReduce.scala

And some simple examples here:

https://github.com/0xdata/h2o-dev/blob/master/h2o-scala/src/test/scala/water/BasicTest.scala

The idea is really simple: a Scala DataFrame extends an H2O water.fvec.Frame object.

class DataFrame private ( key : Key, names : Array[String], vecs : Array[Vec] ) extends Frame(key,names,vecs) with Map[Long,Array[Option[Any]]] {

The Scala type is a Map[Long, Array[Option[Any]]] – a map from Long row numbers to rows of data, i.e. single data observations. An observation is presented as an Array of Option[Any], where the array elements are the features in the observation. H2O fully supports the notion of missing elements or data, and this is presented as a Scala Option type. Once you know a value is not “missing” – what exactly is its type? H2O Vecs (columns) are typed to hold one of these types:

  • a Number (Java double conceptually, but any of boolean, byte, char, int, float, long, double)
  • a String
  • a Factor/Enum (similar to interned Strings that are mapped to small dense integer values)
  • a Timestamp (stored internally as milliseconds since the Unix Epoch)
  • a UUID
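
A quick illustrative sketch (not from the original post) of how one might inspect a single row, an Array[Option[Any]], pattern-matching on the Option to handle missing cells:

// A minimal sketch: each cell of a row is an Option[Any].
// None marks a missing value; Some(v) wraps the stored value.
def describe(cell: Option[Any]): String = cell match {
  case None            => "missing"
  case Some(d: Double) => s"number $d"
  case Some(s: String) => s"string $s"
  case Some(other)     => s"other: $other"
}

// Hand-built example row with 3 features, the middle one missing:
val row: Array[Option[Any]] = Array(Some(42.0), None, Some(95000.0))
row.map(describe).foreach(println)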

A DataFrame can be made directly from a Spark RDD, an H2O Frame, or a CSV file:

val df1 = new DataFrame(new File("some.csv"))

Column subsets (a smaller DataFrame) can be selected via column names:

val df2 : DataFrame = df1('a_column_header,'age,'id)

You can do a basic foreach:

df2.foreach{ case (row, Array(x, age, id)) => ...use x and age and id... }

And this will run distributed and parallel across your cluster!
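
For instance, a side-effect-only traversal might look like the following sketch (the three-column layout of df2 from the example above, with age as the second column, is assumed; since the closure can run on remote nodes it only does node-local printing):

// A hedged sketch: visit every row of df2 and report rows with a missing age.
// The closure may execute on remote nodes, so it avoids mutating driver-side state.
df2.foreach { case (rowNum, Array(_, age, _)) =>
  if (age.isEmpty) println(s"row $rowNum has a missing age")
}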

The code for foreach is very simple:

override def foreach[U](f: ((Long, T)) => U): Unit = {
  new MRTask {
    // map() is called once per group of Chunks: one Chunk per column, all covering the same rows
    override def map( chks : Array[Chunk] ) = {
      val start = chks(0)._start   // global row number of the first row in these Chunks
      val row = new T(chks.length) // reused row buffer, one slot per column
      val len = chks(0).len        // number of rows in this Chunk group
      (0 until len).foreach{ i =>
        // Build the row: None for missing cells, Some(value) otherwise
        (0 until chks.length).foreach{ col => row(col) = if( chks(col).isNA0(i) ) None else Some(chks(col).at0(i)) }
        f(start+i,row)             // hand (row number, row) to the user's function
      }
    }
  }.doAll(this)                    // run distributed over all the Chunks of this Frame
}

More complicated examples, showing e.g. a Scala map/reduce paradigm, will follow later.
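
As a taste of that direction, here is a purely local, hedged sketch of the map/reduce shape: compute the mean of a numeric column while skipping missing cells. A real H2O version would push this work into an MRTask so it runs distributed and parallel; nothing below is the actual MapReduce.scala API.

// Local illustration only: fold over (row number, row) pairs,
// "mapping" each cell to a partial (sum, count) and "reducing" by addition.
def meanOfColumn(rows: Iterator[(Long, Array[Option[Any]])], col: Int): Double = {
  val (sum, count) = rows.foldLeft((0.0, 0L)) { case ((s, n), (_, cells)) =>
    cells(col) match {
      case Some(d: Double) => (s + d, n + 1)  // extract the numeric value
      case _               => (s, n)          // skip missing (or non-numeric) cells
    }
  }
  if (count == 0) Double.NaN else sum / count
}

Run locally this could be invoked as meanOfColumn(df2.iterator, 1); the distributed equivalent would live inside an MRTask.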


WaterWorks – Flowing Data between Spark and H2O

Let's turn to Spark now, and show H2O and Spark working together. Spark RDDs and H2O Frames are similar, but not equal, ways to manage large distributed data. Basically, RDDs are a collection of blocks (Partitions) of rows, and Frames are a collection of columns, aligned in rows and broken up into Chunks. Sort of a blocks-vs-stripes thing.

H2O represents data in columns (Vecs in H2O lingo) vertically striped across the cluster; a Frame is a collection of columns/Vecs. Vecs, in turn, are broken up into Chunks – and the Chunks are carefully aligned in the JVM heaps such that whole rows of data (all the columns in a Frame) align.

Spark represents data in RDDs (Resilient Distributed Datasets) – a Collection of elements. Each RDD is broken up in turn into Partitions blocked across the cluster; each Partition holds a subset of the Collection as complete rows (or observations in data-science parlance).

H2O Frames, Vecs and Chunks can represent any primitive Numeric type, Strings, timestamps, and UUIDs – and support the notion of a missing value. RDDs can represent a collection of any Scala or Java object. If we limit the objects to fields that H2O can represent, then we can build a simple mapping between the two formats.

For our example we’ll look at a collection of Users:

class User(
  id: Long,
  age: Option[Short],
  sex: Option[Boolean],
  salary: Int
)

In Scala, this is typed as RDD[User], and we'll have 1 User object per real User, blocked in Partitions around the cluster. In H2O, DataFrames naturally hold all numeric types and missing values in a column format; we'll have 4 columns, each holding a value for each user, again distributed in Chunks around the cluster. Here's a single Partition holding 4 users, and the corresponding 4 Chunks (one per column):

[Figure part_chunk: one Partition of 4 Users vs. the 4 corresponding Chunks, one per column]

Here's 4 JVM heaps holding all 32 users. There are 8 Partitions (two per JVM) and 32 Chunks in 4 Vecs striped around the cluster:

[Figure rdd_frame: 8 Partitions (two per JVM) and 32 Chunks in 4 Vecs striped across the 4 JVM heaps]
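
To make the blocks-vs-stripes difference concrete, here is a small self-contained sketch (an illustration of the layout only, not the actual conversion code) that re-stripes a row-oriented collection of users into the per-column arrays a Frame would hold:

// Row-oriented view, as an RDD of users holds it (a case class is used
// here purely for the illustration):
case class UserRow(id: Long, age: Option[Short], sex: Option[Boolean], salary: Int)

val rows = Seq(
  UserRow(1, Some(23.toShort), Some(true),  50000),
  UserRow(2, None,             Some(false), 65000),
  UserRow(3, Some(41.toShort), None,        72000),
  UserRow(4, Some(35.toShort), Some(true),  81000))

// Column-oriented view: one stripe per field, as the 4 Vecs/Chunks hold it.
val idCol     : Seq[Option[Long]]    = rows.map(u => Some(u.id))
val ageCol    : Seq[Option[Short]]   = rows.map(_.age)
val sexCol    : Seq[Option[Boolean]] = rows.map(_.sex)
val salaryCol : Seq[Option[Int]]     = rows.map(u => Some(u.salary))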


An Example: Back and Forth between H2O and Spark

Here's a simple Scala example; we'll assume we have the 4-node Sparkling Water cluster above and a CSV (or Hive) file on disk. We'll use a simple User data file featuring just a unique id, the user's age, sex, and salary. From just the age and sex we'll try to predict the typical salary using Deep Learning.

First we load the CSV file into H2O. H2O features a fast, robust CSV parser which is able to ingest CSV data at full disk bandwidth across a cluster. We can handle a wide variety of CSV formats (e.g. Hive is a CSV with ^A/0x01 field separators) and typically automatically infer all column types. Unlike Spark, operations in H2O are eager, not lazy, so the file is loaded and parsed immediately:

// Load an H2O DataFrame from CSV file
val frameFromCSV = new DataFrame(new File("h2o-examples/smalldata/small_users.csv"))

Then we'll convert to a Spark RDD – like all RDD operations, this conversion is lazy; the conversion is defined but not executed yet:

val sc = createSparkContext()
val table : RDD[User] = H2OContext.toRDD(sc,frameFromCSV)

And then run an SQL query! SQL is a great way to subselect and munge data, and is one of Spark’s many strengths.

table.registerTempTable("user_table") // The RDD is now an SQL table
val query = "SELECT * FROM user_table WHERE AGE>=18" // Ignore underage users
val result = sql(query) // Run SQL query

Then we'll convert back to an H2O DataFrame – and run H2O's Deep Learning (Neural Nets) on the result. Since H2O operations are eager, this conversion triggers the actual SQL query to run just before converting.

// Convert back to H2O DataFrame
val frameFromQuery = H2OContext.toDataFrame(sc,result)

Now it's time for some Deep Learning. We don't want to train a Deep Learning model using the id field – since it is unique per user, it is possible that the model learns to spot interesting user features from the id alone – which obviously does not work on the next new user that comes along! This problem is called overfitting, and H2O's algorithms all have robust features to avoid it – but in this case we'll just cut out the id column by selecting a subset of features to train on by name:

val trainFrame = frameFromQuery('age,'sex,'salary)

Deep Learning has about 60 parameters, but here we'll limit ourselves to setting just a few and take all the defaults. Also note that sometimes the age or sex information is missing; this is a common problem in Data Science, and you can see it in the example data. DataFrames naturally support this notion of missing data. However, each of the Machine Learning algorithms in H2O handles missing data differently; the exact behavior depends on the deep mathematics in the algorithms. See this video for an excellent discussion of H2O's Deep Learning.

// Deep Learning!
// – configure parameters
val dlParams = new DeepLearningParameters()
dlParams.source = trainFrame
dlParams.response = trainFrame('salary)
// Run Deep Learning. The train() call starts the learning process
// and returns a Future; the training runs in the background. Calling
// get() blocks until the training completes.
val dlModel = new DeepLearning(dlParams).train.get

At this point we have a Deep Learning model that we can use to make predictions about new users – those who have not shared their salary information with us. Those predictions are another H2O DataFrame and can be moved back and forth between H2O and Spark like any other DataFrame. Spark and Scala logic can be used to drive actions from the predictions, or they can be used further along in a deeper RDD and H2O pipeline.
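
A hedged sketch of what that scoring step could look like (the score() call and its signature are an assumption here, not taken from the post):

// Score the query result with the trained model; the result is assumed to be
// another H2O Frame with one prediction per row.
val predictions = dlModel.score(frameFromQuery('age, 'sex))
// Like any Frame, the predictions can be wrapped as a DataFrame and
// converted back into a Spark RDD for further downstream logic.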


Summary

In just a few lines of Scala code we can now blend H2O's Machine Learning and Spark's data-munging and SQL to form a complete ML solution (recapped in the sketch after this list):

  • Ingest data, via CSV, existing Spark RDD or existing H2O Frame
  • Manipulate the data, via Spark RDDs or as a Scala Collection
  • Model in H2O with cutting-edge algorithms
  • Use the Model to make Predictions
  • Use the Predictions in Spark RDDs and Scala to solve our problems
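
Pulling those steps together, the whole pipeline from the example above fits in one short recap sketch:

// End-to-end recap assembled from the snippets shown earlier.
val sc = createSparkContext()

// Ingest: eager CSV parse into an H2O DataFrame
val users = new DataFrame(new File("h2o-examples/smalldata/small_users.csv"))

// Manipulate: lazy conversion to a Spark RDD, then SQL to drop underage users
val table : RDD[User] = H2OContext.toRDD(sc, users)
table.registerTempTable("user_table")
val adults = sql("SELECT * FROM user_table WHERE AGE>=18")

// Model: back to H2O, select features by name, run Deep Learning
val train = H2OContext.toDataFrame(sc, adults)('age, 'sex, 'salary)
val dlParams = new DeepLearningParameters()
dlParams.source   = train
dlParams.response = train('salary)
val dlModel = new DeepLearning(dlParams).train.get

// Predict: score new data and use the resulting Frame back in Spark/Scala.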

In the months ahead we will be rounding out this integration story – turning brand-new functionality into a production-ready solution.

I hope you like H2O's new Spark and Scala integration – and as always, we are very excited to hear your feedback – and especially code to help further this good cause! If you would like to contribute, please download the code from Git and submit your pull requests – or post suggestions and questions on h2ostream.

Cliff