@@ -332,7 +332,7 @@ def test_groupByKey_batch(self):
332
332
"""Basic operation test for DStream.groupByKey with batch deserializer."""
333
333
test_input = [range (1 , 5 ), [1 , 1 , 1 , 2 , 2 , 3 ], ["a" , "a" , "b" , "" , "" , "" ]]
334
334
def test_func (dstream ):
335
- return dstream .map (lambda x : (x ,1 )).groupByKey ()
335
+ return dstream .map (lambda x : (x , 1 )).groupByKey ()
336
336
expected_output = [[(1 , [1 ]), (2 , [1 ]), (3 , [1 ]), (4 , [1 ])],
337
337
[(1 , [1 , 1 , 1 ]), (2 , [1 , 1 ]), (3 , [1 ])],
338
338
[("a" , [1 , 1 ]), ("b" , [1 ]), ("" , [1 , 1 , 1 ])]]
@@ -345,8 +345,9 @@ def test_func(dstream):
345
345
def test_groupByKey_unbatch (self ):
346
346
"""Basic operation test for DStream.groupByKey with unbatch deserializer."""
347
347
test_input = [range (1 , 4 ), [1 , 1 , "" ], ["a" , "a" , "b" ]]
348
+
348
349
def test_func (dstream ):
349
- return dstream .map (lambda x : (x ,1 )).groupByKey ()
350
+ return dstream .map (lambda x : (x , 1 )).groupByKey ()
350
351
expected_output = [[(1 , [1 ]), (2 , [1 ]), (3 , [1 ])],
351
352
[(1 , [1 , 1 ]), ("" , [1 ])],
352
353
[("a" , [1 , 1 ]), ("b" , [1 ])]]
@@ -356,6 +357,36 @@ def test_func(dstream):
356
357
self ._sort_result_based_on_key (result )
357
358
self .assertEqual (expected_output , output )
358
359
360
+ def test_combineByKey_batch (self ):
361
+ """Basic operation test for DStream.combineByKey with batch deserializer."""
362
+ test_input = [range (1 , 5 ), [1 , 1 , 1 , 2 , 2 , 3 ], ["a" , "a" , "b" , "" , "" , "" ]]
363
+
364
+ def test_func (dstream ):
365
+ def add (a , b ): return a + str (b )
366
+ return dstream .map (lambda x : (x , 1 )).combineByKey (str , add , add )
367
+ expected_output = [[(1 , "1" ), (2 , "1" ), (3 , "1" ), (4 , "1" )],
368
+ [(1 , "111" ), (2 , "11" ), (3 , "1" )],
369
+ [("a" , "11" ), ("b" , "1" ), ("" , "111" )]]
370
+ output = self ._run_stream (test_input , test_func , expected_output )
371
+ for result in (output , expected_output ):
372
+ self ._sort_result_based_on_key (result )
373
+ self .assertEqual (expected_output , output )
374
+
375
+ def test_combineByKey_unbatch (self ):
376
+ """Basic operation test for DStream.combineByKey with unbatch deserializer."""
377
+ test_input = [range (1 , 4 ), [1 , 1 , "" ], ["a" , "a" , "b" ]]
378
+
379
+ def test_func (dstream ):
380
+ def add (a , b ): return a + str (b )
381
+ return dstream .map (lambda x : (x , 1 )).combineByKey (str , add , add )
382
+ expected_output = [[(1 , "1" ), (2 , "1" ), (3 , "1" )],
383
+ [(1 , "11" ), ("" , "1" )],
384
+ [("a" , "11" ), ("b" , "1" )]]
385
+ output = self ._run_stream (test_input , test_func , expected_output )
386
+ for result in (output , expected_output ):
387
+ self ._sort_result_based_on_key (result )
388
+ self .assertEqual (expected_output , output )
389
+
359
390
def _convert_iter_value_to_list (self , outputs ):
360
391
"""Return key value pair list. Value is converted to iterator to list."""
361
392
result = list ()
0 commit comments