
Commit 9767712

WIP: fixed cases where partitioned input and None were not recognized
1 parent 4f2d7e6 commit 9767712

4 files changed, 49 additions and 11 deletions


python/pyspark/streaming/context.py

Lines changed: 19 additions & 1 deletion
@@ -154,18 +154,36 @@ def _testInputStream(self, test_inputs, numSlices=None):
             # Make sure we distribute data evenly if it's smaller than self.batchSize
             if "__len__" not in dir(test_input):
-                c = list(test_input)    # Make it a list so we can compute its length
+                test_input = list(test_input)    # Make it a list so we can compute its length
             batchSize = min(len(test_input) // numSlices, self._sc._batchSize)
             if batchSize > 1:
                 serializer = BatchedSerializer(self._sc._unbatched_serializer,
                                                batchSize)
             else:
                 serializer = self._sc._unbatched_serializer
             serializer.dump_stream(test_input, tempFile)
+            tempFile.close()
             tempFiles.append(tempFile.name)

         jtempFiles = ListConverter().convert(tempFiles, SparkContext._gateway._gateway_client)
         jinput_stream = self._jvm.PythonTestInputStream(self._jssc,
                                                         jtempFiles,
                                                         numSlices).asJavaDStream()
         return DStream(jinput_stream, self, PickleSerializer())
+
+
+    def _testInputStream2(self, test_inputs, numSlices=None):
+        """
+        This is inspired by the QueueStream implementation. Given a list of RDDs,
+        generate a DStream that contains those RDDs.
+        """
+        test_rdds = list()
+        for test_input in test_inputs:
+            test_rdd = self._sc.parallelize(test_input, numSlices)
+            print test_rdd.glom().collect()
+            test_rdds.append(test_rdd._jrdd)
+
+        jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client)
+        jinput_stream = self._jvm.PythonTestInputStream2(self._jssc, jtest_rdds).asJavaDStream()
+
+        return DStream(jinput_stream, self, BatchedSerializer(PickleSerializer()))
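The new _testInputStream2 turns each inner list of test_inputs into one RDD and replays those RDDs as consecutive batches, in the spirit of a queue-based input stream. The usage sketch below is illustrative only: it assumes a StreamingContext named ssc wired up the same way as in streaming_tests.py, which is not part of this diff.

# A minimal usage sketch, assuming `ssc` is a StreamingContext set up the same
# way as in streaming_tests.py; that setup is not part of this commit.
batches = [[1, 2, 3, 4], [5, 6, 7, 8]]            # one inner list per batch

# Each inner list is parallelized into an RDD with two partitions and replayed
# as one batch of the resulting DStream.
input_stream = ssc._testInputStream2(batches, numSlices=2)
input_stream.pyprint()                            # print a few elements of every batch

ssc.start()
ssc.awaitTermination(50)                          # same polling call the test suite uses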

python/pyspark/streaming/dstream.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,8 @@ def takeAndPrint(rdd, time):
233233
taken = rdd.take(11)
234234
print "-------------------------------------------"
235235
print "Time: %s" % (str(time))
236+
print rdd.glom().collect()
237+
print "-------------------------------------------"
236238
print "-------------------------------------------"
237239
for record in taken[:10]:
238240
print record
@@ -288,6 +290,20 @@ def get_output(rdd, time):
288290
self.foreachRDD(get_output)
289291

290292

293+
# TODO: implement groupByKey
294+
# TODO: impelment union
295+
# TODO: implement cache
296+
# TODO: implement persist
297+
# TODO: implement repertitions
298+
# TODO: implement saveAsTextFile
299+
# TODO: implement cogroup
300+
# TODO: implement join
301+
# TODO: implement countByValue
302+
# TODO: implement leftOuterJoin
303+
# TODO: implemtnt rightOuterJoin
304+
305+
306+
291307
class PipelinedDStream(DStream):
292308
def __init__(self, prev, func, preservesPartitioning=False):
293309
if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable():
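Several of the TODO operations can be expressed with primitives DStream already has. As an illustration only (none of this is in the commit), a per-batch countByValue can be sketched as a map into (value, 1) pairs followed by the existing reduceByKey; count_by_value below is a hypothetical free-standing helper, not a method on DStream.

import operator

# Hypothetical sketch of countByValue in terms of existing DStream primitives
# (map and reduceByKey, both exercised by the test suite); not part of this commit.
def count_by_value(dstream):
    """Count how many times each value occurs within every batch of `dstream`."""
    return dstream.map(lambda value: (value, 1)).reduceByKey(operator.add)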

python/pyspark/streaming_tests.py

Lines changed: 13 additions & 10 deletions
@@ -71,8 +71,9 @@ class TestBasicOperationsSuite(PySparkStreamingTestCase):
     """
     def setUp(self):
         PySparkStreamingTestCase.setUp(self)
-        StreamOutput.result = list()
         self.timeout = 10  # seconds
+        self.numInputPartitions = 2
+        self.result = list()

     def tearDown(self):
         PySparkStreamingTestCase.tearDown(self)
@@ -137,6 +138,8 @@ def test_reduceByKey(self):
         test_input = [["a", "a", "b"], ["", ""], []]

         def test_func(dstream):
+            print "reduceByKey"
+            dstream.map(lambda x: (x, 1)).pyprint()
             return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add)
         expected_output = [[("a", 2), ("b", 1)], [("", 2)], []]
         output = self._run_stream(test_input, test_func, expected_output)
@@ -168,9 +171,8 @@ def test_glom(self):
         numSlices = 2

         def test_func(dstream):
-            dstream.pyprint()
             return dstream.glom()
-        expected_output = [[[1,2], [3,4]],[[5,6], [7,8]],[[9,10], [11,12]]]
+        expected_output = [[[1,2], [3,4]], [[5,6], [7,8]], [[9,10], [11,12]]]
         output = self._run_stream(test_input, test_func, expected_output, numSlices)
         self.assertEqual(expected_output, output)

@@ -180,20 +182,21 @@ def test_mapPartitions(self):
         numSlices = 2

         def test_func(dstream):
-            dstream.pyprint()
-            return dstream.mapPartitions(lambda x: reduce(operator.add, x))
-        expected_output = [[3, 7],[11, 15],[19, 23]]
+            def f(iterator): yield sum(iterator)
+            return dstream.mapPartitions(f)
+        expected_output = [[3, 7], [11, 15], [19, 23]]
         output = self._run_stream(test_input, test_func, expected_output, numSlices)
         self.assertEqual(expected_output, output)

     def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
         """Start stream and return the output"""
         # Generate input stream with user-defined input
-        test_input_stream = self.ssc._testInputStream(test_input, numSlices)
+        numSlices = numSlices or self.numInputPartitions
+        test_input_stream = self.ssc._testInputStream2(test_input, numSlices)
         # Apply test function to stream
         test_stream = test_func(test_input_stream)
         # Add job to get output from stream
-        test_stream._test_output(StreamOutput.result)
+        test_stream._test_output(self.result)
         self.ssc.start()

         start_time = time.time()
@@ -205,9 +208,9 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
                 break
             self.ssc.awaitTermination(50)
             # check if the output is the same length as the expected output
-            if len(expected_output) == len(StreamOutput.result):
+            if len(expected_output) == len(self.result):
                 break
-        return StreamOutput.result
+        return self.result

 if __name__ == "__main__":
     unittest.main()
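_run_stream drives each test the same way: build the input stream, apply the operation under test, register _test_output(self.result) to collect batches, start the context, then poll until either the timeout elapses or the collected output has as many batches as expected. The sketch below reconstructs that wait loop from the fragments visible in the hunks above; the exact code in the full file may differ.

import time

# Reconstruction of the polling loop, based only on the fragments shown in this
# diff; function name and exact structure are assumptions.
def wait_for_output(ssc, result, expected_output, timeout):
    """Block until the stream has produced as many batches as expected, or time out."""
    start_time = time.time()
    while True:
        if time.time() - start_time > timeout:    # self.timeout is 10 seconds in setUp
            break
        ssc.awaitTermination(50)                  # wait briefly between checks
        # stop once the output has the same length as the expected output
        if len(expected_output) == len(result):
            break
    return result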

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,3 +133,4 @@ class PythonTransformedDStream(
133133
//val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
134134
}
135135
*/
136+
