Skip to content

Commit 4aa99e4

Browse files
committed
added TODO coments
1 parent e9fab72 commit 4aa99e4

File tree

2 files changed

+16
-3
lines changed

2 files changed

+16
-3
lines changed

python/pyspark/streaming/context.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import sys
1919
from signal import signal, SIGTERM, SIGINT
20-
from tempfile import NamedTemporaryFile
2120

2221
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
2322
from pyspark.context import SparkContext
@@ -79,6 +78,7 @@ def _clean_up_trigger(self):
7978
"""Kill py4j callback server properly using signal lib"""
8079

8180
def clean_up_handler(*args):
81+
SparkContext._gateway._shutdown_callback_server()
8282
SparkContext._gateway.shutdown()
8383
sys.exit(0)
8484

@@ -128,6 +128,7 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):
128128
self._jssc.stop(stopSparkContext, stopGraceFully)
129129
finally:
130130
# Stop Callback server
131+
SparkContext._gateway._shutdown_callback_server()
131132
SparkContext._gateway.shutdown()
132133

133134
def _testInputStream(self, test_inputs, numSlices=None):

python/pyspark/streaming/dstream.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -376,15 +376,27 @@ def saveAsTextFile(rdd, time):
376376
return self.foreachRDD(saveAsTextFile)
377377

378378

379+
# TODO: implement updateStateByKey
380+
# TODO: implement slice
381+
382+
# Window Operations
383+
# TODO: implement window
384+
# TODO: implement groupByKeyAndWindow
385+
# TODO: implement reduceByKeyAndWindow
386+
# TODO: implement countByValueAndWindow
387+
# TODO: implement countByWindow
388+
# TODO: implement reduceByWindow
389+
379390
# Following operation has dependency to transform
380-
# TODO: impelment union
391+
# TODO: implement transform
392+
# TODO: implement transformWith
393+
# TODO: implement union
381394
# TODO: implement repertitions
382395
# TODO: implement cogroup
383396
# TODO: implement join
384397
# TODO: implement leftOuterJoin
385398
# TODO: implemtnt rightOuterJoin
386399

387-
388400
class PipelinedDStream(DStream):
389401
def __init__(self, prev, func, preservesPartitioning=False):
390402
if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable():

0 commit comments

Comments
 (0)