31
31
def _daemonize_callback_server ():
32
32
"""
33
33
Hack Py4J to daemonize callback server
34
+
35
+ The thread of callback server has daemon=False, it will block the driver
36
+ from exiting if it's not shutdown. The following code replace `start()`
37
+ of CallbackServer with a new version, which set daemon=True for this
38
+ thread.
34
39
"""
35
40
# TODO: create a patch for Py4J
36
41
import socket
@@ -47,7 +52,6 @@ def start(self):
47
52
1 )
48
53
try :
49
54
self .server_socket .bind ((self .address , self .port ))
50
- # self.port = self.server_socket.getsockname()[1]
51
55
except Exception :
52
56
msg = 'An error occurred while trying to start the callback server'
53
57
logger .exception (msg )
@@ -63,19 +67,21 @@ def start(self):
63
67
64
68
class StreamingContext (object ):
65
69
"""
66
- Main entry point for Spark Streaming functionality. A StreamingContext represents the
67
- connection to a Spark cluster, and can be used to create L{DStream}s and
68
- broadcast variables on that cluster.
70
+ Main entry point for Spark Streaming functionality. A StreamingContext
71
+ represents the connection to a Spark cluster, and can be used to create
72
+ L{DStream}s various input sources. It can be from an existing L{SparkContext}.
73
+ After creating and transforming DStreams, the streaming computation can
74
+ be started and stopped using `context.start()` and `context.stop()`,
75
+ respectively. `context.awaitTransformation()` allows the current thread
76
+ to wait for the termination of the context by `stop()` or by an exception.
69
77
"""
70
78
71
79
def __init__ (self , sparkContext , duration ):
72
80
"""
73
- Create a new StreamingContext. At least the master and app name and duration
74
- should be set, either through the named parameters here or through C{conf}.
81
+ Create a new StreamingContext.
75
82
76
83
@param sparkContext: L{SparkContext} object.
77
- @param duration: seconds for SparkStreaming.
78
-
84
+ @param duration: number of seconds.
79
85
"""
80
86
self ._sc = sparkContext
81
87
self ._jvm = self ._sc ._jvm
@@ -127,8 +133,12 @@ def awaitTermination(self, timeout=None):
127
133
128
134
def stop (self , stopSparkContext = True , stopGraceFully = False ):
129
135
"""
130
- Stop the execution of the streams immediately (does not wait for all received data
131
- to be processed).
136
+ Stop the execution of the streams, with option of ensuring all
137
+ received data has been processed.
138
+
139
+ @param stopSparkContext Stop the associated SparkContext or not
140
+ @param stopGracefully Stop gracefully by waiting for the processing
141
+ of all received data to be completed
132
142
"""
133
143
self ._jssc .stop (stopSparkContext , stopGraceFully )
134
144
if stopSparkContext :
@@ -140,7 +150,7 @@ def remember(self, duration):
140
150
in the last given duration. DStreams remember RDDs only for a
141
151
limited duration of time and releases them for garbage collection.
142
152
This method allows the developer to specify how to long to remember
143
- the RDDs ( if the developer wishes to query old data outside the
153
+ the RDDs (if the developer wishes to query old data outside the
144
154
DStream computation).
145
155
146
156
@param duration Minimum duration (in seconds) that each DStream
0 commit comments