This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

First pass at a spark platform for summingbird #502

Merged May 21, 2014 (37 commits)

Commits
1bd6bd7
Playing around with spark
isnotinvain Mar 5, 2014
a696d8e
use futures
isnotinvain Mar 10, 2014
8b98c88
style cleanup
isnotinvain Mar 10, 2014
b473390
add question markers
isnotinvain Mar 10, 2014
73807da
stash again
isnotinvain Mar 20, 2014
1d3096b
Merge branch 'develop' into alexlevenson/spark
isnotinvain Apr 1, 2014
58ddd06
Fixup summer
isnotinvain Apr 1, 2014
c5345f5
use spark 0.9.0
isnotinvain Apr 1, 2014
59da45f
use spark 0.9.0-incubating
isnotinvain Apr 1, 2014
37ddac9
make example, use scala 2.10 only -- need to fix this
isnotinvain Apr 2, 2014
eb026a7
Add assembly
isnotinvain Apr 2, 2014
0ead585
It works!
isnotinvain Apr 5, 2014
fa29b74
Some cleanup
isnotinvain Apr 6, 2014
88522f5
ClassManifest -> ClassTag
isnotinvain Apr 6, 2014
2769dba
some cleanup
isnotinvain Apr 24, 2014
2e028dd
add test
isnotinvain Apr 30, 2014
71ca542
stash
isnotinvain May 2, 2014
56d5050
First test working
isnotinvain May 5, 2014
15a69ac
two tests working
isnotinvain May 6, 2014
824047c
fix serialization, add leftJoin test
isnotinvain May 6, 2014
831c3fa
add test for multiple summers
isnotinvain May 6, 2014
887e155
More serialization fixes, scala version fix, and lookup job test
isnotinvain May 7, 2014
a4ddcad
it works!
isnotinvain May 7, 2014
8250cd2
Merge branch 'develop' into alexlevenson/spark
isnotinvain May 7, 2014
896022e
rm unused test file
isnotinvain May 7, 2014
7f09e17
monoid -> semigroup
isnotinvain May 7, 2014
15d213d
Add tests for commutativity
isnotinvain May 8, 2014
c8df922
rm unused files
isnotinvain May 8, 2014
b5571a0
Readme and some comments
isnotinvain May 8, 2014
3217e5a
Address some comments
isnotinvain May 13, 2014
3de4cd3
some Build cleanup
isnotinvain May 15, 2014
ea2ab8b
fix sbt
isnotinvain May 16, 2014
2e97b73
try to use chill's trick in sbt
isnotinvain May 19, 2014
9e97e78
bump to scala 2.10.4 for spark tests
isnotinvain May 19, 2014
04fbaf4
update README
isnotinvain May 20, 2014
2a74e9a
Update the readme some more
isnotinvain May 20, 2014
20726e8
Rename toPlan2
isnotinvain May 20, 2014
2 changes: 1 addition & 1 deletion .travis.yml
@@ -1,6 +1,6 @@
language: scala
scala:
- 2.9.3
- 2.10.3
- 2.10.4
script:
- "sbt -Dlog4j.configuration=file://$TRAVIS_BUILD_DIR/project/travis-log4j.properties ++$TRAVIS_SCALA_VERSION test"
54 changes: 53 additions & 1 deletion project/Build.scala
@@ -2,6 +2,8 @@ package summingbird

import sbt._
import Keys._
import sbtassembly.Plugin._
import AssemblyKeys._
import com.typesafe.tools.mima.plugin.MimaPlugin.mimaDefaultSettings
import com.typesafe.tools.mima.plugin.MimaKeys.previousArtifact

@@ -17,14 +19,19 @@ object SummingbirdBuild extends Build {
case version if version startsWith "2.9" => "org.specs2" %% "specs2" % "1.12.4.1" % "test"
case version if version startsWith "2.10" => "org.specs2" %% "specs2" % "1.13" % "test"
}

def isScala210x(scalaVersion: String) = scalaVersion match {
case version if version startsWith "2.9" => false
case version if version startsWith "2.10" => true
}

val extraSettings = Project.defaultSettings ++ mimaDefaultSettings

val sharedSettings = extraSettings ++ Seq(
organization := "com.twitter",
version := "0.4.2",
scalaVersion := "2.9.3",
crossScalaVersions := Seq("2.9.3", "2.10.0"),
crossScalaVersions := Seq("2.9.3", "2.10.4"),
// To support hadoop 1.x
javacOptions ++= Seq("-source", "1.6", "-target", "1.6"),

@@ -122,6 +129,7 @@ object SummingbirdBuild extends Build {
summingbirdStormTest,
summingbirdScalding,
summingbirdScaldingTest,
summingbirdSpark,
summingbirdBuilder,
summingbirdChill,
summingbirdExample
@@ -306,4 +314,48 @@ object SummingbirdBuild extends Build {
"com.twitter" %% "storehaus-memcache" % storehausVersion
)
).dependsOn(summingbirdCore, summingbirdStorm)

lazy val sparkAssemblyMergeSettings = assemblySettings :+ {
mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
{
//case PathList("org", "w3c", xs @ _*) => MergeStrategy.first
//case "about.html" => MergeStrategy.discard
case PathList("com", "esotericsoftware", "minlog", xs @ _*) => MergeStrategy.first
case PathList("org", "apache", "commons", "beanutils", xs @ _*) => MergeStrategy.first
case PathList("org", "apache", "commons", "collections", xs @ _*) => MergeStrategy.first
case PathList("org", "apache", "jasper", xs @ _*) => MergeStrategy.first
case "log4j.properties" => MergeStrategy.concat
case x if x.endsWith(".xsd") || x.endsWith(".dtd") => MergeStrategy.first
case x => old(x)
}
}
}

val sparkDeps = Seq(
"com.twitter" %% "algebird-core" % algebirdVersion,
"com.twitter" %% "algebird-util" % algebirdVersion,
"com.twitter" %% "algebird-bijection" % algebirdVersion,
"com.twitter" %% "bijection-json" % bijectionVersion,
"com.twitter" %% "chill" % chillVersion,
"com.twitter" % "chill-hadoop" % chillVersion,
"com.twitter" %% "chill-bijection" % chillVersion,
"commons-lang" % "commons-lang" % "2.6",
"commons-httpclient" % "commons-httpclient" % "3.1",
"org.apache.spark" %% "spark-core" % "0.9.0-incubating" % "provided"
)

def buildSparkDeps(scalaVersion: String) = if (isScala210x(scalaVersion)) sparkDeps else Seq()

lazy val summingbirdSpark = module("spark").settings(
resolvers += "Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/",
skip in compile := !isScala210x(scalaVersion.value),
skip in test := !isScala210x(scalaVersion.value),
publishArtifact := isScala210x(scalaVersion.value),
libraryDependencies ++= buildSparkDeps(scalaVersion.value)
)
.settings(sparkAssemblyMergeSettings:_*)
.dependsOn(
summingbirdCore % "test->test;compile->compile",
summingbirdChill
)
}
1 change: 1 addition & 0 deletions project/assembly.sbt
@@ -0,0 +1 @@
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
20 changes: 20 additions & 0 deletions summingbird-spark/README.md
@@ -0,0 +1,20 @@
First pass at a Spark platform for Summingbird.
This is a work in progress, and is not really expected to work yet!

Outstanding issues:
- Time-based batching / a BatchedStore equivalent is not yet supported. Time is plumbed through everywhere and used for non-commutative semigroups, but batching doesn't really come into play yet.
- Logic for finding the maximal timespan covered by the various sources / stores is not yet implemented.
- Logic for lookups / leftJoins respecting time (e.g. returning what a value was for a given key at a given time) is not yet implemented.
- Decide whether PlatformPlanner is useful or not.
- Test running real jobs on a real cluster (only tested once with wordcount).
- SparkPlatform is currently stateful but should be refactored not to be.
- SparkPlatform submits blocking Spark 'actions' serially; it should make (careful) use of a FuturePool instead, though we may run into Spark thread-safety issues there. I actually had this implemented but didn't want to spend the time finding out whether Spark is thread-safe in this way until I had something that worked in local mode first.
- Writing the tests (SparkLaws) led me to think that there should really be a platform-agnostic set of laws, or at least a PlatformLaws base class. There's a lot of overlap with the Scalding platform in these tests. Having a PlatformLaws test suite would also better describe the contract and expected behavior of all platforms.
- There are currently no optimizations in the planning stage (except that commutativity avoids imposing a sort and uses reduceByKey).
- The core Spark types that need access to a SparkContext should be refactored in terms of a Reader monad (see the sketch after this list).
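A minimal sketch of the Reader-monad idea from the last item, assuming a hypothetical SparkReader type that is not part of this PR: computations that need a SparkContext become functions of it, composed with map and flatMap, and a real context is supplied exactly once, when the plan is finally run.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Hypothetical: wraps a computation that needs a SparkContext without holding one.
case class SparkReader[+A](run: SparkContext => A) {
  def map[B](f: A => B): SparkReader[B] = SparkReader(sc => f(run(sc)))
  def flatMap[B](f: A => SparkReader[B]): SparkReader[B] = SparkReader(sc => f(run(sc)).run(sc))
}

object SparkReaderExample {
  // a "source" built this way never touches a SparkContext directly...
  def lines(path: String): SparkReader[RDD[String]] = SparkReader(sc => sc.textFile(path))

  // ...and the platform would call .run(sc) once, after the whole plan is assembled
  def wordCount(path: String): SparkReader[RDD[(String, Long)]] =
    lines(path).map(_.flatMap(_.split("\\s+")).map(w => (w, 1L)).reduceByKey(_ + _))
}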
@@ -0,0 +1,114 @@
package com.twitter.summingbird.spark

import com.twitter.algebird.{Interval, Semigroup}
import com.twitter.summingbird.batch.Timestamp
import com.twitter.summingbird.option.{NonCommutative, Commutative, Commutativity}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
import com.twitter.chill.Externalizer

// TODO: add the logic for seeing what timespans each source / store / service can cover

// A source promises to never return values outside of timeSpan
trait SparkSource[T] extends Serializable {
def rdd(sc: SparkContext, timeSpan: Interval[Timestamp]): RDD[(Timestamp, T)]
Review comment (Contributor): Implicit SparkContext, a reader monad, or the store can provide it; seems like it doesn't need to be an argument?

}
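A minimal sketch of the variant the reviewer is suggesting, reusing this file's imports; the trait name ImplicitSparkSource is hypothetical and not part of this PR.

trait ImplicitSparkSource[T] extends Serializable {
  // same contract as SparkSource, but the SparkContext is brought into implicit scope once
  // by the caller (or provided by a Reader-style wrapper) instead of being passed on every call
  def rdd(timeSpan: Interval[Timestamp])(implicit sc: SparkContext): RDD[(Timestamp, T)]
}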

trait SparkSink[T] extends Serializable {
def write(sc: SparkContext, rdd: RDD[(Timestamp, T)], timeSpan: Interval[Timestamp]): Unit
}

case class MergeResult[K, V](
sumBeforeMerge: RDD[(Timestamp, (K, (Option[V], V)))],
writeClosure: () => Unit
)

trait SparkStore[K, V] extends Serializable {

def merge(sc: SparkContext,
timeSpan: Interval[Timestamp],
deltas: RDD[(Timestamp, (K, V))],
commutativity: Commutativity,
semigroup: Semigroup[V]): MergeResult[K, V]
}

abstract class SimpleSparkStore[K: ClassTag, V: ClassTag] extends SparkStore[K, V] {

// contract is:
// no duplicate keys
// provides a view of the world as of exactly the last instant of timeSpan
// TODO: replace with batched logic (combine snapshots with deltas etc)
def snapshot(sc: SparkContext, timeSpan: Interval[Timestamp]): RDD[(K, V)]

def write(sc: SparkContext, updatedSnapshot: RDD[(K, V)]): Unit

override def merge(sc: SparkContext,
timeSpan: Interval[Timestamp],
deltas: RDD[(Timestamp, (K, V))],
commutativity: Commutativity,
@transient semigroup: Semigroup[V]): MergeResult[K, V] = {
Review comment (Contributor): Semigroup extends Serializable, so why does it need to be transient?

Reply (Author): Maybe it doesn't need to be in that case. I was just trying to force as much as possible through the Externalizer. I wasn't sure which things need this vs. which don't. Maybe only closures need this?

Reply (Author): Going to keep it this way just to be defensive.


val snapshotRdd = snapshot(sc, timeSpan)
val extSemigroup = Externalizer(semigroup)

val summedDeltas: RDD[(K, (Timestamp, V))] = commutativity match {
case Commutative => {
val keyedDeltas = deltas.map { case (ts, (k, v)) => (k, (ts, v)) }

keyedDeltas.reduceByKey { case ((highestTs, sumSoFar), (ts, v)) =>
(Ordering[Timestamp].max(highestTs, ts), extSemigroup.get.plus(sumSoFar, v))
}

}

case NonCommutative => {
val keyedDeltas = deltas.map { case (ts, (k, v)) => (k, (ts, v)) }

// TODO: how to get a sorted group w/o bringing the entire group into memory?
// *does* this even bring it into memory?
val grouped = keyedDeltas.groupByKey()
grouped.map {
case (k, vals) => {
val sortedVals = vals.sortBy(_._1) // sort by time
val maxTs = sortedVals.last._1
val projectedSortedVals = sortedVals.iterator.map(_._2) // just the values
// projectedSortedVals should never be empty so .get is safe
(k, (maxTs, extSemigroup.get.sumOption(projectedSortedVals).get))
}
}
}
}

// TODO: these 'seqs that are actually options' are ugly
val grouped = summedDeltas.cogroup(snapshotRdd).map { case (k, (summedDeltaSeq, storedValueSeq)) =>
(k, storedValueSeq.headOption, summedDeltaSeq.headOption)
}

val sumBeforeMerge = grouped.flatMap { case (k, storedValue, summedDelta) =>
summedDelta.map { case (ts, d) => (ts, (k, (storedValue, d))) }
}

val updatedSnapshot = grouped.map { case (k, storedValue, summedDelta) =>
val v = (summedDelta.map(_._2), storedValue) match {
case (Some(delta), Some(sv)) => extSemigroup.get.plus(sv, delta)
case (Some(delta), None) => delta
case (None, Some(sv)) => sv
case _ => sys.error("This should never happen, both summedDelta and storedValue were None")
}
(k, v)
}

val writeClosure = () => write(sc, updatedSnapshot)

MergeResult(sumBeforeMerge, writeClosure)
}
}

// TODO: Need to implement the logic for time based lookups (finding what a value was for a given key at a given time)
trait SparkService[K, LV] extends Serializable {
def lookup[V](sc: SparkContext,
timeSpan: Interval[Timestamp],
rdd: RDD[(Timestamp, (K, V))]): RDD[(Timestamp, (K, (V, Option[LV])))]
}
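As a usage illustration, a minimal sketch of a SimpleSparkStore backed by an in-memory map, the kind a local-mode test might use; the class InMemorySparkStore is hypothetical and not part of this PR (it reuses this file's imports).

class InMemorySparkStore[K: ClassTag, V: ClassTag](initial: Map[K, V]) extends SimpleSparkStore[K, V] {
  // the "snapshot" is whatever was last written, parallelized back into an RDD on each read
  @volatile private[this] var current: Map[K, V] = initial

  override def snapshot(sc: SparkContext, timeSpan: Interval[Timestamp]): RDD[(K, V)] =
    sc.parallelize(current.toSeq)

  // collecting to the driver is only reasonable for small, test-sized stores
  override def write(sc: SparkContext, updatedSnapshot: RDD[(K, V)]): Unit =
    current = updatedSnapshot.collect().toMap
}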
@@ -0,0 +1,90 @@
package com.twitter.summingbird.spark

import com.twitter.summingbird._
import com.twitter.algebird.Semigroup
import scala.reflect.ClassTag

/**
* The DAG handed to a platform by summingbird is a little hard to work with
* given that all the generic types have been erased. This helper gives you
* type safe methods to implement, and casts / coerces things for you.
*
* TODO: Is this useful outside of the spark platform? Is it useful at all?
*
* @author Alex Levenson
*/
trait PlatformPlanner[P <: Platform[P]] {

// some shorthand type aliases
type Prod[T] = Producer[P, T]
type TailProd[T] = TailProducer[P, T]
type Visited = Map[Prod[_], P#Plan[_]]
Review comment (Contributor): What happens if you try

type Visited = Map[Prod[_], P#Plan[T]]

or

type Visited[T] = Map[Prod[_], P#Plan[T]]

Do either work? It would clean up the need for casting.

Reply (Author): No, the T in Plan[T] isn't fixed. This is a Map of many different kinds of plans, unfortunately. The Plan is basically an RDD, so if you have an RDD[T] and then you map it to an RDD[U], you now have a Plan[U].
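To illustrate the author's point (hypothetical example, not part of the PR): the same visited map can hold plans of unrelated element types, which is why the value type stays existential and toPlan casts on lookup.

// e.g. while planning a single job the map might simultaneously contain
//   someSourceNode  -> a P#Plan[String]          (an RDD of lines, say)
//   someFlatMapNode -> a P#Plan[(String, Long)]  (an RDD of key/value pairs)
// so Visited cannot be parameterized by one T; each get(...) is cast back to P#Plan[T] in toPlan.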


case class PlanState[T](
plan: P#Plan[T],
visited: Visited
)

def visit[T](producer: Prod[T]) = toPlan(producer, Map.empty)

protected def toPlan[T](producer: Prod[T], visited: Visited): PlanState[T] = {
// short circuit if we've already visited this node
visited.get(producer) match {
case Some(s) => PlanState(s.asInstanceOf[P#Plan[T]], visited)
case None => toPlanNoCycleCheck(producer, visited)
}
}

// actually plans a producer, does not check for cycles (toPlan does that)
private def toPlanNoCycleCheck[T](outer: Prod[T], visited: Visited): PlanState[T] = {

val updatedState = outer match {
case NamedProducer(prod, name) => planNamedProducer(prod, name, visited)

case IdentityKeyedProducer(prod) => planIdentityKeyedProducer(prod, visited)

case Source(source) => planSource(source, visited)

case OptionMappedProducer(prod, fn) => planOptionMappedProducer[Any, Any](prod, visited, fn)

case FlatMappedProducer(prod, fn) => planFlatMappedProducer[Any, Any](prod, visited, fn)

case MergedProducer(l, r) => planMergedProducer(l, r, visited)

case KeyFlatMappedProducer(prod, fn) => planKeyFlatMappedProducer(prod, visited, fn)

case AlsoProducer(ensure, result) => planAlsoProducer[Any, Any](ensure, result, visited)

case WrittenProducer(prod, sink) => planWrittenProducer(prod, visited, sink)

case LeftJoinedProducer(prod, service) => planLeftJoinedProducer(prod, visited, service)

case summer@Summer(prod, store, semigroup) => planSummer(summer, prod, visited, store, semigroup)
}

PlanState(updatedState.plan.asInstanceOf[P#Plan[T]], updatedState.visited + (outer -> updatedState.plan))
}

def planNamedProducer[T](prod: Prod[T], name: String, visited: Visited): PlanState[T]

def planIdentityKeyedProducer[K, V](prod: Prod[(K, V)], visited: Visited): PlanState[(K, V)]

def planSource[T](source: P#Source[T], visited: Visited): PlanState[T]

def planOptionMappedProducer[T, U: ClassTag](prod: Prod[T], visited: Visited, fn: (T) => Option[U]): PlanState[U]

def planFlatMappedProducer[T, U: ClassTag](prod: Prod[T], visited: Visited, fn: (T) => TraversableOnce[U]): PlanState[U]

def planMergedProducer[T](l: Prod[T], r: Prod[T], visited: Visited): PlanState[T]

def planKeyFlatMappedProducer[K, V, K2](prod: Prod[(K, V)], visited: Visited, fn: K => TraversableOnce[K2]): PlanState[(K2, V)]

def planAlsoProducer[E, R: ClassTag](ensure: TailProd[E], result: Prod[R], visited: Visited): PlanState[R]

def planWrittenProducer[T: ClassTag](prod: Prod[T], visited: Visited, sink: P#Sink[T]): PlanState[T]

def planLeftJoinedProducer[K: ClassTag, V: ClassTag, JoinedV](prod: Prod[(K, V)], visited: Visited, service: P#Service[K, JoinedV]): PlanState[(K, (V, Option[JoinedV]))]

def planSummer[K: ClassTag, V: ClassTag](summer: Summer[P, K, V], prod: Prod[(K, V)], visited: Visited, store: P#Store[K, V], semigroup: Semigroup[V]): PlanState[(K, (Option[V], V))]

}
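A self-contained sketch (simplified, hypothetical types rather than summingbird's real Producer graph) of the memoization pattern toPlan uses: each node is planned at most once, and the growing visited map is threaded through the recursive calls so nodes shared by several branches of the DAG are not re-planned.

object MemoizedPlanningSketch {
  sealed trait Node
  case class Leaf(name: String) extends Node
  case class Merge(left: Node, right: Node) extends Node

  case class PlanState(plan: String, visited: Map[Node, String])

  def toPlan(node: Node, visited: Map[Node, String]): PlanState =
    visited.get(node) match {
      case Some(existing) => PlanState(existing, visited) // already planned: reuse it
      case None =>
        val planned = node match {
          case Leaf(name) => PlanState("read(" + name + ")", visited)
          case Merge(l, r) =>
            val ls = toPlan(l, visited)
            val rs = toPlan(r, ls.visited) // thread the updated map into the next branch
            PlanState("merge(" + ls.plan + ", " + rs.plan + ")", rs.visited)
        }
        PlanState(planned.plan, planned.visited + (node -> planned.plan))
    }

  def main(args: Array[String]): Unit = {
    val shared = Leaf("events")
    // `shared` appears twice in the DAG but is only planned once
    println(toPlan(Merge(shared, Merge(shared, Leaf("other"))), Map.empty).plan)
  }
}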