Skip to content

Commit 4dedd2d

Browse files
committed
change test case not to use awaitTermination
1 parent 268a6a5 commit 4dedd2d

File tree

3 files changed

+5
-8
lines changed

3 files changed

+5
-8
lines changed

python/pyspark/streaming/context.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,12 @@ def start(self):
103103
def awaitTermination(self, timeout=None):
104104
"""
105105
Wait for the execution to stop.
106-
timeout is milliseconds
106+
@param timeout: time to wait in milliseconds
107107
"""
108108
if timeout is None:
109109
self._jssc.awaitTermination()
110110
else:
111-
time.sleep(timeout/1000)
111+
self._jssc.awaitTermination(timeout)
112112

113113
#TODO: add storageLevel
114114
def socketTextStream(self, hostname, port):

python/pyspark/streaming_tests.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ class PySparkStreamingTestCase(unittest.TestCase):
3939
def setUp(self):
4040
class_name = self.__class__.__name__
4141
self.ssc = StreamingContext(appName=class_name, duration=Seconds(1))
42-
time.sleep(1)
4342

4443
def tearDown(self):
4544
# Do not call pyspark.streaming.context.StreamingContext.stop directly because
@@ -52,7 +51,7 @@ def tearDown(self):
5251

5352
@classmethod
5453
def tearDownClass(cls):
55-
time.sleep(5)
54+
# Make sure to shut down the callback server
5655
SparkContext._gateway._shutdown_callback_server()
5756

5857

@@ -436,7 +435,8 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
436435
# Check time out.
437436
if (current_time - start_time) > self.timeout:
438437
break
439-
#self.ssc.awaitTermination(50)
438+
# StreamingContext.awaitTermination is not used to wait because
439+
# if the py4j server is called every 50 milliseconds, it raises an error
440440
time.sleep(0.05)
441441
# Check if the output is the same length as the expected output.
442442
if len(expected_output) == len(result):

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,6 @@ class PythonForeachDStream(
112112
this.register()
113113
}
114114

115-
/*
116-
This does not work. Ignore this for now. -TD
117115
class PythonTransformedDStream(
118116
prev: DStream[Array[Byte]],
119117
transformFunction: PythonRDDFunction
@@ -131,7 +129,6 @@ class PythonTransformedDStream(
131129

132130
val asJavaDStream = JavaDStream.fromDStream(this)
133131
}
134-
*/
135132

136133
/**
137134
* This is an input stream just for the unit test. This is equivalent to a checkpointable,

0 commit comments

Comments
 (0)