
Commit ff14070

broke something
1 parent bcdec33 commit ff14070

4 files changed: +38 −4 lines

python/pyspark/streaming/context.py

Lines changed: 6 additions & 4 deletions
@@ -169,21 +169,23 @@ def _testInputStream(self, test_inputs, numSlices=None):
         jinput_stream = self._jvm.PythonTestInputStream(self._jssc,
                                                         jtempFiles,
                                                         numSlices).asJavaDStream()
-        return DStream(jinput_stream, self, PickleSerializer())
-
+        return DStream(jinput_stream, self, BatchedSerializer(PickleSerializer()))
 
     def _testInputStream2(self, test_inputs, numSlices=None):
         """
         This is inpired by QueStream implementation. Give list of RDD and generate DStream
         which contain the RDD.
         """
         test_rdds = list()
+        test_rdd_deserializers = list()
         for test_input in test_inputs:
             test_rdd = self._sc.parallelize(test_input, numSlices)
-            print test_rdd.glom().collect()
             test_rdds.append(test_rdd._jrdd)
+            test_rdd_deserializers.append(test_rdd._jrdd_deserializer)
 
         jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client)
         jinput_stream = self._jvm.PythonTestInputStream2(self._jssc, jtest_rdds).asJavaDStream()
 
-        return DStream(jinput_stream, self, BatchedSerializer(PickleSerializer()))
+        dstream = DStream(jinput_stream, self, test_rdd_deserializers[0])
+        dstream._test_switch_dserializer(test_rdd_deserializers)
+        return dstream
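
The serializer swap above matters because RDDs built by SparkContext.parallelize() are typically written with a batched pickle serializer by default, so a DStream reading them back needs a matching BatchedSerializer. A minimal sketch of the batching behaviour, assuming only pyspark.serializers is importable; the data and batch size are illustrative:

# BatchedSerializer pickles items in groups instead of one object at a time.
import io
from pyspark.serializers import BatchedSerializer, PickleSerializer

batched = BatchedSerializer(PickleSerializer(), 2)  # illustrative batch size
buf = io.BytesIO()
batched.dump_stream([1, 2, 3, 4, 5], buf)  # written as batches [1, 2], [3, 4], [5]
buf.seek(0)
print(list(batched.load_stream(buf)))      # round-trips to [1, 2, 3, 4, 5]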

python/pyspark/streaming/dstream.py

Lines changed: 20 additions & 0 deletions
@@ -17,6 +17,7 @@
 
 from collections import defaultdict
 from itertools import chain, ifilter, imap
+import time
 import operator
 
 from pyspark.serializers import NoOpSerializer,\
@@ -302,6 +303,25 @@ def get_output(rdd, time):
 
         self.foreachRDD(get_output)
 
+    def _test_switch_dserializer(self, serializer_que):
+        """
+        Deserializer is dynamically changed based on numSlice and the number of
+        input. This function choose deserializer. Currently this is just FIFO.
+        """
+
+        jrdd_deserializer = self._jrdd_deserializer
+
+        def switch(rdd, jtime):
+            try:
+                print serializer_que
+                jrdd_deserializer = serializer_que.pop(0)
+                print jrdd_deserializer
+            except Exception as e:
+                print e
+
+        self.foreachRDD(switch)
+
+
 
 # TODO: implement groupByKey
 # TODO: impelment union
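
One observation about the new helper, not stated in the commit itself: switch only rebinds its local name jrdd_deserializer, so the popped deserializer never reaches self._jrdd_deserializer, which may be part of what the "broke something" message refers to. A minimal illustration of that Python scoping rule:

# Rebinding a name inside a nested function creates a new local binding;
# the enclosing function's variable is untouched (Python 2 has no `nonlocal`).
def outer():
    value = "old"

    def inner():
        value = "new"  # local to inner(); outer's value is unchanged
        return value

    inner()
    return value

print(outer())  # prints "old"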

python/pyspark/streaming_tests.py

Lines changed: 2 additions & 0 deletions
@@ -118,6 +118,8 @@ def test_count(self):
         test_input = [[], [1], range(1, 3), range(1, 4), range(1, 5)]
 
         def test_func(dstream):
+            print "count"
+            dstream.count().pyprint()
             return dstream.count()
         expected_output = map(lambda x: [len(x)], test_input)
         output = self._run_stream(test_input, test_func, expected_output)

python/pyspark/worker.py

Lines changed: 10 additions & 0 deletions
@@ -86,6 +86,16 @@ def main(infile, outfile):
         (func, deserializer, serializer) = command
         init_time = time.time()
         iterator = deserializer.load_stream(infile)
+        print "deserializer in worker: %s" % str(deserializer)
+        iterator, walk = itertools.tee(iterator)
+        if isinstance(walk, int):
+            print "this is int"
+            print walk
+        else:
+            try:
+                print list(walk)
+            except:
+                print list(walk)
         serializer.dump_stream(func(split_index, iterator), outfile)
     except Exception:
         try:
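
The worker change is debugging instrumentation built on itertools.tee, which splits one iterator into two so a copy can be printed while the original still feeds dump_stream (the isinstance(walk, int) branch is dead code, since tee always returns iterators). A small standalone sketch of the pattern, with illustrative data; note that draining the debug copy buffers the whole stream in memory:

# itertools.tee() yields two independent iterators over the same stream.
import itertools

stream = iter([1, 2, 3])
stream, walk = itertools.tee(stream)
print(list(walk))    # debug view: [1, 2, 3]
print(list(stream))  # the real consumer still sees [1, 2, 3]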
