Merge pull request alteryx#102 from tdas/transform
Added new Spark Streaming operations

New operations:
- transformWith, which allows an arbitrary 2-to-1 DStream transform; added to the Scala and Java APIs (see the sketch below)
- StreamingContext.transform, to allow an arbitrary n-to-1 DStream transform
- leftOuterJoin and rightOuterJoin between 2 DStreams, added to the Scala and Java APIs
- missing variations of join and cogroup, added to the Scala and Java APIs
- the missing JavaStreamingContext.union

Updated a number of Java and Scala API docs
mateiz committed Oct 26, 2013
2 parents 85e2cab + dc95707 commit d307db6
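As a rough usage sketch of the headline operation (not part of the commit — the socket sources, ports, and local master below are hypothetical), transformWith can express any per-batch 2-to-1 combination of two DStreams, such as an RDD-level join:

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext("local[2]", "TransformWithSketch", Seconds(1))
// Hypothetical inputs: two socket text streams keyed by line.
val views  = ssc.socketTextStream("localhost", 9999).map(line => (line, 1))
val clicks = ssc.socketTextStream("localhost", 9998).map(line => (line, 1))

// transformWith: an arbitrary 2-to-1 transform over the underlying RDDs;
// here, a per-batch join of the two streams' RDDs.
val joined = views.transformWith(clicks,
  (v: RDD[(String, Int)], c: RDD[(String, Int)]) => v.join(c))

joined.print()
ssc.start()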
Showing 13 changed files with 1,037 additions and 162 deletions.
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -622,4 +622,15 @@ object JavaPairRDD {
new JavaPairRDD[K, V](rdd)

implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd


/** Convert a JavaRDD of key-value pairs to JavaPairRDD. */
def fromJavaRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = {
implicit val cmk: ClassManifest[K] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
implicit val cmv: ClassManifest[V] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
new JavaPairRDD[K, V](rdd.rdd)
}

}
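A minimal sketch (not from the commit) of where fromJavaRDD helps: a JavaRDD whose elements are Scala pairs gets re-wrapped so the pair-specific operations become available. The context name is hypothetical:

import java.util.Arrays
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}

val jsc = new JavaSparkContext("local", "FromJavaRDDSketch")
// A JavaRDD of Scala (key, value) tuples...
val pairs: JavaRDD[(String, Int)] = jsc.parallelize(Arrays.asList(("a", 1), ("b", 2)))
// ...converted so pair operations (reduceByKey, join, ...) can be called on it.
val pairRDD: JavaPairRDD[String, Int] = JavaPairRDD.fromJavaRDD(pairs)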
36 changes: 36 additions & 0 deletions …/org/apache/spark/api/java/function/Function3.java
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java.function;

import scala.reflect.ClassManifest;
import scala.reflect.ClassManifest$;
import scala.runtime.AbstractFunction2;

import java.io.Serializable;

/**
* A three-argument function that takes arguments of type T1, T2 and T3 and returns an R.
*/
public abstract class Function3<T1, T2, T3, R> extends WrappedFunction3<T1, T2, T3, R>
implements Serializable {

public ClassManifest<R> returnType() {
return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
}
}

34 changes: 34 additions & 0 deletions …/org/apache/spark/api/java/function/WrappedFunction3.scala
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java.function

import scala.runtime.AbstractFunction3

/**
* Subclass of Function3 for ease of calling from Java. The main thing it does is re-expose the
* apply() method as call() and declare that it can throw Exception (since AbstractFunction3.apply
* isn't marked to allow that).
*/
private[spark] abstract class WrappedFunction3[T1, T2, T3, R]
extends AbstractFunction3[T1, T2, T3, R] {
@throws(classOf[Exception])
def call(t1: T1, t2: T2, t3: T3): R

final def apply(t1: T1, t2: T2, t3: T3): R = call(t1, t2, t3)
}
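A standalone sketch of the call/apply forwarding pattern these two classes implement (nothing below is Spark-specific; the class and value names are invented for illustration). Subclasses implement call(), while Scala invokes the object through apply(), so the result is usable anywhere a plain function type is expected:

import scala.runtime.AbstractFunction3

abstract class CallableF3[T1, T2, T3, R] extends AbstractFunction3[T1, T2, T3, R] {
  @throws(classOf[Exception])
  def call(t1: T1, t2: T2, t3: T3): R
  final def apply(t1: T1, t2: T2, t3: T3): R = call(t1, t2, t3)
}

// What a Java-side implementation amounts to, written here in Scala:
val concat = new CallableF3[String, String, String, String] {
  def call(a: String, b: String, c: String): String = a + b + c
}

// Function3 is the type (T1, T2, T3) => R, so this assignment is direct:
val f: (String, String, String) => String = concat
println(f("x", "y", "z"))  // prints "xyz"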

46 changes: 40 additions & 6 deletions streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@ -486,15 +486,15 @@ abstract class DStream[T: ClassManifest] (

/**
* Apply a function to each RDD in this DStream. This is an output operator, so
- * this DStream will be registered as an output stream and therefore materialized.
+ * 'this' DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: RDD[T] => Unit) {
this.foreach((r: RDD[T], t: Time) => foreachFunc(r))
}

/**
* Apply a function to each RDD in this DStream. This is an output operator, so
- * this DStream will be registered as an output stream and therefore materialized.
+ * 'this' DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: (RDD[T], Time) => Unit) {
val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc))
@@ -504,18 +504,52 @@ abstract class DStream[T: ClassManifest] (

/**
* Return a new DStream in which each RDD is generated by applying a function
- * on each RDD of this DStream.
+ * on each RDD of 'this' DStream.
*/
def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
- transform((r: RDD[T], t: Time) => transformFunc(r))
+ transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r)))
}

/**
* Return a new DStream in which each RDD is generated by applying a function
- * on each RDD of this DStream.
+ * on each RDD of 'this' DStream.
*/
def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
- new TransformedDStream(this, context.sparkContext.clean(transformFunc))
+ //new TransformedDStream(this, context.sparkContext.clean(transformFunc))
+ val cleanedF = context.sparkContext.clean(transformFunc)
+ val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
+   assert(rdds.length == 1)
+   cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
+ }
+ new TransformedDStream[U](Seq(this), realTransformFunc)
}

/**
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream and 'other' DStream.
*/
def transformWith[U: ClassManifest, V: ClassManifest](
other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
): DStream[V] = {
val cleanedF = ssc.sparkContext.clean(transformFunc)
transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2))
}

/**
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream and 'other' DStream.
*/
def transformWith[U: ClassManifest, V: ClassManifest](
other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
): DStream[V] = {
val cleanedF = ssc.sparkContext.clean(transformFunc)
val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
assert(rdds.length == 2)
val rdd1 = rdds(0).asInstanceOf[RDD[T]]
val rdd2 = rdds(1).asInstanceOf[RDD[U]]
cleanedF(rdd1, rdd2, time)
}
new TransformedDStream[V](Seq(this, other), realTransformFunc)
}

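To make the reworked transform concrete, a hedged sketch (the socket source and port are hypothetical): a per-batch sort, plus the Time-aware variant that also receives the batch time.

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

val ssc = new StreamingContext("local[2]", "TransformSketch", Seconds(1))
val counts = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1))

// 1-to-1 transform: any RDD-to-RDD function, applied to each batch's RDD.
val sorted = counts.transform((rdd: RDD[(String, Int)]) => rdd.sortByKey())

// Time-aware variant: the batch time is passed alongside the RDD.
val stamped = counts.transform((rdd: RDD[(String, Int)], time: Time) =>
  rdd.map { case (w, c) => (w, (time.milliseconds, c)) })

sorted.print()
ssc.start()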
…/org/apache/spark/streaming/PairDStreamFunctions.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming

import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.{ReducedWindowedDStream, StateDStream}
-import org.apache.spark.streaming.dstream.{CoGroupedDStream, ShuffledDStream}
+import org.apache.spark.streaming.dstream.{ShuffledDStream}
import org.apache.spark.streaming.dstream.{MapValuedDStream, FlatMapValuedDStream}

import org.apache.spark.{Partitioner, HashPartitioner}
@@ -359,7 +359,7 @@ extends Serializable {
}

/**
- * Create a new "state" DStream where the state for each key is updated by applying
+ * Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of the key.
* [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
@@ -398,78 +398,160 @@ extends Serializable {
new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner)
}


/**
* Return a new DStream by applying a map function to the value of each key-value pair in
* 'this' DStream without changing the key.
*/
def mapValues[U: ClassManifest](mapValuesFunc: V => U): DStream[(K, U)] = {
new MapValuedDStream[K, V, U](self, mapValuesFunc)
}

/**
* Return a new DStream by applying a flatMap function to the value of each key-value pair in
* 'this' DStream without changing the key.
*/
def flatMapValues[U: ClassManifest](
flatMapValuesFunc: V => TraversableOnce[U]
): DStream[(K, U)] = {
new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc)
}

/**
- * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
- * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
- * key in both RDDs. HashPartitioner is used to partition each generated RDD into default number
+ * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with Spark's default number
* of partitions.
*/
def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
cogroup(other, defaultPartitioner())
}

/**
- * Cogroup `this` DStream with `other` DStream using a partitioner. For each key k in corresponding RDDs of `this`
- * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
- * key in both RDDs. Partitioner is used to partition each generated RDD.
+ * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
*/
def cogroup[W: ClassManifest](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = {
cogroup(other, defaultPartitioner(numPartitions))
}

/**
* Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
* The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs.
*/
def cogroup[W: ClassManifest](
other: DStream[(K, W)],
partitioner: Partitioner
): DStream[(K, (Seq[V], Seq[W]))] = {

- val cgd = new CoGroupedDStream[K](
-   Seq(self.asInstanceOf[DStream[(K, _)]], other.asInstanceOf[DStream[(K, _)]]),
-   partitioner
- )
- val pdfs = new PairDStreamFunctions[K, Seq[Seq[_]]](cgd)(
-   classManifest[K],
-   Manifests.seqSeqManifest
- )
- pdfs.mapValues {
-   case Seq(vs, ws) =>
-     (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
- }
+ self.transformWith(
+   other,
+   (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner)
+ )
}

/**
- * Join `this` DStream with `other` DStream. HashPartitioner is used
- * to partition each generated RDD into default number of partitions.
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
*/
def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
join[W](other, defaultPartitioner())
}

/**
- * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will
- * be generated by joining RDDs from `this` and other DStream. Uses the given
- * Partitioner to partition each generated RDD.
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
*/
def join[W: ClassManifest](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = {
join[W](other, defaultPartitioner(numPartitions))
}

/**
* Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
* The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
*/
def join[W: ClassManifest](
other: DStream[(K, W)],
partitioner: Partitioner
): DStream[(K, (V, W))] = {
- this.cogroup(other, partitioner)
-   .flatMapValues{
-     case (vs, ws) =>
-       for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
-   }
+ self.transformWith(
+   other,
+   (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner)
+ )
}

/**
* Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
* `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
* number of partitions.
*/
def leftOuterJoin[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = {
leftOuterJoin[W](other, defaultPartitioner())
}

/**
* Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
* `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
* partitions.
*/
def leftOuterJoin[W: ClassManifest](
other: DStream[(K, W)],
numPartitions: Int
): DStream[(K, (V, Option[W]))] = {
leftOuterJoin[W](other, defaultPartitioner(numPartitions))
}

/**
* Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
* `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
* the partitioning of each RDD.
*/
def leftOuterJoin[W: ClassManifest](
other: DStream[(K, W)],
partitioner: Partitioner
): DStream[(K, (V, Option[W]))] = {
self.transformWith(
other,
(rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.leftOuterJoin(rdd2, partitioner)
)
}

/**
* Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
* `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
* number of partitions.
*/
def rightOuterJoin[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = {
rightOuterJoin[W](other, defaultPartitioner())
}

/**
* Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
* `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
* partitions.
*/
def rightOuterJoin[W: ClassManifest](
other: DStream[(K, W)],
numPartitions: Int
): DStream[(K, (Option[V], W))] = {
rightOuterJoin[W](other, defaultPartitioner(numPartitions))
}

/**
* Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
* `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
* the partitioning of each RDD.
*/
def rightOuterJoin[W: ClassManifest](
other: DStream[(K, W)],
partitioner: Partitioner
): DStream[(K, (Option[V], W))] = {
self.transformWith(
other,
(rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.rightOuterJoin(rdd2, partitioner)
)
}

/**
- * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
- * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
+ * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
*/
def saveAsHadoopFiles[F <: OutputFormat[K, V]](
prefix: String,
@@ -479,8 +561,8 @@ extends Serializable {
}

/**
- * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
- * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
+ * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
*/
def saveAsHadoopFiles(
prefix: String,
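Pulling the new pair operations together, a hedged end-to-end sketch (stream sources and ports are hypothetical); the result types in the comments follow the signatures above:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

val ssc = new StreamingContext("local[2]", "JoinSketch", Seconds(2))
val requests  = ssc.socketTextStream("localhost", 9999).map(id => (id, "request"))
val responses = ssc.socketTextStream("localhost", 9998).map(id => (id, "response"))

val matched  = requests.join(responses)           // DStream[(String, (String, String))]
val withResp = requests.leftOuterJoin(responses)  // keeps unmatched requests as (v, None)
val withReq  = requests.rightOuterJoin(responses) // keeps unmatched responses as (None, w)
val grouped  = requests.cogroup(responses)        // all values per key from both streams

withResp.print()
ssc.start()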
(The remaining 8 of the 13 changed files are not shown.)
