
Commit f198d14

clean up code
1 parent 3166d31 commit f198d14

2 files changed: +19 -20 lines changed


python/pyspark/streaming/context.py

Lines changed: 3 additions & 8 deletions
@@ -72,21 +72,19 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         # Callback sever is need only by SparkStreming; therefore the callback sever
         # is started in StreamingContext.
         SparkContext._gateway.restart_callback_server()
-        self._clean_up_trigger()
+        self._set_clean_up_trigger()
         self._jvm = self._sc._jvm
         self._jssc = self._initialize_context(self._sc._jsc, duration._jduration)
 
     # Initialize StremaingContext in function to allow subclass specific initialization
     def _initialize_context(self, jspark_context, jduration):
         return self._jvm.JavaStreamingContext(jspark_context, jduration)
 
-    def _clean_up_trigger(self):
+    def _set_clean_up_trigger(self):
         """Kill py4j callback server properly using signal lib"""
 
         def clean_up_handler(*args):
             # Make sure stop callback server.
-            # This need improvement how to terminate callback sever properly.
-            SparkContext._gateway._shutdown_callback_server()
             SparkContext._gateway.shutdown()
             sys.exit(0)
 
@@ -132,18 +130,15 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):
         Stop the execution of the streams immediately (does not wait for all received data
         to be processed).
         """
-
         try:
             self._jssc.stop(stopSparkContext, stopGraceFully)
         finally:
-            # Stop Callback server
-            SparkContext._gateway._shutdown_callback_server()
             SparkContext._gateway.shutdown()
 
     def _testInputStream(self, test_inputs, numSlices=None):
         """
         This function is only for unittest.
-        It requires a sequence as input, and returns the i_th element at the i_th batch
+        It requires a list as input, and returns the i_th element at the i_th batch
         under manual clock.
         """
         test_rdds = list()
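For context: the renamed _set_clean_up_trigger installs a signal handler so that the py4j gateway (which owns the callback server) is shut down before the Python process exits; the hunk above only shows the handler body. A minimal standalone sketch of the pattern, under the assumption that SIGTERM and SIGINT are the signals being trapped, with the gateway passed in explicitly for illustration:

import signal
import sys

def install_clean_up_trigger(gateway):
    """Shut down the py4j gateway (and its callback server) when the process is terminated."""

    def clean_up_handler(*args):
        # Stop the JVM gateway before the Python process exits.
        gateway.shutdown()
        sys.exit(0)

    # Assumption: SIGTERM and SIGINT are the termination signals worth trapping here;
    # the actual method may register a different set.
    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, clean_up_handler)

Shutting the gateway down in one place also matches the change to stop(), which now relies on SparkContext._gateway.shutdown() alone instead of calling _shutdown_callback_server() first.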

python/pyspark/streaming/dstream.py

Lines changed: 16 additions & 12 deletions
@@ -207,7 +207,7 @@ def _defaultReducePartitions(self):
         """
         Returns the default number of partitions to use during reduce tasks (e.g., groupBy).
         If spark.default.parallelism is set, then we'll use the value from SparkContext
-        defaultParallelism, otherwise we'll use the number of partitions in this RDD.
+        defaultParallelism, otherwise we'll use the number of partitions in this RDD
 
         This mirrors the behavior of the Scala Partitioner#defaultPartitioner, intended to reduce
         the likelihood of OOMs. Once PySpark adopts Partitioner-based APIs, this behavior will
@@ -222,7 +222,8 @@ def getNumPartitions(self):
         """
         Return the number of partitions in RDD
         """
-        # TODO: remove hardcoding. RDD has NumPartitions but DStream does not have.
+        # TODO: remove hardcoding. RDD has NumPartitions. How do we get the number of partition
+        # through DStream?
         return 2
 
     def foreachRDD(self, func):
@@ -243,6 +244,10 @@ def pyprint(self):
         operator, so this DStream will be registered as an output stream and there materialized.
         """
         def takeAndPrint(rdd, time):
+            """
+            Closure to take element from RDD and print first 10 elements.
+            This closure is called by py4j callback server.
+            """
             taken = rdd.take(11)
             print "-------------------------------------------"
             print "Time: %s" % (str(time))
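The docstrings added here make the control flow explicit: each output operation builds a per-batch closure and the JVM invokes it through the py4j callback server. A hedged sketch of that pattern as a standalone helper (the name pyprint_batches, its dstream parameter, and the truncation handling after take(11) are illustrative assumptions; only the first lines of the real closure appear in this hunk):

def pyprint_batches(dstream):
    """Register an output operation that prints the first ten elements of each batch."""
    def take_and_print(rdd, time):
        # Invoked from the JVM via the py4j callback server, once per batch interval.
        taken = rdd.take(11)
        print "-------------------------------------------"
        print "Time: %s" % (str(time))
        for record in taken[:10]:
            print record
        if len(taken) > 10:
            print "..."
        print ""

    # Assumption: foreachRDD registers the closure as an output operation on the DStream.
    dstream.foreachRDD(take_and_print)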
@@ -307,17 +312,11 @@ def checkpoint(self, interval):
         Mark this DStream for checkpointing. It will be saved to a file inside the
         checkpoint directory set with L{SparkContext.setCheckpointDir()}
 
-        I am not sure this part in DStream
-        and
-        all references to its parent RDDs will be removed. This function must
-        be called before any job has been executed on this RDD. It is strongly
-        recommended that this RDD is persisted in memory, otherwise saving it
-        on a file will require recomputation.
-
-        interval must be pysprak.streaming.duration
+        @param interval: Time interval after which generated RDD will be checkpointed
+               interval has to be pyspark.streaming.duration.Duration
         """
         self.is_checkpointed = True
-        self._jdstream.checkpoint(interval)
+        self._jdstream.checkpoint(interval._jduration)
         return self
 
     def groupByKey(self, numPartitions=None):
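With this hunk, checkpoint() takes a pyspark.streaming.duration.Duration and unwraps the Java-side _jduration itself instead of handing the Python wrapper to the JVM. A hedged usage sketch (the lines DStream and the millisecond constructor argument are illustrative assumptions):

from pyspark.streaming.duration import Duration

# Assumption: `lines` is an existing DStream and a checkpoint directory has been set
# on the underlying SparkContext. The caller passes the Python Duration; the wrapper
# forwards interval._jduration to the Java DStream.
lines.checkpoint(Duration(10000))  # roughly every 10 seconds, assuming a millisecond constructor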
@@ -369,6 +368,10 @@ def saveAsTextFiles(self, prefix, suffix=None):
         Save this DStream as a text file, using string representations of elements.
         """
         def saveAsTextFile(rdd, time):
+            """
+            Closure to save element in RDD in DStream as Pickled data in file.
+            This closure is called by py4j callback server.
+            """
             path = rddToFileName(prefix, suffix, time)
             rdd.saveAsTextFile(path)
 
@@ -410,9 +413,10 @@ def get_output(rdd, time):
 # TODO: implement countByWindow
 # TODO: implement reduceByWindow
 
-# Following operation has dependency to transform
+# transform Operation
 # TODO: implement transform
 # TODO: implement transformWith
+# Following operation has dependency with transform
 # TODO: implement union
 # TODO: implement repertitions
 # TODO: implement cogroup
