Skip to content

Commit 13fb44c

Browse files
committed
basic function test cases are passed
1 parent 3000b2b commit 13fb44c

File tree

2 files changed

+159
-60
lines changed

2 files changed

+159
-60
lines changed

python/pyspark/streaming_tests.py

Lines changed: 159 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
2525
"""
2626
from itertools import chain
27-
import os
2827
import time
2928
import unittest
3029
import operator
@@ -34,9 +33,6 @@
3433
from pyspark.streaming.duration import *
3534

3635

37-
SPARK_HOME = os.environ["SPARK_HOME"]
38-
39-
4036
class PySparkStreamingTestCase(unittest.TestCase):
4137
def setUp(self):
4238
class_name = self.__class__.__name__
@@ -49,7 +45,7 @@ def tearDown(self):
4945
self.ssc._sc.stop()
5046
# Why does it take a long time to terminate StreamingContext and SparkContext?
5147
# Should we change the sleep time if this depends on machine spec?
52-
time.sleep(5)
48+
time.sleep(8)
5349

5450
@classmethod
5551
def tearDownClass(cls):
@@ -59,8 +55,17 @@ def tearDownClass(cls):
5955

6056
class TestBasicOperationsSuite(PySparkStreamingTestCase):
6157
"""
62-
Input and output of this TestBasicOperationsSuite is the equivalent to
63-
Scala TestBasicOperationsSuite.
58+
2 tests for each function, for the batch deserializer and the unbatch deserializer, because
59+
we cannot change the deserializer after streaming process starts.
60+
Default numInputPartitions is 2.
61+
If the number of input elements is over 3, that DStream uses the batch deserializer.
62+
If not, that DStream uses the unbatch deserializer.
63+
64+
Most of the operations use the UTF8 deserializer to get values from Scala.
65+
I am wondering if these tests are enough or not.
66+
All test input should be a list of lists. This represents the stream.
67+
Every batch interval, the first object of the list is chosen to make a DStream.
68+
Please see TestBasicOperationsSuite in Scala or QueueStream, which is close to this implementation.
6469
"""
6570
def setUp(self):
6671
PySparkStreamingTestCase.setUp(self)
@@ -75,8 +80,8 @@ def tearDown(self):
7580
def tearDownClass(cls):
7681
PySparkStreamingTestCase.tearDownClass()
7782

78-
def test_map(self):
79-
"""Basic operation test for DStream.map"""
83+
def test_map_batch(self):
84+
"""Basic operation test for DStream.map with batch deserializer"""
8085
test_input = [range(1, 5), range(5, 9), range(9, 13)]
8186

8287
def test_func(dstream):
@@ -85,8 +90,18 @@ def test_func(dstream):
8590
output = self._run_stream(test_input, test_func, expected_output)
8691
self.assertEqual(expected_output, output)
8792

88-
def test_flatMap(self):
89-
"""Basic operation test for DStream.faltMap"""
93+
def test_map_unbatach(self):
94+
"""Basic operation test for DStream.map with unbatch deserializer"""
95+
test_input = [range(1, 4), range(4, 7), range(7, 10)]
96+
97+
def test_func(dstream):
98+
return dstream.map(lambda x: str(x))
99+
expected_output = map(lambda x: map(lambda y: str(y), x), test_input)
100+
output = self._run_stream(test_input, test_func, expected_output)
101+
self.assertEqual(expected_output, output)
102+
103+
def test_flatMap_batch(self):
104+
"""Basic operation test for DStream.faltMap with batch deserializer"""
90105
test_input = [range(1, 5), range(5, 9), range(9, 13)]
91106

92107
def test_func(dstream):
@@ -96,8 +111,19 @@ def test_func(dstream):
96111
output = self._run_stream(test_input, test_func, expected_output)
97112
self.assertEqual(expected_output, output)
98113

99-
def test_filter(self):
100-
"""Basic operation test for DStream.filter"""
114+
def test_flatMap_unbatch(self):
115+
"""Basic operation test for DStream.faltMap with unbatch deserializer"""
116+
test_input = [range(1, 4), range(4, 7), range(7, 10)]
117+
118+
def test_func(dstream):
119+
return dstream.flatMap(lambda x: (x, x * 2))
120+
expected_output = map(lambda x: list(chain.from_iterable((map(lambda y: [y, y * 2], x)))),
121+
test_input)
122+
output = self._run_stream(test_input, test_func, expected_output)
123+
self.assertEqual(expected_output, output)
124+
125+
def test_filter_batch(self):
126+
"""Basic operation test for DStream.filter with batch deserializer"""
101127
test_input = [range(1, 5), range(5, 9), range(9, 13)]
102128

103129
def test_func(dstream):
@@ -106,21 +132,38 @@ def test_func(dstream):
106132
output = self._run_stream(test_input, test_func, expected_output)
107133
self.assertEqual(expected_output, output)
108134

109-
def test_count(self):
110-
"""Basic operation test for DStream.count"""
111-
#test_input = [[], [1], range(1, 3), range(1, 4), range(1, 5)]
112-
test_input = [range(1, 5), range(1,10), range(1,20)]
135+
def test_filter_unbatch(self):
136+
"""Basic operation test for DStream.filter with unbatch deserializer"""
137+
test_input = [range(1, 4), range(4, 7), range(7, 10)]
138+
139+
def test_func(dstream):
140+
return dstream.filter(lambda x: x % 2 == 0)
141+
expected_output = map(lambda x: filter(lambda y: y % 2 == 0, x), test_input)
142+
output = self._run_stream(test_input, test_func, expected_output)
143+
self.assertEqual(expected_output, output)
144+
145+
def test_count_batch(self):
146+
"""Basic operation test for DStream.count with batch deserializer"""
147+
test_input = [range(1, 5), range(1, 10), range(1, 20)]
113148

114149
def test_func(dstream):
115-
print "count"
116-
dstream.count().pyprint()
117150
return dstream.count()
118151
expected_output = map(lambda x: [len(x)], test_input)
119152
output = self._run_stream(test_input, test_func, expected_output)
120153
self.assertEqual(expected_output, output)
121-
122-
def test_reduce(self):
123-
"""Basic operation test for DStream.reduce"""
154+
155+
def test_count_unbatch(self):
156+
"""Basic operation test for DStream.count with unbatch deserializer"""
157+
test_input = [[], [1], range(1, 3), range(1, 4)]
158+
159+
def test_func(dstream):
160+
return dstream.count()
161+
expected_output = map(lambda x: [len(x)], test_input)
162+
output = self._run_stream(test_input, test_func, expected_output)
163+
self.assertEqual(expected_output, output)
164+
165+
def test_reduce_batch(self):
166+
"""Basic operation test for DStream.reduce with batch deserializer"""
124167
test_input = [range(1, 5), range(5, 9), range(9, 13)]
125168

126169
def test_func(dstream):
@@ -129,67 +172,132 @@ def test_func(dstream):
129172
output = self._run_stream(test_input, test_func, expected_output)
130173
self.assertEqual(expected_output, output)
131174

132-
def test_reduceByKey(self):
133-
"""Basic operation test for DStream.reduceByKey"""
134-
#test_input = [["a", "a", "b"], ["", ""], []]
135-
test_input = [["a", "a", "b", "b"], ["", "", "", ""], []]
175+
def test_reduce_unbatch(self):
176+
"""Basic operation test for DStream.reduce with unbatch deserializer"""
177+
test_input = [[1], range(1, 3), range(1, 4)]
178+
179+
def test_func(dstream):
180+
return dstream.reduce(operator.add)
181+
expected_output = map(lambda x: [reduce(operator.add, x)], test_input)
182+
output = self._run_stream(test_input, test_func, expected_output)
183+
self.assertEqual(expected_output, output)
184+
185+
def test_reduceByKey_batch(self):
186+
"""Basic operation test for DStream.reduceByKey with batch deserializer"""
187+
test_input = [["a", "a", "b", "b"], ["", "", "", ""]]
188+
189+
def test_func(dstream):
190+
return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add)
191+
expected_output = [[("a", 2), ("b", 2)], [("", 4)]]
192+
output = self._run_stream(test_input, test_func, expected_output)
193+
self.assertEqual(expected_output, output)
194+
195+
def test_reduceByKey_unbatch(self):
196+
"""Basic operation test for DStream.reduceByKey with unbatch deserilizer"""
197+
test_input = [["a", "a", "b"], ["", ""], []]
136198

137199
def test_func(dstream):
138-
print "reduceByKey"
139-
dstream.map(lambda x: (x, 1)).pyprint()
140200
return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add)
141-
#expected_output = [[("a", 2), ("b", 1)], [("", 2)], []]
142-
expected_output = [[("a", 2), ("b", 2)], [("", 4)], []]
201+
expected_output = [[("a", 2), ("b", 1)], [("", 2)], []]
143202
output = self._run_stream(test_input, test_func, expected_output)
144203
self.assertEqual(expected_output, output)
145204

146-
def test_mapValues(self):
147-
"""Basic operation test for DStream.mapValues"""
148-
#test_input = [["a", "a", "b"], ["", ""], []]
149-
test_input = [["a", "a", "b", "b"], ["", "", "", ""], []]
205+
def test_mapValues_batch(self):
206+
"""Basic operation test for DStream.mapValues with batch deserializer"""
207+
test_input = [["a", "a", "b", "b"], ["", "", "", ""]]
150208

151209
def test_func(dstream):
152-
return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add).mapValues(lambda x: x + 10)
153-
#expected_output = [[("a", 12), ("b", 11)], [("", 12)], []]
154-
expected_output = [[("a", 12), ("b", 12)], [("", 14)], []]
210+
return dstream.map(lambda x: (x, 1))\
211+
.reduceByKey(operator.add)\
212+
.mapValues(lambda x: x + 10)
213+
expected_output = [[("a", 12), ("b", 12)], [("", 14)]]
155214
output = self._run_stream(test_input, test_func, expected_output)
156215
self.assertEqual(expected_output, output)
157216

158-
def test_flatMapValues(self):
159-
"""Basic operation test for DStream.flatMapValues"""
160-
#test_input = [["a", "a", "b"], ["", ""], []]
161-
test_input = [["a", "a", "b", "b"], ["", "", "",""], []]
217+
def test_mapValues_unbatch(self):
218+
"""Basic operation test for DStream.mapValues with unbatch deserializer"""
219+
test_input = [["a", "a", "b"], ["", ""], []]
162220

163221
def test_func(dstream):
164-
return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add).flatMapValues(lambda x: (x, x + 10))
165-
#expected_output = [[("a", 2), ("a", 12), ("b", 1), ("b", 11)], [("", 2), ("", 12)], []]
166-
expected_output = [[("a", 2), ("a", 12), ("b", 2), ("b", 12)], [("", 4), ("", 14)], []]
222+
return dstream.map(lambda x: (x, 1))\
223+
.reduceByKey(operator.add)\
224+
.mapValues(lambda x: x + 10)
225+
expected_output = [[("a", 12), ("b", 11)], [("", 12)], []]
167226
output = self._run_stream(test_input, test_func, expected_output)
168227
self.assertEqual(expected_output, output)
169228

170-
def test_glom(self):
171-
"""Basic operation test for DStream.glom"""
229+
def test_flatMapValues_batch(self):
230+
"""Basic operation test for DStream.flatMapValues with batch deserializer"""
231+
test_input = [["a", "a", "b", "b"], ["", "", "", ""]]
232+
233+
def test_func(dstream):
234+
return dstream.map(lambda x: (x, 1))\
235+
.reduceByKey(operator.add)\
236+
.flatMapValues(lambda x: (x, x + 10))
237+
expected_output = [[("a", 2), ("a", 12), ("b", 2), ("b", 12)], [("", 4), ("", 14)]]
238+
output = self._run_stream(test_input, test_func, expected_output)
239+
self.assertEqual(expected_output, output)
240+
241+
def test_flatMapValues_unbatch(self):
242+
"""Basic operation test for DStream.flatMapValues with unbatch deserializer"""
243+
test_input = [["a", "a", "b"], ["", ""], []]
244+
245+
def test_func(dstream):
246+
return dstream.map(lambda x: (x, 1))\
247+
.reduceByKey(operator.add)\
248+
.flatMapValues(lambda x: (x, x + 10))
249+
expected_output = [[("a", 2), ("a", 12), ("b", 1), ("b", 11)], [("", 2), ("", 12)], []]
250+
output = self._run_stream(test_input, test_func, expected_output)
251+
self.assertEqual(expected_output, output)
252+
253+
def test_glom_batch(self):
254+
"""Basic operation test for DStream.glom with batch deserializer"""
172255
test_input = [range(1, 5), range(5, 9), range(9, 13)]
173256
numSlices = 2
174257

175258
def test_func(dstream):
176259
return dstream.glom()
177-
expected_output = [[[1,2], [3,4]], [[5,6], [7,8]], [[9,10], [11,12]]]
260+
expected_output = [[[1, 2], [3, 4]], [[5, 6], [7, 8]], [[9, 10], [11, 12]]]
261+
output = self._run_stream(test_input, test_func, expected_output, numSlices)
262+
self.assertEqual(expected_output, output)
263+
264+
def test_glom_unbatach(self):
265+
"""Basic operation test for DStream.glom with unbatch deserialiser"""
266+
test_input = [range(1, 4), range(4, 7), range(7, 10)]
267+
numSlices = 2
268+
269+
def test_func(dstream):
270+
return dstream.glom()
271+
expected_output = [[[1], [2, 3]], [[4], [5, 6]], [[7], [8, 9]]]
178272
output = self._run_stream(test_input, test_func, expected_output, numSlices)
179273
self.assertEqual(expected_output, output)
180274

181-
def test_mapPartitions(self):
182-
"""Basic operation test for DStream.mapPartitions"""
275+
def test_mapPartitions_batch(self):
276+
"""Basic operation test for DStream.mapPartitions with batch deserializer"""
183277
test_input = [range(1, 5), range(5, 9), range(9, 13)]
184278
numSlices = 2
185279

186280
def test_func(dstream):
187-
def f(iterator): yield sum(iterator)
281+
def f(iterator):
282+
yield sum(iterator)
188283
return dstream.mapPartitions(f)
189284
expected_output = [[3, 7], [11, 15], [19, 23]]
190285
output = self._run_stream(test_input, test_func, expected_output, numSlices)
191286
self.assertEqual(expected_output, output)
192287

288+
def test_mapPartitions_unbatch(self):
289+
"""Basic operation test for DStream.mapPartitions with unbatch deserializer"""
290+
test_input = [range(1, 4), range(4, 7), range(7, 10)]
291+
numSlices = 2
292+
293+
def test_func(dstream):
294+
def f(iterator):
295+
yield sum(iterator)
296+
return dstream.mapPartitions(f)
297+
expected_output = [[1, 5], [4, 11], [7, 17]]
298+
output = self._run_stream(test_input, test_func, expected_output, numSlices)
299+
self.assertEqual(expected_output, output)
300+
193301
def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
194302
"""Start stream and return the output"""
195303
# Generate input stream with user-defined input
@@ -212,6 +320,7 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
212320
# check if the output is the same length as the expected output
213321
if len(expected_output) == len(self.result):
214322
break
323+
215324
return self.result
216325

217326
if __name__ == "__main__":

python/pyspark/worker.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -86,16 +86,6 @@ def main(infile, outfile):
8686
(func, deserializer, serializer) = command
8787
init_time = time.time()
8888
iterator = deserializer.load_stream(infile)
89-
print "deserializer in worker: %s" % str(deserializer)
90-
iterator, walk = itertools.tee(iterator)
91-
if isinstance(walk, int):
92-
print "this is int"
93-
print walk
94-
else:
95-
try:
96-
print list(walk)
97-
except:
98-
print list(walk)
9989
serializer.dump_stream(func(split_index, iterator), outfile)
10090
except Exception:
10191
try:

0 commit comments

Comments
 (0)