Skip to content

Commit c499ba0

Browse files
committed
remove Time and Duration
1 parent 3f0fb4b commit c499ba0

File tree

5 files changed

+14
-559
lines changed

5 files changed

+14
-559
lines changed

python/pyspark/streaming/context.py

Lines changed: 8 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -16,11 +16,10 @@
1616
#
1717

1818
from pyspark import RDD
19-
from pyspark.serializers import UTF8Deserializer, BatchedSerializer
19+
from pyspark.serializers import UTF8Deserializer
2020
from pyspark.context import SparkContext
2121
from pyspark.storagelevel import StorageLevel
2222
from pyspark.streaming.dstream import DStream
23-
from pyspark.streaming.duration import Seconds
2423

2524
from py4j.java_collections import ListConverter
2625

@@ -76,9 +75,6 @@ def __init__(self, sparkContext, duration):
7675
@param duration: A L{Duration} object or seconds for SparkStreaming.
7776
7877
"""
79-
if isinstance(duration, (int, long, float)):
80-
duration = Seconds(duration)
81-
8278
self._sc = sparkContext
8379
self._jvm = self._sc._jvm
8480
self._start_callback_server()
@@ -93,7 +89,10 @@ def _start_callback_server(self):
9389
gw._python_proxy_port = gw._callback_server.port # update port with real port
9490

9591
def _initialize_context(self, sc, duration):
96-
return self._jvm.JavaStreamingContext(sc._jsc, duration._jduration)
92+
return self._jvm.JavaStreamingContext(sc._jsc, self._jduration(duration))
93+
94+
def _jduration(self, seconds):
95+
return self._jvm.Duration(int(seconds * 1000))
9796

9897
@property
9998
def sparkContext(self):
@@ -111,12 +110,12 @@ def start(self):
111110
def awaitTermination(self, timeout=None):
112111
"""
113112
Wait for the execution to stop.
114-
@param timeout: time to wait in milliseconds
113+
@param timeout: time to wait in seconds
115114
"""
116115
if timeout is None:
117116
self._jssc.awaitTermination()
118117
else:
119-
self._jssc.awaitTermination(timeout)
118+
self._jssc.awaitTermination(int(timeout * 1000))
120119

121120
def stop(self, stopSparkContext=True, stopGraceFully=False):
122121
"""
@@ -139,10 +138,7 @@ def remember(self, duration):
139138
@param duration Minimum duration (in seconds) that each DStream
140139
should remember its RDDs
141140
"""
142-
if isinstance(duration, (int, long, float)):
143-
duration = Seconds(duration)
144-
145-
self._jssc.remember(duration._jduration)
141+
self._jssc.remember(self._jduration(duration))
146142

147143
def checkpoint(self, directory):
148144
"""

python/pyspark/streaming/dstream.py

Lines changed: 4 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,6 @@
2222
from pyspark.storagelevel import StorageLevel
2323
from pyspark.streaming.util import rddToFileName, RDDFunction, RDDFunction2
2424
from pyspark.rdd import portable_hash
25-
from pyspark.streaming.duration import Duration, Seconds
2625
from pyspark.resultiterable import ResultIterable
2726

2827
__all__ = ["DStream"]
@@ -334,10 +333,10 @@ def slice(self, begin, end):
334333
return [RDD(jrdd, self.ctx, self._jrdd_deserializer) for jrdd in jrdds]
335334

336335
def window(self, windowDuration, slideDuration=None):
337-
d = Seconds(windowDuration)
336+
d = self._ssc._jduration(windowDuration)
338337
if slideDuration is None:
339338
return DStream(self._jdstream.window(d), self._ssc, self._jrdd_deserializer)
340-
s = Seconds(slideDuration)
339+
s = self._ssc._jduration(slideDuration)
341340
return DStream(self._jdstream.window(d, s), self._ssc, self._jrdd_deserializer)
342341

343342
def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuration):
@@ -375,16 +374,12 @@ def invReduceFunc(a, b, t):
375374
joined = a.leftOuterJoin(b, numPartitions)
376375
return joined.mapValues(lambda (v1, v2): invFunc(v1, v2) if v2 is not None else v1)
377376

378-
if not isinstance(windowDuration, Duration):
379-
windowDuration = Seconds(windowDuration)
380-
if not isinstance(slideDuration, Duration):
381-
slideDuration = Seconds(slideDuration)
382377
jreduceFunc = RDDFunction2(self.ctx, reduceFunc, reduced._jrdd_deserializer)
383378
jinvReduceFunc = RDDFunction2(self.ctx, invReduceFunc, reduced._jrdd_deserializer)
384379
dstream = self.ctx._jvm.PythonReducedWindowedDStream(reduced._jdstream.dstream(),
385380
jreduceFunc, jinvReduceFunc,
386-
windowDuration._jduration,
387-
slideDuration._jduration)
381+
self._ssc._jduration(windowDuration),
382+
self._ssc._jduration(slideDuration))
388383
return DStream(dstream.asJavaDStream(), self._ssc, self.ctx.serializer)
389384

390385
def updateStateByKey(self, updateFunc, numPartitions=None):

0 commit comments

Comments (0)