Spark integration
So far you have seen a short introduction to programming with the Glint parameter server. At its core you construct a client, which in turn constructs either matrices or vectors. You then use the `pull` and `push` methods on these matrices or vectors to query and update their current state. How does this relate to Spark?
The main idea is that the `BigMatrix` and `BigVector` objects you obtain from the client are serializable and safe to use within Spark closures. Note, however, that operations such as `push` and `pull` on `BigMatrix` and `BigVector` objects need access to an implicit execution context and an implicit timeout. These objects are not serializable, which can cause problems when a closure naively captures them.
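For example, a closure that captures an execution context defined on the driver will typically fail at task serialization (a minimal sketch of the pitfall; `rdd` and `vector` are assumed to be defined as in the example below):

```scala
import scala.concurrent.ExecutionContext

// Problematic: the implicit execution context is defined on the driver
// and captured by the closure, but it is not serializable
implicit val ec = ExecutionContext.Implicits.global

rdd.foreach {
  case (index, value) =>
    vector.push(Array(index), Array(value)) // "Task not serializable"
}
```

The fix, shown in the example below, is to define the execution context inside the closure.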
Example: add values of an RDD to a distributed vector
In this example we will have an RDD that contains tuples of `(Int, Double)`. The `Int` represents the key (or index) of a vector and the `Double` represents the corresponding value. We want to add these values to a distributed vector that is stored on the parameter servers. First, let's open the spark-shell and add the Glint jar dependency:
```
$ spark-shell --jars ./target/scala-2.10/Glint-assembly-0.1-SNAPSHOT.jar
```
Meanwhile, make sure the parameter servers are up and running in separate terminal windows:
$ sbt "run master"
$ sbt "run server"
$ sbt "run server"
Let's construct a client (make sure to mark it as `@transient` so the spark-shell doesn't try to serialize it):
```scala
import glint.Client

@transient val client = Client()
```
Now, let's create our data of (key, value) pairs as an RDD:
```scala
val rdd = sc.parallelize(Array((0, 0.0), (1, 2.0), (2, -9.0), (3, 5.5), (4, 3.14), (5, 55.5), (6, 0.01), (7, 10.0), (8, 100.0), (9, 1000.0)))
```
And a distributed vector:
```scala
val vector = client.vector[Double](10)
```
The main code will use the constructed `vector` object within a Spark closure such as `rdd.foreach { ... }`. There is some additional boilerplate to deal with the execution context: we define it inside the closure, so that nothing non-serializable is captured from the driver. This is a small price to pay for the flexibility and control you gain over the concurrency of your code.
```scala
import scala.concurrent.ExecutionContext

rdd.foreach {
  case (index, value) =>
    // Create the execution context inside the closure (it is not serializable)
    implicit val ec = ExecutionContext.Implicits.global
    vector.push(Array(index), Array(value))
}
```
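Note that `push` returns a future and `foreach` does not wait for it to complete, so a task can finish while its updates are still in flight. If you want each task to block until its update has been acknowledged, you could await the future (a sketch; the 30-second timeout is an arbitrary choice, not something Glint prescribes):

```scala
import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration._

rdd.foreach {
  case (index, value) =>
    implicit val ec = ExecutionContext.Implicits.global
    // Block until the parameter server has acknowledged this push
    Await.result(vector.push(Array(index), Array(value)), 30.seconds)
}
```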
Finally, let's verify the result by pulling the values from the parameter server:
```scala
@transient implicit val ec = ExecutionContext.Implicits.global

vector.pull((0L until 10).toArray).onSuccess {
  case values => println(values.mkString(", "))
}
```
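If you prefer to block for the result instead of registering a callback, you could await the future here as well (a sketch, again with an arbitrary 30-second timeout):

```scala
import scala.concurrent.Await
import scala.concurrent.duration._

val values = Await.result(vector.pull((0L until 10).toArray), 30.seconds)
println(values.mkString(", "))
```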
Either way, we should observe the expected output:
```
0.0, 2.0, -9.0, 5.5, 3.14, 55.5, 0.01, 10.0, 100.0, 1000.0
```
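Once you are done, you can clean up. The sketch below assumes your Glint version exposes a `destroy` method on the vector and a `stop` method on the client; check the API of the version you are running:

```scala
// Remove the vector's data from the parameter servers
vector.destroy()

// Shut down the client
client.stop()
```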