
Commit 8ffdbf1

added atexit to handle callback server
1 parent d5f5fcb commit 8ffdbf1

File tree

python/pyspark/streaming/context.py
python/pyspark/streaming/tests.py

2 files changed: +83 -13 lines changed

python/pyspark/streaming/context.py

Lines changed: 17 additions & 11 deletions
@@ -17,6 +17,8 @@
 
 import sys
 from signal import signal, SIGTERM, SIGINT
+import atexit
+import time
 
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
 from pyspark.context import SparkContext
@@ -73,29 +75,30 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         # The callback server is needed only by Spark Streaming; therefore
         # the callback server is started in StreamingContext.
         SparkContext._gateway.restart_callback_server()
-        self._set_clean_up_trigger()
+        self._set_clean_up_handler()
         self._jvm = self._sc._jvm
         self._jssc = self._initialize_context(self._sc._jsc, duration._jduration)
 
     # Initialize StreamingContext in a method to allow subclass-specific initialization
     def _initialize_context(self, jspark_context, jduration):
         return self._jvm.JavaStreamingContext(jspark_context, jduration)
 
-    def _set_clean_up_trigger(self):
-        """Kill py4j callback server properly using signal lib"""
+    def _set_clean_up_handler(self):
+        """Set a clean-up handler using atexit."""
 
-        def clean_up_handler(*args):
-            # Make sure stop callback server.
+        def clean_up_handler():
             SparkContext._gateway.shutdown()
-            sys.exit(0)
 
+        atexit.register(clean_up_handler)
+        # atexit handlers are not called when the program is killed by a
+        # signal that is not handled by Python.
         for sig in (SIGINT, SIGTERM):
             signal(sig, clean_up_handler)
 
     @property
     def sparkContext(self):
         """
-        Return SparkContext which is associated this StreamingContext
+        Return the SparkContext associated with this StreamingContext.
         """
         return self._sc
@@ -152,11 +155,14 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):
         Stop the execution of the streams immediately (does not wait for all received data
         to be processed).
         """
-        try:
-            self._jssc.stop(stopSparkContext, stopGraceFully)
-        finally:
-            SparkContext._gateway.shutdown()
+        self._jssc.stop(stopSparkContext, stopGraceFully)
+        if stopSparkContext:
+            self._sc.stop()
 
+        # Shut down only the callback server; the py4j client is shut down
+        # by the clean-up handler.
+        SparkContext._gateway._shutdown_callback_server()
+
     def _testInputStream(self, test_inputs, numSlices=None):
         """
         This function is only for unittest.
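Taken together with __init__ (which calls restart_callback_server()), the revised stop() means one Python process can tear down a StreamingContext and start another: only the callback server is shut down here, while the py4j gateway itself stays alive until the atexit handler fires. A hypothetical usage sketch under that assumption, reusing the constructor arguments the tests below use:

from pyspark.streaming.context import StreamingContext
from pyspark.streaming.duration import Milliseconds

ssc = StreamingContext(master="local[2]", appName="first", duration=Milliseconds(500))
ssc.stop(stopSparkContext=True)  # stops streams, the SparkContext, and the callback server

# The gateway is still up, so a second context can restart the callback server.
ssc2 = StreamingContext(master="local[2]", appName="second", duration=Milliseconds(500))
ssc2.stop()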

python/pyspark/streaming/tests.py

Lines changed: 66 additions & 2 deletions
@@ -33,6 +33,7 @@
 import unittest
 
 from pyspark.context import SparkContext
+from pyspark.conf import SparkConf
 from pyspark.streaming.context import StreamingContext
 from pyspark.streaming.duration import *
 
@@ -47,8 +48,6 @@ def tearDown(self):
         # we do not want to shut down the py4j client.
         self.ssc._jssc.stop()
         self.ssc._sc.stop()
-        # Why does it long time to terminate StremaingContext and SparkContext?
-        # Should we change the sleep time if this depends on machine spec?
         time.sleep(1)
 
     @classmethod
@@ -455,6 +454,71 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
 
         return result
 
+
+class TestStreamingContextSuite(unittest.TestCase):
+    """
+    Should we have a conf property in SparkContext?
+    @property
+    def conf(self):
+        return self._conf
+
+    """
+    def setUp(self):
+        self.master = "local[2]"
+        self.appName = self.__class__.__name__
+        self.batchDuration = Milliseconds(500)
+        self.sparkHome = "SomeDir"
+        self.envPair = {"key": "value"}
+
+    def tearDown(self):
+        # Do not call pyspark.streaming.context.StreamingContext.stop directly because
+        # we do not want to shut down the py4j client.
+        self.ssc._jssc.stop()
+        self.ssc._sc.stop()
+        # Why does it take a long time to terminate the StreamingContext and SparkContext?
+        # Should we change the sleep time if this depends on machine spec?
+        time.sleep(1)
+
+    @classmethod
+    def tearDownClass(cls):
+        # Make sure to shut down the callback server
+        SparkContext._gateway._shutdown_callback_server()
+
+    def test_from_no_conf_constructor(self):
+        self.ssc = StreamingContext(master=self.master, appName=self.appName,
+                                    duration=self.batchDuration)
+        # An alternative way to get the master: ssc.sparkContext.master
+        # I try to keep the code close to Scala.
+        self.assertEqual(self.ssc.sparkContext._conf.get("spark.master"), self.master)
+        self.assertEqual(self.ssc.sparkContext._conf.get("spark.app.name"), self.appName)
+
+    def test_from_no_conf_plus_spark_home(self):
+        self.ssc = StreamingContext(master=self.master, appName=self.appName,
+                                    sparkHome=self.sparkHome, duration=self.batchDuration)
+        self.assertEqual(self.ssc.sparkContext._conf.get("spark.home"), self.sparkHome)
+
+    def test_from_existing_spark_context(self):
+        sc = SparkContext(master=self.master, appName=self.appName)
+        self.ssc = StreamingContext(sparkContext=sc)
+
+    def test_existing_spark_context_with_settings(self):
+        conf = SparkConf()
+        conf.set("spark.cleaner.ttl", "10")
+        sc = SparkContext(master=self.master, appName=self.appName, conf=conf)
+        self.ssc = StreamingContext(sparkContext=sc)
+        self.assertEqual(int(self.ssc.sparkContext._conf.get("spark.cleaner.ttl")), 10)
+
+    def _addInputStream(self, s):
+        test_inputs = map(lambda x: range(1, x), range(5, 101))
+        # make sure numSlices is 2 due to a deserializer problem in pyspark
+        s._testInputStream(test_inputs, 2)
+
+
 if __name__ == "__main__":
     unittest.main()
     SparkContext._gateway._shutdown_callback_server()
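The new suite relies on unittest's fixture ordering: setUp and tearDown wrap each test, while tearDownClass runs once after the whole class, which is why the per-test clean-up stops the contexts but only the class-level hook shuts down the shared callback server. A stdlib-only sketch of that ordering (class and test names are illustrative):

import unittest

class LifecycleDemo(unittest.TestCase):
    def setUp(self):
        print("setUp: before every test")

    def tearDown(self):
        print("tearDown: after every test")

    @classmethod
    def tearDownClass(cls):
        print("tearDownClass: once, after all tests in the class")

    def test_one(self):
        pass

    def test_two(self):
        pass

if __name__ == "__main__":
    unittest.main()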
