@@ -50,8 +50,8 @@ def setUp(self):
50
50
self .ssc = StreamingContext (appName = class_name , duration = Seconds (1 ))
51
51
52
52
def tearDown (self ):
53
- # Do not call StreamingContext.stop directly because we do not wait to shutdown
54
- # call back server and py4j client
53
+ # Do not call pyspark.streaming.context. StreamingContext.stop directly because
54
+ # we do not wait to shutdowncall back server and py4j client
55
55
self .ssc ._jssc .stop ()
56
56
self .ssc ._sc .stop ()
57
57
# Why does it long time to terminaete StremaingContext and SparkContext?
@@ -146,7 +146,7 @@ def _run_stream(self, test_input, test_func, expected_output):
146
146
"""Start stream and return the output"""
147
147
# Generate input stream with user-defined input
148
148
test_input_stream = self .ssc ._testInputStream (test_input )
149
- # Applied test function to stream
149
+ # Apply test function to stream
150
150
test_stream = test_func (test_input_stream )
151
151
# Add job to get output from stream
152
152
test_stream ._test_output (StreamOutput .result )
@@ -160,6 +160,7 @@ def _run_stream(self, test_input, test_func, expected_output):
160
160
if (current_time - start_time ) > self .timeout :
161
161
break
162
162
self .ssc .awaitTermination (50 )
163
+ # check if the output is the same length of expexted output
163
164
if len (expected_output ) == len (StreamOutput .result ):
164
165
break
165
166
return StreamOutput .result
0 commit comments