Skip to content

Commit 16aa64f

Browse files
committed
added TODO coments
1 parent 74535d4 commit 16aa64f

File tree

2 files changed

+16
-3
lines changed

2 files changed

+16
-3
lines changed

python/pyspark/streaming/context.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import sys
1919
from signal import signal, SIGTERM, SIGINT
20-
from tempfile import NamedTemporaryFile
2120

2221
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
2322
from pyspark.context import SparkContext
@@ -79,6 +78,7 @@ def _clean_up_trigger(self):
7978
"""Kill py4j callback server properly using signal lib"""
8079

8180
def clean_up_handler(*args):
81+
SparkContext._gateway._shutdown_callback_server()
8282
SparkContext._gateway.shutdown()
8383
sys.exit(0)
8484

@@ -128,6 +128,7 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):
128128
self._jssc.stop(stopSparkContext, stopGraceFully)
129129
finally:
130130
# Stop Callback server
131+
SparkContext._gateway._shutdown_callback_server()
131132
SparkContext._gateway.shutdown()
132133

133134
def _testInputStream(self, test_inputs, numSlices=None):

python/pyspark/streaming/dstream.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -389,15 +389,27 @@ def saveAsTextFile(rdd, time):
389389
return self.foreachRDD(saveAsTextFile)
390390

391391

392+
# TODO: implement updateStateByKey
393+
# TODO: implement slice
394+
395+
# Window Operations
396+
# TODO: implement window
397+
# TODO: implement groupByKeyAndWindow
398+
# TODO: implement reduceByKeyAndWindow
399+
# TODO: implement countByValueAndWindow
400+
# TODO: implement countByWindow
401+
# TODO: implement reduceByWindow
402+
392403
# Following operation has dependency to transform
393-
# TODO: impelment union
404+
# TODO: implement transform
405+
# TODO: implement transformWith
406+
# TODO: implement union
394407
# TODO: implement repertitions
395408
# TODO: implement cogroup
396409
# TODO: implement join
397410
# TODO: implement leftOuterJoin
398411
# TODO: implemtnt rightOuterJoin
399412

400-
401413
class PipelinedDStream(DStream):
402414
def __init__(self, prev, func, preservesPartitioning=False):
403415
if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable():

0 commit comments

Comments
 (0)