Skip to content

Commit 4bcb318

Browse files
Ken Takagiwagiwa
Ken Takagiwa
authored and committed
implementing transform function in Python
1 parent 38adf95 commit 4bcb318

File tree

3 files changed

+38
-1
lines changed

3 files changed

+38
-1
lines changed

python/pyspark/streaming/dstream.py

-1
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,6 @@ def saveAsTextFiles(self, prefix, suffix=None):
379379
"""
380380
Save this DStream as a text file, using string representations of elements.
381381
"""
382-
383382
def saveAsTextFile(rdd, time):
384383
path = rddToFileName(prefix, suffix, time)
385384
rdd.saveAsTextFile(path)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package org.apache.spark.streaming.api.python
2+
3+
import org.apache.spark.Accumulator
4+
import org.apache.spark.api.python.PythonRDD
5+
import org.apache.spark.broadcast.Broadcast
6+
import org.apache.spark.rdd.RDD
7+
import org.apache.spark.streaming.api.java.JavaDStream
8+
import org.apache.spark.streaming.{Time, Duration}
9+
import org.apache.spark.streaming.dstream.DStream
10+
11+
import scala.reflect.ClassTag
12+
13+
/**
 * DStream whose batches are produced by wrapping the parent's RDD for each batch time in a
 * [[org.apache.spark.api.python.PythonRDD]], so that the serialized Python `command` is run
 * over that RDD.
 *
 * All constructor arguments after `parents` are forwarded, unchanged and in the same order,
 * to the `PythonRDD` constructor.
 * NOTE(review): the argument order is assumed to mirror `PythonRDD`'s constructor in the
 * Spark version this file is built against -- confirm before merging.
 *
 * NOTE(review): the original stub collected the RDDs of every parent but never used them.
 * A single `PythonRDD` wraps exactly one parent RDD, so this implementation fails fast when
 * more than one parent is supplied instead of silently ignoring the extra parents. Revisit
 * once the multi-parent contract on the Python side is defined.
 *
 * @param parents             parent DStreams; exactly one is currently supported
 * @param command             serialized Python function, passed through to `PythonRDD`
 * @param envVars             passed through to `PythonRDD`
 * @param pythonIncludes      passed through to `PythonRDD`
 * @param preservePartitoning passed through to `PythonRDD` (spelling kept for callers that
 *                            use named arguments)
 * @param pythonExec          passed through to `PythonRDD`
 * @param broadcastVars       passed through to `PythonRDD`
 * @param accumulator         passed through to `PythonRDD`
 * @tparam T element type of the parent DStreams
 */
class PythonTransformedDStream[T: ClassTag](
    parents: Seq[DStream[T]],
    command: Array[Byte],
    envVars: java.util.Map[String, String],
    pythonIncludes: java.util.List[String],
    preservePartitoning: Boolean,
    pythonExec: String,
    broadcastVars: java.util.List[Broadcast[Array[Byte]]],
    accumulator: Accumulator[java.util.List[Array[Byte]]]
  ) extends DStream[Array[Byte]](parents.head.ssc) {

  // An empty `parents` already fails in the super-constructor call above (`parents.head`);
  // this guard rejects the multi-parent case that compute() cannot honour yet.
  require(parents.length == 1,
    s"PythonTransformedDStream currently supports exactly one parent, got ${parents.length}")

  /** The parent streams this stream is derived from. */
  override def dependencies: List[DStream[_]] = parents.toList

  /** Batches are produced at the same interval as the parent stream. */
  override def slideDuration: Duration = parents.head.slideDuration

  /**
   * Builds the RDD for `validTime` by wrapping the parent's RDD in a `PythonRDD`.
   *
   * @param validTime batch time to compute
   * @return `None` when the parent has no RDD for `validTime`, otherwise the wrapping
   *         `PythonRDD`
   */
  override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
    parents.head.getOrCompute(validTime).map { parentRDD =>
      new PythonRDD(parentRDD, command, envVars, pythonIncludes, preservePartitoning,
        pythonExec, broadcastVars, accumulator)
    }
  }

  /** Java-friendly view of this stream. */
  val asJavaDStream: JavaDStream[Array[Byte]] = JavaDStream.fromDStream(this)
}

streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala

+1
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,7 @@ abstract class DStream[T: ClassTag] (
563563
val cleanedF = context.sparkContext.clean(transformFunc, false)
564564
val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
565565
assert(rdds.length == 1)
566+
// if transformfunc is fine, it is okay
566567
cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
567568
}
568569
new TransformedDStream[U](Seq(this), realTransformFunc)

0 commit comments

Comments
 (0)