@@ -43,7 +43,7 @@ def setUp(self):

     def tearDown(self):
         # Do not call pyspark.streaming.context.StreamingContext.stop directly because
-        # we do not wait to shutdown call back server and py4j client
+        # we do not wait to shut down the py4j client.
         self.ssc._jssc.stop()
         self.ssc._sc.stop()
         # Why does it take a long time to terminate StreamingContext and SparkContext?
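
For context, the tearDown above stops the JVM-side StreamingContext and the Python SparkContext directly instead of going through StreamingContext.stop(). A minimal, self-contained sketch of the same setUp/tearDown pairing (assuming the public pyspark.streaming API with a local master and a 1-second batch interval; the class and attribute names here are illustrative, not from this patch):

    import unittest
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    class LocalStreamingCase(unittest.TestCase):
        def setUp(self):
            # Small local cluster and a 1-second batch StreamingContext.
            self.sc = SparkContext("local[2]", "streaming-teardown-sketch")
            self.ssc = StreamingContext(self.sc, 1)

        def tearDown(self):
            # Mirror the diff: stop the Java StreamingContext via its py4j
            # handle, then stop the SparkContext itself.
            self.ssc._jssc.stop()
            self.sc.stop()
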
@@ -74,7 +74,6 @@ def setUp(self):
         PySparkStreamingTestCase.setUp(self)
         self.timeout = 10  # seconds
         self.numInputPartitions = 2
-        self.result = list()

     def tearDown(self):
         PySparkStreamingTestCase.tearDown(self)
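
Dropping self.result from setUp removes a piece of mutable state that was shared across every _run_stream call on the same test instance; the next hunk replaces it with a list created inside _run_stream itself. A small sketch of why the per-call buffer is safer (the helper names below are made up purely for illustration):

    # With a shared attribute, output from an earlier run can leak into a later one.
    class SharedBuffer:
        def __init__(self):
            self.result = list()

        def run(self, values):
            self.result.extend(values)   # appends onto whatever is already there
            return self.result

    # With a local list, every call starts from an empty buffer.
    def run_local(values):
        result = list()
        result.extend(values)
        return result

    shared = SharedBuffer()
    assert shared.run([1, 2]) == [1, 2]
    assert shared.run([3]) == [1, 2, 3]   # stale output from the first run
    assert run_local([3]) == [3]          # clean result per call
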
@@ -426,7 +425,8 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
         # Apply test function to stream.
         test_stream = test_func(test_input_stream)
         # Add job to get output from stream.
-        test_stream._test_output(self.result)
+        result = list()
+        test_stream._test_output(result)
         self.ssc.start()

         start_time = time.time()
@@ -438,10 +438,10 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
                 break
             self.ssc.awaitTermination(50)
             # Check if the output is the same length as the expected output.
-            if len(expected_output) == len(self.result):
+            if len(expected_output) == len(result):
                 break

-        return self.result
+        return result

 class TestSaveAsFilesSuite(PySparkStreamingTestCase):
     def setUp(self):
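
Taken together, the last three hunks make _run_stream self-contained: it builds its own result list, registers it as the output sink before the context starts, polls until either the timeout elapses or the expected number of batches has arrived, and returns the list. A compressed sketch of that flow (the _test_output hook is a test-only helper assumed from the surrounding file, and the timings simply mirror the diff rather than recommended values):

    import time

    def run_stream_sketch(ssc, test_input_stream, test_func, expected_output, timeout=10):
        # Fresh output buffer per call, as introduced by this change.
        result = list()
        test_stream = test_func(test_input_stream)
        test_stream._test_output(result)   # output must be wired up before start()
        ssc.start()

        start_time = time.time()
        while True:
            if time.time() - start_time > timeout:
                break                      # give up once the timeout elapses
            ssc.awaitTermination(50)       # wait briefly for more batches
            if len(expected_output) == len(result):
                break                      # collected as many batches as expected
        return result

    # A test would then assert on the returned value, e.g.:
    #     output = self._run_stream(test_input, test_func, expected_output)
    #     self.assertEqual(expected_output, output)
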