|
15 | 15 | # limitations under the License.
|
16 | 16 | #
|
17 | 17 |
|
18 |
| -import time |
| 18 | +import sys |
| 19 | +from signal import signal, SIGTERM, SIGINT |
19 | 20 |
|
20 | 21 | from pyspark.conf import SparkConf
|
21 | 22 | from pyspark.files import SparkFiles
|
@@ -63,22 +64,31 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
|
63 | 64 |
|
64 | 65 | """
|
65 | 66 |
|
66 |
| - # launch call back server |
67 |
| - if not gateway: |
68 |
| - gateway = launch_gateway() |
69 |
| -# gateway.restart_callback_server() |
70 |
| - |
71 | 67 | # Create the Python Sparkcontext
|
72 | 68 | self._sc = SparkContext(master=master, appName=appName, sparkHome=sparkHome,
|
73 | 69 | pyFiles=pyFiles, environment=environment, batchSize=batchSize,
|
74 | 70 | serializer=serializer, conf=conf, gateway=gateway)
|
| 71 | + |
| 72 | + # Start py4j callback server |
| 73 | + SparkContext._gateway.restart_callback_server() |
| 74 | + self._clean_up_trigger() |
75 | 75 | self._jvm = self._sc._jvm
|
76 | 76 | self._jssc = self._initialize_context(self._sc._jsc, duration._jduration)
|
77 | 77 |
|
78 | 78 | # Initialize StremaingContext in function to allow subclass specific initialization
|
79 | 79 | def _initialize_context(self, jspark_context, jduration):
|
80 | 80 | return self._jvm.JavaStreamingContext(jspark_context, jduration)
|
81 | 81 |
|
| 82 | + def _clean_up_trigger(self): |
| 83 | + """Kill py4j callback server properly using signal lib""" |
| 84 | + |
| 85 | + def clean_up_handler(*args): |
| 86 | + SparkContext._gateway.shutdown() |
| 87 | + sys.exit(0) |
| 88 | + |
| 89 | + for sig in (SIGINT, SIGTERM): |
| 90 | + signal(sig, clean_up_handler) |
| 91 | + |
82 | 92 | def start(self):
|
83 | 93 | """
|
84 | 94 | Start the execution of the streams.
|
|
0 commit comments