Skip to content

Commit f7bc8f9

Browse files
committed
WIP:added more test for StreamingContext
1 parent ee50c5a commit f7bc8f9

File tree

2 files changed

+14
-2
lines changed

2 files changed

+14
-2
lines changed

python/pyspark/streaming/context.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
6363
6464
"""
6565

66+
if not isinstance(duration, Duration):
67+
raise TypeError("Input should be pyspark.streaming.duration.Duration object")
68+
6669
if sparkContext is None:
6770
# Create the Python Sparkcontext
6871
self._sc = SparkContext(master=master, appName=appName, sparkHome=sparkHome,

python/pyspark/streaming/tests.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,7 @@ def setUp(self):
473473
def tearDown(self):
474474
# Do not call pyspark.streaming.context.StreamingContext.stop directly because
475475
# we do not wait to shutdown py4j client.
476+
# We need change this simply calll streamingConxt.Stop
476477
self.ssc._jssc.stop()
477478
self.ssc._sc.stop()
478479
# Why does it long time to terminate StremaingContext and SparkContext?
@@ -484,7 +485,6 @@ def tearDownClass(cls):
484485
# Make sure tp shutdown the callback server
485486
SparkContext._gateway._shutdown_callback_server()
486487

487-
488488
def test_from_no_conf_constructor(self):
489489
ssc = StreamingContext(master=self.master, appName=self.appName, duration=batachDuration)
490490
# Alternative call master: ssc.sparkContext.master
@@ -513,12 +513,21 @@ def _addInputStream(self, s):
513513
# make sure numSlice is 2 due to deserializer proglem in pyspark
514514
s._testInputStream(test_inputs, 2)
515515

516+
def test_from_no_conf_plus_spark_home_plus_env(self):
517+
pass
518+
519+
def test_from_conf_with_settings(self):
520+
pass
521+
522+
def test_stop_only_streaming_context(self):
523+
pass
516524

525+
def test_await_termination(self):
526+
pass
517527

518528

519529

520530

521531

522532
if __name__ == "__main__":
523533
unittest.main()
524-
SparkContext._gateway._shutdown_callback_server()

0 commit comments

Comments
 (0)