Skip to content

Commit d872b5e

Browse files
rsotn-mapr authored and ekrivokonmapr committed
Spark streaming skipped message with zero offset from Kafka 0.9
1 parent afc25d8 commit d872b5e

File tree

2 files changed

+7
-14
lines changed

2 files changed

+7
-14
lines changed

external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/CachedKafkaConsumer.scala

Lines changed: 2 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -16,17 +16,14 @@
1616
*/
1717

1818
package org.apache.spark.streaming.kafka09
19-
2019
import java.{ util => ju }
2120

2221
import collection.JavaConverters._
23-
2422
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
2523
import org.apache.kafka.common.{KafkaException, TopicPartition}
2624

2725
import org.apache.spark.internal.Logging
2826

29-
3027
/**
3128
* Consumer of single topicpartition, intended for cached reuse.
3229
* Underlying consumer is not threadsafe, so neither is this,
@@ -104,14 +101,8 @@ class CachedKafkaConsumer[K, V] private(
104101
val p = consumer.poll(timeout)
105102
val r = p.records(topicPartition)
106103

107-
val preparedRecords = if (r.size > 1) {
108-
r.asScala.filter(_.offset() > 0).asJava
109-
} else {
110-
r
111-
}
112-
113-
logDebug(s"Polled ${p.partitions()} ${preparedRecords.size}")
114-
buffer = preparedRecords.iterator()
104+
logDebug(s"Polled ${p.partitions()} ${r.size}")
105+
buffer = r.iterator()
115106
}
116107
}
117108

external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/KafkaRDD.scala

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -227,16 +227,18 @@ private[spark] class KafkaRDD[K, V](
227227
// override def hasNext(): Boolean = requestOffset < part.untilOffset
228228
override def getNext(): ConsumerRecord[K, V] = {
229229

230+
val isStreams = consumer.topic.startsWith("/") && consumer.topic.contains(":")
231+
230232
@tailrec
231233
def skipGapsAndGetNext: ConsumerRecord[K, V] = {
232234
if (requestOffset < part.untilOffset) {
233235
val r = consumer.get(requestOffset, pollTimeout)
234236

235-
requestOffset = if (r.offset() == 0) {part.untilOffset} else {r.offset() + 1}
236-
237-
if (null == r && r.offset() == 0) {
237+
if (isStreams && r.offset() == 0) {
238+
requestOffset = requestOffset + 1
238239
skipGapsAndGetNext
239240
} else {
241+
requestOffset = r.offset() + 1
240242
r
241243
}
242244
} else {

0 commit comments

Comments
 (0)