@@ -84,17 +84,18 @@ class StreamingContext(object):
     """
     _transformerSerializer = None
 
-    def __init__(self, sparkContext, duration=None, jssc=None):
+    def __init__(self, sparkContext, batchDuration=None, jssc=None):
         """
         Create a new StreamingContext.
 
         @param sparkContext: L{SparkContext} object.
-        @param duration: number of seconds.
+        @param batchDuration: the time interval (in seconds) at which streaming
+                              data will be divided into batches
         """
         self._sc = sparkContext
         self._jvm = self._sc._jvm
-        self._jssc = jssc or self._initialize_context(self._sc, duration)
+        self._jssc = jssc or self._initialize_context(self._sc, batchDuration)
 
     def _initialize_context(self, sc, duration):
         self._ensure_initialized()
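For callers, the rename only changes the keyword name: the second argument is still the batch interval in seconds. A minimal usage sketch (the `local[2]` master and app name are illustrative):

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext("local[2]", "NetworkWordCount")
    # streaming data will be divided into 1-second batches
    ssc = StreamingContext(sc, batchDuration=1)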
@@ -134,26 +135,27 @@ def _ensure_initialized(cls):
             SparkContext._active_spark_context, CloudPickleSerializer(), gw)
 
     @classmethod
-    def getOrCreate(cls, path, setupFunc):
+    def getOrCreate(cls, checkpointPath, setupFunc):
         """
-        Get the StreamingContext from checkpoint file at `path`, or setup
-        it by `setupFunc`.
+        Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
+        If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
+        recreated from the checkpoint data. If the data does not exist, then the provided setupFunc
+        will be used to create a JavaStreamingContext.
 
-        :param path: directory of checkpoint
-        :param setupFunc: a function used to create StreamingContext and
-            setup DStreams.
-        :return: a StreamingContext
+        @param checkpointPath: Checkpoint directory used in an earlier JavaStreamingContext program
+        @param setupFunc: Function to create a new JavaStreamingContext and setup DStreams
         """
-        if not os.path.exists(path) or not os.path.isdir(path) or not os.listdir(path):
+        # TODO: support checkpoint in HDFS
+        if not os.path.exists(checkpointPath) or not os.listdir(checkpointPath):
             ssc = setupFunc()
-            ssc.checkpoint(path)
+            ssc.checkpoint(checkpointPath)
             return ssc
 
         cls._ensure_initialized()
         gw = SparkContext._gateway
 
         try:
-            jssc = gw.jvm.JavaStreamingContext(path)
+            jssc = gw.jvm.JavaStreamingContext(checkpointPath)
         except Exception:
             print >> sys.stderr, "failed to load StreamingContext from checkpoint"
             raise
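A sketch of the recovery pattern this enables; the checkpoint directory and the body of `setup()` are illustrative:

    def setup():
        sc = SparkContext("local[2]", "RecoverableApp")
        ssc = StreamingContext(sc, 1)
        # ... create DStreams and register output operations here ...
        return ssc

    # Recreated from checkpoint data if "/tmp/checkpoint" is non-empty;
    # otherwise setup() runs and the new context checkpoints there.
    ssc = StreamingContext.getOrCreate("/tmp/checkpoint", setup)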
@@ -249,12 +251,12 @@ def textFileStream(self, directory):
         """
         return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())
 
-    def _check_serialzers(self, rdds):
+    def _check_serializers(self, rdds):
         # make sure they have same serializer
         if len(set(rdd._jrdd_deserializer for rdd in rdds)) > 1:
             for i in range(len(rdds)):
                 # reset them to sc.serializer
-                rdds[i] = rdds[i].map(lambda x: x, preservesPartitioning=True)
+                rdds[i] = rdds[i]._reserialize()
 
     def queueStream(self, rdds, oneAtATime=True, default=None):
         """
@@ -275,7 +277,7 @@ def queueStream(self, rdds, oneAtATime=True, default=None):
         if rdds and not isinstance(rdds[0], RDD):
             rdds = [self._sc.parallelize(input) for input in rdds]
-        self._check_serialzers(rdds)
+        self._check_serializers(rdds)
         jrdds = ListConverter().convert([r._jrdd for r in rdds],
                                         SparkContext._gateway._gateway_client)
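From the public API side, `queueStream` accepts plain Python lists as well as RDDs; lists are parallelized and then normalized through the renamed helper. A short usage sketch assuming the `ssc` created above:

    # each list becomes one RDD, consumed as one batch of the stream
    stream = ssc.queueStream([[1, 2, 3], [4, 5], [6]])
    stream.pprint()  # prints one batch per interval once ssc.start() is called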
@@ -313,6 +315,10 @@ def union(self, *dstreams):
             raise ValueError("should have at least one DStream to union")
         if len(dstreams) == 1:
             return dstreams[0]
+        if len(set(s._jrdd_deserializer for s in dstreams)) > 1:
+            raise ValueError("All DStreams should have same serializer")
+        if len(set(s._slideDuration for s in dstreams)) > 1:
+            raise ValueError("All DStreams should have same slide duration")
         first = dstreams[0]
         jrest = ListConverter().convert([d._jdstream for d in dstreams[1:]],
                                         SparkContext._gateway._gateway_client)
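The two new guards reject mismatched inputs up front instead of failing later inside the JVM. A sketch of a union that passes them, assuming the `ssc` from above (both streams share the default serializer and the context's batch interval):

    s1 = ssc.queueStream([[1, 2], [3, 4]])
    s2 = ssc.queueStream([[5, 6], [7, 8]])
    merged = ssc.union(s1, s2)  # same serializer and slide duration, so no ValueError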