
First pass at a spark platform for summingbird #502

Merged
merged 37 commits into twitter:develop on May 21, 2014

Conversation

isnotinvain (Contributor)

First pass at a spark platform for summingbird.

Outstanding issues:

  • Time-based batching / a BatchedStore equivalent is not yet supported. Time is plumbed through everywhere and used for non-commutative semigroups, but batching doesn't really come into play yet.
  • Logic for finding the maximal timespan covered by the various sources / stores is not yet implemented
  • Logic for lookups / left joins respecting time (e.g. returning what a value was for a given key at a given time) is not yet implemented
  • Decide whether PlatformPlanner is useful or not
  • Test running real jobs on a real cluster (so far tested only once, with a wordcount)
  • SparkPlatform is stateful but shouldn't be
  • SparkPlatform submits blocking Spark 'actions' serially; it should make (careful) use of a FuturePool instead, though we may run into Spark thread-safety issues there (see the sketch after this list). I actually had this implemented, but didn't want to spend time finding out whether Spark is thread-safe in this way until I had something working in local mode first.
  • Writing the tests (SparkLaws) led me to think that there should really be a platform-agnostic set of laws, or at least a PlatformLaws base class. These tests overlap heavily with the scalding platform's. A PlatformLaws test suite would also better describe the contract and expected behavior of all platforms.
  • There are currently no optimizations in the planning stage (except that commutativity avoids imposing a sort and uses reduceByKey)
  • The core Spark types that need access to a SparkContext should be refactored in terms of a Reader monad
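
A minimal sketch of the FuturePool shape referenced above. ParallelActions and runAll are hypothetical names, and whether a single SparkContext tolerates concurrent job submission is exactly the open thread-safety question, so treat this as the shape rather than a guarantee:

import java.util.concurrent.Executors
import com.twitter.util.{Await, Future, FuturePool}

object ParallelActions {
  private val pool = FuturePool(Executors.newFixedThreadPool(4))

  // each action is a blocking Spark "action" (e.g. a count or a save);
  // the pool turns them into Futures so they can run concurrently
  def runAll(actions: Seq[() => Unit]): Unit =
    Await.result(Future.collect(actions.map(a => pool(a()))))
}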

timeSpan: Interval[Timestamp],
deltas: RDD[(Timestamp, (K, V))],
commutativity: Commutativity,
@transient semigroup: Semigroup[V]): MergeResult[K, V] = {
Contributor:

Semigroup extends Serializable, so why does it need to be transient?

Contributor (Author):

Maybe it doesn't need to be in that case. I was just trying to force as much as possible through the externalizer. I wasn't sure which things need this vs. which don't. Maybe only closures need this?

Contributor (Author):

Going to keep it this way just to be defensive.
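
A sketch of the defensive pattern under discussion, assuming chill's Externalizer: wrap the semigroup so the closures capture the wrapper rather than the semigroup itself, and only impose a sort when the semigroup is non-commutative. mergeDeltas is a hypothetical helper (not the PR's code), Commutative / NonCommutative are stand-ins for the PR's Commutativity type, and Long stands in for Timestamp:

import scala.reflect.ClassTag

import com.twitter.algebird.Semigroup
import com.twitter.chill.Externalizer
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._ // pair-RDD functions on circa-2014 Spark

object MergeSketch {
  sealed trait Commutativity
  case object Commutative extends Commutativity
  case object NonCommutative extends Commutativity

  // deltas are (key, (timestamp, value))
  def mergeDeltas[K: ClassTag, V: ClassTag](
      deltas: RDD[(K, (Long, V))],
      commutativity: Commutativity,
      semigroup: Semigroup[V]): RDD[(K, V)] = {
    // Externalizer handles serializing the wrapped value (Java
    // serialization when it works, Kryo otherwise), so the closures
    // below capture the wrapper, never `semigroup` directly
    val sg = Externalizer(semigroup)
    commutativity match {
      case Commutative =>
        // order is irrelevant: combine map-side, no sort needed
        deltas.mapValues(_._2).reduceByKey(sg.get.plus(_, _))
      case NonCommutative =>
        // order matters: group, sort each key's deltas by time, fold in order
        deltas.groupByKey().mapValues { tvs =>
          tvs.toSeq.sortBy(_._1).map(_._2).reduceLeft(sg.get.plus(_, _))
        }
    }
  }
}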


// A source promises to never return values outside of timeSpan
trait SparkSource[T] extends Serializable {
  def rdd(sc: SparkContext, timeSpan: Interval[Timestamp]): RDD[(Timestamp, T)]
}
Contributor:

An implicit SparkContext, a Reader monad, or the store itself could provide this. It seems like it doesn't need to be an argument?
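
A sketch of this suggestion with hypothetical names (not the PR's API): defer the SparkContext through a minimal hand-rolled Reader so sources compose without threading sc through every call. Interval and Timestamp are the types the PR already uses:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// a minimal Reader monad over some environment C
case class Reader[C, +A](run: C => A) {
  def map[B](f: A => B): Reader[C, B] = Reader(c => f(run(c)))
  def flatMap[B](f: A => Reader[C, B]): Reader[C, B] =
    Reader(c => f(run(c)).run(c))
}

// hypothetical alternative to SparkSource: the timeSpan stays explicit,
// the SparkContext is only supplied when the whole plan is run
trait SparkSourceViaReader[T] extends Serializable {
  def rdd(timeSpan: Interval[Timestamp]): Reader[SparkContext, RDD[(Timestamp, T)]]
}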

@isnotinvain isnotinvain changed the title First pass at a spark platform for summingbird -- DO NOT MERGE! First pass at a spark platform for summingbird May 20, 2014
// some shorthand type aliases
type Prod[T] = Producer[P, T]
type TailProd[T] = TailProducer[P, T]
type Visited = Map[Prod[_], P#Plan[_]]
Contributor:

What happens if you try to do

type Visited = Map[Prod[_], P#Plan[T]]

or

type Visited[T] = Map[Prod[_], P#Plan[T]]

Does either of those work? It would clean up the need for casting.

Contributor (Author):

No, the T in Plan[T] isn't fixed; this is a Map of many different kinds of plans, unfortunately.

A Plan is basically an RDD, so if you have an RDD[T] and map it to an RDD[U], you now have a Plan[U].
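
A sketch of why the cast shows up, using the planner's aliases from above (visit and lookup are hypothetical helpers): the map is heterogeneous, so insertion forgets T and lookups must recover it at the use site, where it is safe by construction:

// inside the planner, with Prod, P#Plan and Visited as aliased above
def visit[T](p: Prod[T], plan: P#Plan[T], visited: Visited): Visited =
  visited + (p -> plan)

def lookup[T](p: Prod[T], visited: Visited): Option[P#Plan[T]] =
  // safe by construction: visit only ever pairs a Prod[T] with a Plan[T]
  visited.get(p).map(_.asInstanceOf[P#Plan[T]])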

jcoveney added a commit that referenced this pull request May 21, 2014
First pass at a spark platform for summingbird
@jcoveney jcoveney merged commit d4a7fb0 into twitter:develop May 21, 2014
jcoveney (Contributor):

Great work Alex. Now add those issues and let's keep summing.
