Skip to content

Commit 268a6a5

Browse files
committed
Changed awaitTermination not to call awaitTermincation in Scala. Just use time.sleep instread
1 parent 09a28bf commit 268a6a5

File tree

2 files changed

+6
-3
lines changed

2 files changed

+6
-3
lines changed

python/pyspark/streaming/context.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#
1717

1818
import sys
19+
import time
1920
from signal import signal, SIGTERM, SIGINT
2021

2122
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
@@ -102,11 +103,12 @@ def start(self):
102103
def awaitTermination(self, timeout=None):
103104
"""
104105
Wait for the execution to stop.
106+
timeout is milliseconds
105107
"""
106108
if timeout is None:
107109
self._jssc.awaitTermination()
108110
else:
109-
self._jssc.awaitTermination(timeout)
111+
time.sleep(timeout/1000)
110112

111113
#TODO: add storageLevel
112114
def socketTextStream(self, hostname, port):

python/pyspark/streaming_tests.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def tearDown(self):
4848
self.ssc._sc.stop()
4949
# Why does it long time to terminate StremaingContext and SparkContext?
5050
# Should we change the sleep time if this depends on machine spec?
51-
time.sleep(10)
51+
time.sleep(1)
5252

5353
@classmethod
5454
def tearDownClass(cls):
@@ -436,7 +436,8 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
436436
# Check time out.
437437
if (current_time - start_time) > self.timeout:
438438
break
439-
self.ssc.awaitTermination(50)
439+
#self.ssc.awaitTermination(50)
440+
time.sleep(0.05)
440441
# Check if the output is the same length of expexted output.
441442
if len(expected_output) == len(result):
442443
break

0 commit comments

Comments
 (0)