Skip to content

Commit 1523b66

Browse files
committed
WIP
1 parent 8a0fbbc commit 1523b66

File tree

5 files changed

+14
-6
lines changed

5 files changed

+14
-6
lines changed

examples/src/main/python/streaming/test_oprations.py

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -15,10 +15,11 @@
1515

1616
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
1717
words = lines.flatMap(lambda line: line.split(" "))
18+
# ssc.checkpoint("checkpoint")
1819
mapped_words = words.map(lambda word: (word, 1))
1920
count = mapped_words.reduceByKey(add)
2021

2122
count.pyprint()
2223
ssc.start()
23-
# ssc.awaitTermination()
24-
ssc.stop()
24+
ssc.awaitTermination()
25+
# ssc.stop()

python/pyspark/streaming/context.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,3 +133,8 @@ def stop(self, stopSparkContext=True):
133133
finally:
134134
# Stop Callback server
135135
SparkContext._gateway.shutdown()
136+
137+
def checkpoint(self, directory):
138+
"""
139+
"""
140+
self._jssc.checkpoint(directory)

python/pyspark/streaming/dstream.py

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -172,7 +172,8 @@ def add_shuffle_key(split, iterator):
172172
with _JavaStackTrace(self.ctx) as st:
173173
partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
174174
id(partitionFunc))
175-
jdstream = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream(), partitioner).asJavaDStream()
175+
jdstream = self.ctx._jvm.PythonPairwiseDStream(keyed._jdstream.dstream(),
176+
partitioner).asJavaDStream()
176177
dstream = DStream(jdstream, self._ssc, BatchedSerializer(outputSerializer))
177178
# This is required so that id(partitionFunc) remains unique, even if
178179
# partitionFunc is a lambda:
@@ -246,6 +247,7 @@ def takeAndPrint(rdd, time):
246247
# jdstream = self.ctx._jvm.PythonTransformedDStream(self._jdstream.dstream(), wrapped_func).toJavaDStream
247248
# return DStream(jdstream, self._ssc, ...) ## DO NOT KNOW HOW
248249

250+
249251
class PipelinedDStream(DStream):
250252
def __init__(self, prev, func, preservesPartitioning=False):
251253
if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable():

python/pyspark/streaming/utils.py

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -37,7 +37,6 @@ class Java:
3737
implements = ['org.apache.spark.streaming.api.python.PythonRDDFunction']
3838

3939

40-
4140
def msDurationToString(ms):
4241
"""
4342
Returns a human-readable string representing a duration such as "35ms"

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,7 @@ import org.apache.spark._
2525
import org.apache.spark.rdd.RDD
2626
import org.apache.spark.api.python._
2727
import org.apache.spark.broadcast.Broadcast
28-
import org.apache.spark.streaming.{Duration, Time}
28+
import org.apache.spark.streaming.{StreamingContext, Duration, Time}
2929
import org.apache.spark.streaming.dstream._
3030
import org.apache.spark.streaming.api.java._
3131

@@ -139,7 +139,7 @@ DStream[(Long, Array[Byte])](prev.ssc){
139139
}
140140

141141

142-
private class PairwiseDStream(prev:DStream[Array[Byte]], partitioner: Partitioner) extends
142+
private class PythonPairwiseDStream(prev:DStream[Array[Byte]], partitioner: Partitioner) extends
143143
DStream[Array[Byte]](prev.ssc){
144144
override def dependencies = List(prev)
145145

@@ -180,6 +180,7 @@ class PythonForeachDStream(
180180

181181
this.register()
182182
}
183+
183184
/*
184185
This does not work. Ignore this for now. -TD
185186
class PythonTransformedDStream(

0 commit comments

Comments
 (0)