32
32
import sys
33
33
34
34
if sys .version_info [:2 ] <= (2 , 6 ):
35
- import unittest2 as unittest
36
- else :
37
- import unittest
35
+ import unittest2 as unittest
36
+ else :
37
+ import unittest
38
38
39
39
from pyspark .context import SparkContext
40
40
from pyspark .streaming .context import StreamingContext
@@ -57,7 +57,7 @@ def tearDown(self):
57
57
58
58
@classmethod
59
59
def tearDownClass (cls ):
60
- # Make sure tp shutdown the callback server
60
+ # Make sure tp shutdown the callback server
61
61
SparkContext ._gateway ._shutdown_callback_server ()
62
62
63
63
@@ -71,7 +71,7 @@ class TestBasicOperationsSuite(PySparkStreamingTestCase):
71
71
72
72
All tests input should have list of lists(3 lists are default). This list represents stream.
73
73
Every batch interval, the first object of list are chosen to make DStream.
74
- e.g The first list in the list is input of the first batch.
74
+ e.g The first list in the list is input of the first batch.
75
75
Please see the BasicTestSuits in Scala which is close to this implementation.
76
76
"""
77
77
def setUp (self ):
@@ -112,7 +112,7 @@ def test_flatMap_batch(self):
112
112
113
113
def test_func (dstream ):
114
114
return dstream .flatMap (lambda x : (x , x * 2 ))
115
- expected_output = map (lambda x : list (chain .from_iterable ((map (lambda y : [y , y * 2 ], x )))),
115
+ expected_output = map (lambda x : list (chain .from_iterable ((map (lambda y : [y , y * 2 ], x )))),
116
116
test_input )
117
117
output = self ._run_stream (test_input , test_func , expected_output )
118
118
self .assertEqual (expected_output , output )
@@ -191,12 +191,12 @@ def test_func(dstream):
191
191
def test_reduceByKey_batch (self ):
192
192
"""Basic operation test for DStream.reduceByKey with batch deserializer."""
193
193
test_input = [[("a" , 1 ), ("a" , 1 ), ("b" , 1 ), ("b" , 1 )],
194
- [("" , 1 ),("" , 1 ), ("" , 1 ), ("" , 1 )],
194
+ [("" , 1 ), ("" , 1 ), ("" , 1 ), ("" , 1 )],
195
195
[(1 , 1 ), (1 , 1 ), (2 , 1 ), (2 , 1 ), (3 , 1 )]]
196
196
197
197
def test_func (dstream ):
198
198
return dstream .reduceByKey (operator .add )
199
- expected_output = [[("a" , 2 ), ("b" , 2 )], [("" , 4 )], [(1 , 2 ), (2 , 2 ), (3 , 1 )]]
199
+ expected_output = [[("a" , 2 ), ("b" , 2 )], [("" , 4 )], [(1 , 2 ), (2 , 2 ), (3 , 1 )]]
200
200
output = self ._run_stream (test_input , test_func , expected_output )
201
201
for result in (output , expected_output ):
202
202
self ._sort_result_based_on_key (result )
@@ -216,13 +216,13 @@ def test_func(dstream):
216
216
217
217
def test_mapValues_batch (self ):
218
218
"""Basic operation test for DStream.mapValues with batch deserializer."""
219
- test_input = [[("a" , 2 ), ("b" , 2 ), ("c" , 1 ), ("d" , 1 )],
219
+ test_input = [[("a" , 2 ), ("b" , 2 ), ("c" , 1 ), ("d" , 1 )],
220
220
[("" , 4 ), (1 , 1 ), (2 , 2 ), (3 , 3 )],
221
221
[(1 , 1 ), (2 , 1 ), (3 , 1 ), (4 , 1 )]]
222
222
223
223
def test_func (dstream ):
224
224
return dstream .mapValues (lambda x : x + 10 )
225
- expected_output = [[("a" , 12 ), ("b" , 12 ), ("c" , 11 ), ("d" , 11 )],
225
+ expected_output = [[("a" , 12 ), ("b" , 12 ), ("c" , 11 ), ("d" , 11 )],
226
226
[("" , 14 ), (1 , 11 ), (2 , 12 ), (3 , 13 )],
227
227
[(1 , 11 ), (2 , 11 ), (3 , 11 ), (4 , 11 )]]
228
228
output = self ._run_stream (test_input , test_func , expected_output )
@@ -250,7 +250,8 @@ def test_flatMapValues_batch(self):
250
250
251
251
def test_func (dstream ):
252
252
return dstream .flatMapValues (lambda x : (x , x + 10 ))
253
- expected_output = [[("a" , 2 ), ("a" , 12 ), ("b" , 2 ), ("b" , 12 ), ("c" , 1 ), ("c" , 11 ), ("d" , 1 ), ("d" , 11 )],
253
+ expected_output = [[("a" , 2 ), ("a" , 12 ), ("b" , 2 ), ("b" , 12 ),
254
+ ("c" , 1 ), ("c" , 11 ), ("d" , 1 ), ("d" , 11 )],
254
255
[("" , 4 ), ("" , 14 ), (1 , 1 ), (1 , 11 ), (2 , 1 ), (2 , 11 ), (3 , 1 ), (3 , 11 )],
255
256
[(1 , 1 ), (1 , 11 ), (2 , 1 ), (2 , 11 ), (3 , 1 ), (3 , 11 ), (4 , 1 ), (4 , 11 )]]
256
257
output = self ._run_stream (test_input , test_func , expected_output )
@@ -344,7 +345,7 @@ def test_func(dstream):
344
345
345
346
def test_groupByKey_batch (self ):
346
347
"""Basic operation test for DStream.groupByKey with batch deserializer."""
347
- test_input = [[(1 , 1 ), (2 , 1 ), (3 , 1 ), (4 , 1 )],
348
+ test_input = [[(1 , 1 ), (2 , 1 ), (3 , 1 ), (4 , 1 )],
348
349
[(1 , 1 ), (1 , 1 ), (1 , 1 ), (2 , 1 ), (2 , 1 ), (3 , 1 )],
349
350
[("a" , 1 ), ("a" , 1 ), ("b" , 1 ), ("" , 1 ), ("" , 1 ), ("" , 1 )]]
350
351
@@ -361,7 +362,7 @@ def test_func(dstream):
361
362
362
363
def test_groupByKey_unbatch (self ):
363
364
"""Basic operation test for DStream.groupByKey with unbatch deserializer."""
364
- test_input = [[(1 , 1 ), (2 , 1 ), (3 , 1 )],
365
+ test_input = [[(1 , 1 ), (2 , 1 ), (3 , 1 )],
365
366
[(1 , 1 ), (1 , 1 ), ("" , 1 )],
366
367
[("a" , 1 ), ("a" , 1 ), ("b" , 1 )]]
367
368
@@ -378,12 +379,13 @@ def test_func(dstream):
378
379
379
380
def test_combineByKey_batch (self ):
380
381
"""Basic operation test for DStream.combineByKey with batch deserializer."""
381
- test_input = [[(1 , 1 ), (2 , 1 ), (3 , 1 ), (4 , 1 )],
382
- [(1 , 1 ), (1 , 1 ), (1 , 1 ), (2 , 1 ), (2 , 1 ), (3 , 1 )],
382
+ test_input = [[(1 , 1 ), (2 , 1 ), (3 , 1 ), (4 , 1 )],
383
+ [(1 , 1 ), (1 , 1 ), (1 , 1 ), (2 , 1 ), (2 , 1 ), (3 , 1 )],
383
384
[("a" , 1 ), ("a" , 1 ), ("b" , 1 ), ("" , 1 ), ("" , 1 ), ("" , 1 )]]
384
385
385
386
def test_func (dstream ):
386
- def add (a , b ): return a + str (b )
387
+ def add (a , b ):
388
+ return a + str (b )
387
389
return dstream .combineByKey (str , add , add )
388
390
expected_output = [[(1 , "1" ), (2 , "1" ), (3 , "1" ), (4 , "1" )],
389
391
[(1 , "111" ), (2 , "11" ), (3 , "1" )],
@@ -395,10 +397,13 @@ def add(a, b): return a + str(b)
395
397
396
398
def test_combineByKey_unbatch (self ):
397
399
"""Basic operation test for DStream.combineByKey with unbatch deserializer."""
398
- test_input = [[(1 , 1 ), (2 , 1 ), (3 , 1 )], [(1 , 1 ), (1 , 1 ), ("" , 1 )], [("a" , 1 ), ("a" , 1 ), ("b" , 1 )]]
400
+ test_input = [[(1 , 1 ), (2 , 1 ), (3 , 1 )],
401
+ [(1 , 1 ), (1 , 1 ), ("" , 1 )],
402
+ [("a" , 1 ), ("a" , 1 ), ("b" , 1 )]]
399
403
400
404
def test_func (dstream ):
401
- def add (a , b ): return a + str (b )
405
+ def add (a , b ):
406
+ return a + str (b )
402
407
return dstream .combineByKey (str , add , add )
403
408
expected_output = [[(1 , "1" ), (2 , "1" ), (3 , "1" )],
404
409
[(1 , "11" ), ("" , "1" )],
@@ -445,7 +450,7 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
445
450
# Check time out.
446
451
if (current_time - start_time ) > self .timeout :
447
452
break
448
- # StreamingContext.awaitTermination is not used to wait because
453
+ # StreamingContext.awaitTermination is not used to wait because
449
454
# if py4j server is called every 50 milliseconds, it gets an error.
450
455
time .sleep (0.05 )
451
456
# Check if the output is the same length of expected output.
0 commit comments