First pass at a spark platform for summingbird #502
@@ -1,6 +1,6 @@
language: scala
scala:
- 2.9.3
- 2.10.3
- 2.10.4
script:
- "sbt -Dlog4j.configuration=file://$TRAVIS_BUILD_DIR/project/travis-log4j.properties ++$TRAVIS_SCALA_VERSION test"
@@ -0,0 +1 @@
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
@@ -0,0 +1,20 @@
First pass at a spark platform for summingbird.
This is a work in progress, and is not really expected to work yet!

Outstanding issues:
- Time based batching / a BatchedStore equivalent is not yet supported. Time is plumbed through everywhere and used for non-commutative semigroups, but batching doesn't really come into play yet.
- Logic for finding the maximal timespan covered by the various sources / stores is not yet implemented.
- Logic for lookups / left joins respecting time (e.g. returning what a value was for a given key at a given time) is not yet implemented.
- Decide whether PlatformPlanner is useful or not.
- Test running real jobs on a real cluster (only tested once with wordcount).
- SparkPlatform is currently stateful but should be refactored not to be.
- SparkPlatform submits blocking spark 'actions' in serial; it should make (careful) use of a FuturePool instead (see the sketch after this list), though we may run into spark thread-safety issues there. I actually had this implemented, but didn't want to spend the time finding out whether spark is thread-safe in this way until I had something that worked in local mode first.
- Writing the tests (SparkLaws) led me to think that there should really be a platform-agnostic set of laws, or at least a PlatformLaws base class. There's a lot of overlap with the scalding platform in these tests. Having a PlatformLaws test suite would also better describe the contract and expected behavior of all platforms.
- There are currently no optimizations in the planning stage (except for commutativity not imposing a sort and using reduceByKey).
- The core spark types that need access to a SparkContext should be refactored in terms of a Reader monad.
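To make the FuturePool point above concrete, here is a minimal sketch, assuming com.twitter.util.FuturePool. It is not part of this PR, and ParallelActions, submit, and writeAll are hypothetical names; whether Spark tolerates concurrent actions on a single SparkContext is exactly the open question noted in the list.

```scala
import com.twitter.util.{Future, FuturePool}

// Hypothetical helper: submit blocking Spark 'actions' (counts, saves, the write
// closures produced by SparkStore.merge) through a FuturePool instead of running
// them one after another in serial.
object ParallelActions {
  private val pool = FuturePool.unboundedPool

  // Run one blocking action on the pool and get a Future back.
  def submit[T](action: => T): Future[T] = pool(action)

  // Kick off all write closures and wait on a single combined Future.
  def writeAll(writeClosures: Seq[() => Unit]): Future[Seq[Unit]] =
    Future.collect(writeClosures.map(w => submit(w())))
}
```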
@@ -0,0 +1,114 @@
package com.twitter.summingbird.spark

import com.twitter.algebird.{Interval, Semigroup}
import com.twitter.summingbird.batch.Timestamp
import com.twitter.summingbird.option.{NonCommutative, Commutative, Commutativity}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
import com.twitter.chill.Externalizer

// TODO: add the logic for seeing what timespans each source / store / service can cover

// A source promises to never return values outside of timeSpan
trait SparkSource[T] extends Serializable {
  def rdd(sc: SparkContext, timeSpan: Interval[Timestamp]): RDD[(Timestamp, T)]
}

trait SparkSink[T] extends Serializable {
  def write(sc: SparkContext, rdd: RDD[(Timestamp, T)], timeSpan: Interval[Timestamp]): Unit
}
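For illustration, a minimal in-memory implementation of the SparkSource contract above might look like the following sketch. It is not part of the PR; InMemorySparkSource is a hypothetical name, and it assumes an Ordering[Timestamp] is in implicit scope for algebird's Interval#contains.

```scala
import com.twitter.algebird.Interval
import com.twitter.summingbird.batch.Timestamp
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Hypothetical source backed by an in-memory Seq; it honors the contract by
// filtering out any events that fall outside the requested timeSpan.
class InMemorySparkSource[T: ClassTag](events: Seq[(Timestamp, T)]) extends SparkSource[T] {
  override def rdd(sc: SparkContext, timeSpan: Interval[Timestamp]): RDD[(Timestamp, T)] =
    sc.parallelize(events).filter { case (ts, _) => timeSpan.contains(ts) }
}
```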
case class MergeResult[K, V](
  sumBeforeMerge: RDD[(Timestamp, (K, (Option[V], V)))],
  writeClosure: () => Unit
)

trait SparkStore[K, V] extends Serializable {

  def merge(sc: SparkContext,
            timeSpan: Interval[Timestamp],
            deltas: RDD[(Timestamp, (K, V))],
            commutativity: Commutativity,
            semigroup: Semigroup[V]): MergeResult[K, V]
}

abstract class SimpleSparkStore[K: ClassTag, V: ClassTag] extends SparkStore[K, V] {

  // contract is:
  //   no duplicate keys
  //   provides a view of the world as of exactly the last instant of timeSpan
  // TODO: replace with batched logic (combine snapshots with deltas etc)
  def snapshot(sc: SparkContext, timeSpan: Interval[Timestamp]): RDD[(K, V)]

  def write(sc: SparkContext, updatedSnapshot: RDD[(K, V)]): Unit

  override def merge(sc: SparkContext,
                     timeSpan: Interval[Timestamp],
                     deltas: RDD[(Timestamp, (K, V))],
                     commutativity: Commutativity,
                     @transient semigroup: Semigroup[V]): MergeResult[K, V] = {

Reviewer: Semigroup extends Serializable, why does it need to be transient?
Author: Maybe it doesn't need to be in that case. I was just trying to force as much as possible through the Externalizer. I wasn't sure which things need this vs. which don't. Maybe only closures need this?
Author: Going to keep it this way just to be defensive.
    val snapshotRdd = snapshot(sc, timeSpan)
    val extSemigroup = Externalizer(semigroup)

    val summedDeltas: RDD[(K, (Timestamp, V))] = commutativity match {
      case Commutative => {
        val keyedDeltas = deltas.map { case (ts, (k, v)) => (k, (ts, v)) }

        keyedDeltas.reduceByKey { case ((highestTs, sumSoFar), (ts, v)) =>
          (Ordering[Timestamp].max(highestTs, ts), extSemigroup.get.plus(sumSoFar, v))
        }
      }

      case NonCommutative => {
        val keyedDeltas = deltas.map { case (ts, (k, v)) => (k, (ts, v)) }

        // TODO: how to get a sorted group w/o bringing the entire group into memory?
        //       *does* this even bring it into memory?
        val grouped = keyedDeltas.groupByKey()
        grouped.map {
          case (k, vals) => {
            val sortedVals = vals.sortBy(_._1) // sort by time
            val maxTs = sortedVals.last._1
            val projectedSortedVals = sortedVals.iterator.map(_._2) // just the values
            // projectedSortedVals should never be empty so .get is safe
            (k, (maxTs, extSemigroup.get.sumOption(projectedSortedVals).get))
          }
        }
      }
    }

    // TODO: these 'seqs that are actually options' are ugly
    val grouped = summedDeltas.cogroup(snapshotRdd).map { case (k, (summedDeltaSeq, storedValueSeq)) =>
      (k, storedValueSeq.headOption, summedDeltaSeq.headOption)
    }

    val sumBeforeMerge = grouped.flatMap { case (k, storedValue, summedDelta) =>
      summedDelta.map { case (ts, d) => (ts, (k, (storedValue, d))) }
    }

    val updatedSnapshot = grouped.map { case (k, storedValue, summedDelta) =>
      val v = (summedDelta.map(_._2), storedValue) match {
        case (Some(delta), Some(sv)) => extSemigroup.get.plus(sv, delta)
        case (Some(delta), None) => delta
        case (None, Some(sv)) => sv
        case _ => sys.error("This should never happen, both summedDelta and storedValue were None")
      }
      (k, v)
    }

    val writeClosure = () => write(sc, updatedSnapshot)

    MergeResult(sumBeforeMerge, writeClosure)
  }
}
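As a concrete illustration of the snapshot/write contract above, a SimpleSparkStore backed by Hadoop object files could look like the sketch below. It is not part of the PR; ObjectFileSparkStore and its path parameters are hypothetical, and the timeSpan is ignored because the batched (snapshot + deltas) logic described in the README is not implemented yet.

```scala
import com.twitter.algebird.Interval
import com.twitter.summingbird.batch.Timestamp
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Hypothetical store: reads the previous snapshot from one path and writes the
// updated snapshot to another.
class ObjectFileSparkStore[K: ClassTag, V: ClassTag](
    snapshotPath: String,
    outputPath: String) extends SimpleSparkStore[K, V] {

  // assumes the stored snapshot already satisfies the "no duplicate keys" contract
  override def snapshot(sc: SparkContext, timeSpan: Interval[Timestamp]): RDD[(K, V)] =
    sc.objectFile[(K, V)](snapshotPath)

  override def write(sc: SparkContext, updatedSnapshot: RDD[(K, V)]): Unit =
    updatedSnapshot.saveAsObjectFile(outputPath)
}
```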
// TODO: Need to implement the logic for time based lookups (finding what a value was for a given key at a given time)
trait SparkService[K, LV] extends Serializable {
  def lookup[V](sc: SparkContext,
                timeSpan: Interval[Timestamp],
                rdd: RDD[(Timestamp, (K, V))]): RDD[(Timestamp, (K, (V, Option[LV])))]
}
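For illustration, a time-oblivious SparkService can be sketched with a plain leftOuterJoin. This is not part of the PR; StaticSparkService is a hypothetical name and, per the TODO above, it does not respect time at all.

```scala
import com.twitter.algebird.Interval
import com.twitter.summingbird.batch.Timestamp
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Hypothetical service backed by a fixed RDD of key -> joined-value pairs.
class StaticSparkService[K: ClassTag, LV: ClassTag](lookupRdd: RDD[(K, LV)])
    extends SparkService[K, LV] {

  override def lookup[V](sc: SparkContext,
                         timeSpan: Interval[Timestamp],
                         rdd: RDD[(Timestamp, (K, V))]): RDD[(Timestamp, (K, (V, Option[LV])))] = {
    // key the deltas, join against the lookup data, then restore the original shape
    val keyed = rdd.map { case (ts, (k, v)) => (k, (ts, v)) }
    keyed.leftOuterJoin(lookupRdd).map { case (k, ((ts, v), lv)) => (ts, (k, (v, lv))) }
  }
}
```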
@@ -0,0 +1,90 @@
package com.twitter.summingbird.spark

import com.twitter.summingbird._
import com.twitter.algebird.Semigroup
import scala.reflect.ClassTag

/**
 * The DAG handed to a platform by summingbird is a little hard to work with
 * given that all the generic types have been erased. This helper gives you
 * type safe methods to implement, and casts / coerces things for you.
 *
 * TODO: Is this useful outside of the spark platform? Is it useful at all?
 *
 * @author Alex Levenson
 */
trait PlatformPlanner[P <: Platform[P]] {

  // some shorthand type aliases
  type Prod[T] = Producer[P, T]
  type TailProd[T] = TailProducer[P, T]
  type Visited = Map[Prod[_], P#Plan[_]]
Reviewer: What happens if you try to do … or …? Do either of those work? It would clean up the need for casting.
Author: No, the T in Plan[T] isn't fixed. This is a Map of many different kinds of plans, unfortunately. The Plan is basically an RDD, so if you have an RDD[T] and then you map it to an RDD[U], you now have a Plan[U].
(A small self-contained illustration of this appears after this file.)
  case class PlanState[T](
    plan: P#Plan[T],
    visited: Visited
  )

  def visit[T](producer: Prod[T]) = toPlan(producer, Map.empty)

  protected def toPlan[T](producer: Prod[T], visited: Visited): PlanState[T] = {
    // short circuit if we've already visited this node
    visited.get(producer) match {
      case Some(s) => PlanState(s.asInstanceOf[P#Plan[T]], visited)
      case None => toPlanNoCycleCheck(producer, visited)
    }
  }

  // actually plans a producer, does not check for cycles (toPlan does that)
  private def toPlanNoCycleCheck[T](outer: Prod[T], visited: Visited): PlanState[T] = {

    val updatedState = outer match {
      case NamedProducer(prod, name) => planNamedProducer(prod, name, visited)

      case IdentityKeyedProducer(prod) => planIdentityKeyedProducer(prod, visited)

      case Source(source) => planSource(source, visited)

      case OptionMappedProducer(prod, fn) => planOptionMappedProducer[Any, Any](prod, visited, fn)

      case FlatMappedProducer(prod, fn) => planFlatMappedProducer[Any, Any](prod, visited, fn)

      case MergedProducer(l, r) => planMergedProducer(l, r, visited)

      case KeyFlatMappedProducer(prod, fn) => planKeyFlatMappedProducer(prod, visited, fn)

      case AlsoProducer(ensure, result) => planAlsoProducer[Any, Any](ensure, result, visited)

      case WrittenProducer(prod, sink) => planWrittenProducer(prod, visited, sink)

      case LeftJoinedProducer(prod, service) => planLeftJoinedProducer(prod, visited, service)

      case summer@Summer(prod, store, semigroup) => planSummer(summer, prod, visited, store, semigroup)
    }

    PlanState(updatedState.plan.asInstanceOf[P#Plan[T]], updatedState.visited + (outer -> updatedState.plan))
  }

  def planNamedProducer[T](prod: Prod[T], name: String, visited: Visited): PlanState[T]

  def planIdentityKeyedProducer[K, V](prod: Prod[(K, V)], visited: Visited): PlanState[(K, V)]

  def planSource[T](source: P#Source[T], visited: Visited): PlanState[T]

  def planOptionMappedProducer[T, U: ClassTag](prod: Prod[T], visited: Visited, fn: (T) => Option[U]): PlanState[U]

  def planFlatMappedProducer[T, U: ClassTag](prod: Prod[T], visited: Visited, fn: (T) => TraversableOnce[U]): PlanState[U]

  def planMergedProducer[T](l: Prod[T], r: Prod[T], visited: Visited): PlanState[T]

  def planKeyFlatMappedProducer[K, V, K2](prod: Prod[(K, V)], visited: Visited, fn: K => TraversableOnce[K2]): PlanState[(K2, V)]

  def planAlsoProducer[E, R: ClassTag](ensure: TailProd[E], result: Prod[R], visited: Visited): PlanState[R]

  def planWrittenProducer[T: ClassTag](prod: Prod[T], visited: Visited, sink: P#Sink[T]): PlanState[T]

  def planLeftJoinedProducer[K: ClassTag, V: ClassTag, JoinedV](prod: Prod[(K, V)], visited: Visited, service: P#Service[K, JoinedV]): PlanState[(K, (V, Option[JoinedV]))]

  def planSummer[K: ClassTag, V: ClassTag](summer: Summer[P, K, V], prod: Prod[(K, V)], visited: Visited, store: P#Store[K, V], semigroup: Semigroup[V]): PlanState[(K, (Option[V], V))]

}
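To illustrate the review discussion above about why Visited maps to P#Plan[_] and toPlan needs a cast, here is a small self-contained sketch (not from the PR; all names here are made up):

```scala
// One map holding plans of many element types: the element type is erased from the
// map's type, so it can only be expressed existentially and recovered with a cast,
// which is exactly what toPlan does with P#Plan[T].
object ErasureIllustration {
  trait Plan[T]
  case class ListPlan[T](items: List[T]) extends Plan[T]

  val plans: Map[String, Plan[_]] = Map(
    "ints"    -> ListPlan(List(1, 2, 3)),
    "strings" -> ListPlan(List("a", "b"))
  )

  // Callers that know what they stored must cast on the way out.
  def lookup[T](name: String): Plan[T] = plans(name).asInstanceOf[Plan[T]]
}
```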
Reviewer: An implicit SparkContext, a reader monad, or the store itself could provide it; it seems like it doesn't need to be an argument?
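The reader-monad option mentioned here (and in the README) could look roughly like the sketch below, a minimal Reader over SparkContext. It is not part of the PR; SparkJob, fromContext, and fromSeq are hypothetical names.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Computations describe what they need from the SparkContext; the context itself
// is supplied only once, when the whole plan is finally run.
case class SparkJob[+T](run: SparkContext => T) {
  def map[U](f: T => U): SparkJob[U] = SparkJob(sc => f(run(sc)))
  def flatMap[U](f: T => SparkJob[U]): SparkJob[U] = SparkJob(sc => f(run(sc)).run(sc))
}

object SparkJob {
  def fromContext: SparkJob[SparkContext] = SparkJob(identity)

  // Example: a source expressed as a SparkJob instead of taking sc as an argument.
  def fromSeq[T: ClassTag](xs: Seq[T]): SparkJob[RDD[T]] = SparkJob(sc => sc.parallelize(xs))
}
```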