Skip to content

Commit 4039b16

Browse files
committed
Fix getOffsetRanges in transform() bug
1 parent afe9f03 commit 4039b16

File tree

2 files changed

+7
-2
lines changed

2 files changed

+7
-2
lines changed

python/pyspark/streaming/dstream.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -610,7 +610,10 @@ def __init__(self, prev, func):
610610
self.is_checkpointed = False
611611
self._jdstream_val = None
612612

613-
if (isinstance(prev, TransformedDStream) and
613+
# Use type() instead of isinstance() so that functions are folded and DStreams are compacted
614+
# only when prev is strictly a TransformedDStream, not an instance of a subclass.
615+
# This avoids a bug in KafkaTransformedDStream when calling offsetRanges().
616+
if (type(prev) is TransformedDStream and
614617
not prev.is_cached and not prev.is_checkpointed):
615618
prev_func = prev.func
616619
self.func = lambda t, rdd: func(t, prev_func(t, rdd))

python/pyspark/streaming/tests.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -850,7 +850,9 @@ def transformWithOffsetRanges(rdd):
850850
offsetRanges.append(o)
851851
return rdd
852852

853-
stream.transform(transformWithOffsetRanges).foreachRDD(lambda rdd: rdd.count())
853+
# Test that mixing KafkaTransformedDStream and TransformedDStream works correctly;
854+
# only the TransformedDStreams can be folded together.
855+
stream.transform(transformWithOffsetRanges).map(lambda kv: kv[1]).count().pprint()
854856
self.ssc.start()
855857
self.wait_for(offsetRanges, 1)
856858

0 commit comments

Comments
 (0)