Commit 56fae45

WIP

1 parent e35e101 commit 56fae45

5 files changed: +41, -6 lines

examples/src/main/python/streaming/test_oprations.py (+3, -2)

@@ -15,10 +15,11 @@
 
 lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
 words = lines.flatMap(lambda line: line.split(" "))
+# ssc.checkpoint("checkpoint")
 mapped_words = words.map(lambda word: (word, 1))
 count = mapped_words.reduceByKey(add)
 
 count.pyprint()
 ssc.start()
-# ssc.awaitTermination()
-ssc.stop()
+ssc.awaitTermination()
+# ssc.stop()
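
After this change the example blocks on awaitTermination() instead of stopping the context right away, so the streaming job keeps processing batches until it is killed. For context, a minimal sketch of the full script; everything outside the diffed lines (the imports, the StreamingContext construction, and the batch duration) is an assumption, not part of this commit:

    # Sketch only: lines outside the diff above are assumed, not from this commit.
    import sys
    from operator import add

    from pyspark.streaming.context import StreamingContext
    from pyspark.streaming.duration import Seconds  # assumed helper for the batch interval

    # Assumed constructor; the real example's setup is not shown in the diff.
    ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1))

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    words = lines.flatMap(lambda line: line.split(" "))
    # ssc.checkpoint("checkpoint")
    mapped_words = words.map(lambda word: (word, 1))
    count = mapped_words.reduceByKey(add)

    count.pyprint()
    ssc.start()
    ssc.awaitTermination()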

python/pyspark/streaming/context.py (+5)

@@ -133,3 +133,8 @@ def stop(self, stopSparkContext=True):
         finally:
             # Stop Callback server
             SparkContext._gateway.shutdown()
+
+    def checkpoint(self, directory):
+        """
+        """
+        self._jssc.checkpoint(directory)
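
The new checkpoint() wrapper simply forwards to the JVM StreamingContext; its docstring is still empty in this WIP commit. A hedged usage sketch (the HDFS path and surrounding calls are illustrative):

    # Illustrative only: the checkpoint directory should live on a
    # fault-tolerant filesystem such as HDFS; this path is made up.
    ssc.checkpoint("hdfs://namenode:8020/user/spark/checkpoints")

    # checkpoint() is set before start(); this is what the commented-out
    # ssc.checkpoint("checkpoint") line in the example above would exercise.
    ssc.start()
    ssc.awaitTermination()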

python/pyspark/streaming/dstream.py (+3, -1)

@@ -172,7 +172,8 @@ def add_shuffle_key(split, iterator):
         with _JavaStackTrace(self.ctx) as st:
             partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
                                                           id(partitionFunc))
-            jdstream = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream(), partitioner).asJavaDStream()
+            jdstream = self.ctx._jvm.PythonPairwiseDStream(keyed._jdstream.dstream(),
+                                                           partitioner).asJavaDStream()
         dstream = DStream(jdstream, self._ssc, BatchedSerializer(outputSerializer))
         # This is required so that id(partitionFunc) remains unique, even if
         # partitionFunc is a lambda:

@@ -233,6 +234,7 @@ def takeAndPrint(rdd, time):
         # jdstream = self.ctx._jvm.PythonTransformedDStream(self._jdstream.dstream(), wrapped_func).toJavaDStream
         # return DStream(jdstream, self._ssc, ...) ## DO NOT KNOW HOW
 
+
 class PipelinedDStream(DStream):
     def __init__(self, prev, func, preservesPartitioning=False):
         if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable():
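
The rename from PairwiseDStream to PythonPairwiseDStream lines up with the other Python-facing JVM classes (PythonDStream, PythonForeachDStream). Any per-key shuffle on a Python DStream routes through this class; a minimal sketch, assuming `words` is an existing DStream of strings:

    # Minimal sketch: a keyed reduction that reaches the JVM-side
    # PythonPairwiseDStream via the shuffle code path patched above.
    from operator import add

    pairs = words.map(lambda word: (word, 1))  # words: assumed existing DStream
    counts = pairs.reduceByKey(add)            # shuffle handled by PythonPairwiseDStream
    counts.pyprint()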

python/pyspark/streaming/utils.py (-1)

@@ -37,7 +37,6 @@ class Java:
         implements = ['org.apache.spark.streaming.api.python.PythonRDDFunction']
 
 
-
 def msDurationToString(ms):
     """
     Returns a human-readable string representing a duration such as "35ms"

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala (+30, -2)

@@ -25,7 +25,7 @@ import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.api.python._
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.streaming.{Duration, Time}
+import org.apache.spark.streaming.{StreamingContext, Duration, Time}
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.api.java._
 

@@ -64,7 +64,7 @@ class PythonDStream[T: ClassTag](
 }
 
 
-private class PairwiseDStream(prev: DStream[Array[Byte]], partitioner: Partitioner) extends
+private class PythonPairwiseDStream(prev: DStream[Array[Byte]], partitioner: Partitioner) extends
   DStream[Array[Byte]](prev.ssc) {
   override def dependencies = List(prev)
 

@@ -105,6 +105,7 @@ class PythonForeachDStream(
 
   this.register()
 }
+
 /*
 This does not work. Ignore this for now. -TD
 class PythonTransformedDStream(

@@ -126,3 +127,30 @@ class PythonTransformedDStream(
 }
 */
 
+/**
+ * An input stream used only for unit tests. It is equivalent to a checkpointable,
+ * replayable, reliable message queue like Kafka. It takes a sequence as input and
+ * returns the i-th element at the i-th batch under a manual clock.
+ */
+class PythonTestInputStream(ssc_ : StreamingContext, filename: String, numPartitions: Int)
+  extends InputDStream[Array[Byte]](ssc_) {
+
+  def start() {}
+
+  def stop() {}
+
+  def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
+    logInfo("Computing RDD for time " + validTime)
+    val index = ((validTime - zeroTime) / slideDuration - 1).toInt
+    // val selectedInput = if (index < input.size) input(index) else Seq[T]()
+
+    // lets us test cases where RDDs are not created
+    // if (filename == null)
+    //   return None
+
+    // val rdd = ssc.sc.makeRDD(selectedInput, numPartitions)
+    val rdd = PythonRDD.readRDDFromFile(ssc.sc, filename, numPartitions).rdd
+    logInfo("Created RDD " + rdd.id + " with " + filename)
+    Some(rdd)
+  }
+}
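
The index arithmetic in compute() maps a batch's validTime back to a position in the input sequence. The same formula as a small worked example in plain Python (the millisecond values are illustrative):

    # index = (validTime - zeroTime) / slideDuration - 1, as in compute() above.
    def batch_index(valid_time_ms, zero_time_ms, slide_duration_ms):
        return (valid_time_ms - zero_time_ms) // slide_duration_ms - 1

    # With a 1000 ms batch interval starting at t = 0, the batch firing at
    # t = 3000 ms reads element 2 of the input sequence.
    assert batch_index(3000, 0, 1000) == 2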
