Skip to content

Commit e059ca2

Browse files
committed
move check of window into Python
1 parent fce0ef5 commit e059ca2

File tree

3 files changed

+17
-11
lines changed

3 files changed

+17
-11
lines changed

python/pyspark/streaming/dstream.py

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -354,6 +354,15 @@ def groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None)
354354

355355
def reduceByKeyAndWindow(self, func, invFunc,
356356
windowDuration, slideDuration, numPartitions=None):
357+
358+
duration = self._jdstream.dstream().slideDuration().milliseconds()
359+
if int(windowDuration * 1000) % duration != 0:
360+
raise ValueError("windowDuration must be multiple of the slide duration (%d ms)"
361+
% duration)
362+
if int(slideDuration * 1000) % duration != 0:
363+
raise ValueError("slideDuration must be multiple of the slide duration (%d ms)"
364+
% duration)
365+
357366
reduced = self.reduceByKey(func)
358367

359368
def reduceFunc(a, b, t):

python/pyspark/streaming/tests.py

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -292,6 +292,12 @@ def func(dstream):
292292
[('a', [2, 3, 4])], [('a', [3, 4])], [('a', [4])]]
293293
self._test_func(input, func, expected)
294294

295+
def test_reduce_by_invalid_window(self):
    """Window/slide durations that are not multiples of the parent
    DStream's slide duration must be rejected with ValueError.

    Relies on ``self.ssc`` (a StreamingContext created in the test
    fixture's setUp — presumably with a batch duration larger than
    0.1s, TODO confirm against the fixture).
    """
    input1 = [range(3), range(5), range(1), range(6)]
    d1 = self.ssc.queueStream(input1)
    # Both window and slide duration invalid (0.1s each).
    self.assertRaises(ValueError, lambda: d1.reduceByKeyAndWindow(None, None, 0.1, 0.1))
    # Valid window duration (1s) but invalid slide duration (0.1s).
    self.assertRaises(ValueError, lambda: d1.reduceByKeyAndWindow(None, None, 1, 0.1))
300+
295301
def update_state_by_key(self):
296302

297303
def updater(it):

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

Lines changed: 2 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -92,7 +92,8 @@ private[spark] object PythonDStream {
9292
* If the result RDD is PythonRDD, then it will cache it as an template for future use,
9393
* this can reduce the Python callbacks.
9494
*/
95-
private[spark] class PythonTransformedDStream (parent: DStream[_], pfunc: PythonRDDFunction,
95+
private[spark]
96+
class PythonTransformedDStream (parent: DStream[_], pfunc: PythonRDDFunction,
9697
var reuse: Boolean = false)
9798
extends PythonDStream(parent) {
9899

@@ -180,16 +181,6 @@ class PythonReducedWindowedDStream(parent: DStream[Array[Byte]],
180181
_slideDuration: Duration
181182
) extends PythonStateDStream(parent, preduceFunc) {
182183

183-
assert(_windowDuration.isMultipleOf(parent.slideDuration),
184-
"The window duration of ReducedWindowedDStream (" + _windowDuration + ") " +
185-
"must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")"
186-
)
187-
188-
assert(_slideDuration.isMultipleOf(parent.slideDuration),
189-
"The slide duration of ReducedWindowedDStream (" + _slideDuration + ") " +
190-
"must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")"
191-
)
192-
193184
val invReduceFunc = new RDDFunction(pinvReduceFunc)
194185

195186
def windowDuration: Duration = _windowDuration

0 commit comments

Comments (0)