
Commit 095e84a

rsotn-mapr authored and ekrivokonmapr committed
[MAPR-26060] Fixed case when mapr-streams make gaps in offsets (apache#97)
1 parent c7b3581 commit 095e84a
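
The underlying issue: unlike stock Kafka, a MapR-streams partition can legitimately skip offsets, while the consumer code assumed they were consecutive. A toy sketch of the old and new behavior (the offsets and helper functions below are hypothetical, not the commit's code):

// Toy model of the failure this commit fixes. A MapR-streams partition may
// expose offsets 1, 3, 4 with 2 simply absent (hypothetical data).
val offsetsInPartition = Seq(1L, 3L, 4L)

// Old assumption (sketch): a record must exist exactly at the requested offset,
// otherwise the consumer treated it as a buffer miss and re-polled.
def recordAtExactly(requested: Long): Boolean = offsetsInPartition.contains(requested)

// New behavior (sketch): accept the next record at or after the requested offset.
def nextExisting(requested: Long): Option[Long] = offsetsInPartition.find(_ >= requested)

assert(!recordAtExactly(2L))          // the old check can never succeed at the gap
assert(nextExisting(2L).contains(3L)) // the new logic simply moves on to offset 3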

File tree: 3 files changed, +57 -17 lines

external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/CachedKafkaConsumer.scala

Lines changed: 19 additions & 7 deletions

@@ -19,10 +19,11 @@ package org.apache.spark.streaming.kafka09
 
 import java.{ util => ju }
 
-import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, KafkaConsumer }
-import org.apache.kafka.common.{ KafkaException, TopicPartition }
+import collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 
-import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 
 
@@ -73,9 +74,12 @@ class CachedKafkaConsumer[K, V] private(
     if (!buffer.hasNext()) { poll(timeout) }
     assert(buffer.hasNext(),
       s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
-    var record = buffer.next()
+    val record = buffer.next()
 
-    if (record.offset != offset) {
+    nextOffset = offset + 1
+    record
+    // Offsets in MapR-streams can contain gaps
+    /* if (record.offset < offset) {
       logInfo(s"Buffer miss for $groupId $topic $partition $offset")
       seek(offset)
       poll(timeout)
@@ -88,6 +92,7 @@ class CachedKafkaConsumer[K, V] private(
 
     nextOffset = offset + 1
     record
+    */
   }
 
   private def seek(offset: Long): Unit = {
@@ -98,8 +103,15 @@ class CachedKafkaConsumer[K, V] private(
   private def poll(timeout: Long): Unit = {
     val p = consumer.poll(timeout)
     val r = p.records(topicPartition)
-    logDebug(s"Polled ${p.partitions()} ${r.size}")
-    buffer = r.iterator
+
+    val preparedRecords = if (r.size > 1) {
+      r.asScala.filter(_.offset() > 0).asJava
+    } else {
+      r
+    }
+
+    logDebug(s"Polled ${p.partitions()} ${preparedRecords.size}")
+    buffer = preparedRecords.iterator()
   }
 }
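
In isolation, the new buffering behavior looks roughly like this sketch (the Record case class and the sample batch are illustrative stand-ins for ConsumerRecord values, not part of the commit):

// Minimal sketch of the new poll() filtering, assuming a toy Record type.
final case class Record(offset: Long, value: String)

// With STREAMS_ZEROOFFSET_RECORD_ON_EOF_CONFIG enabled (see KafkaUtils below),
// MapR-streams can hand back a zero-offset marker record; drop it before
// buffering, roughly as the preparedRecords filter above does.
def prepare(batch: Seq[Record]): Seq[Record] =
  if (batch.size > 1) batch.filter(_.offset > 0) else batch

// A polled batch with an offset gap (2 is absent) plus a zero-offset marker:
val polled = Seq(Record(0, "eof-marker"), Record(1, "a"), Record(3, "b"))
val buffered = prepare(polled) // keeps offsets 1 and 3
// get(offset) now takes whatever buffered record comes next and sets
// nextOffset = offset + 1, tolerating the 1 -> 3 gap instead of re-seeking.
buffered.foreach(r => println(s"offset=${r.offset} value=${r.value}"))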

external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/KafkaRDD.scala

Lines changed: 35 additions & 10 deletions

@@ -30,6 +30,9 @@ import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.NextIterator
+
+import scala.annotation.tailrec
 
 /**
  * A batch-oriented interface for consuming from Kafka.
@@ -83,11 +86,11 @@ private[spark] class KafkaRDD[K, V](
 
   override def getPartitions: Array[Partition] = {
     offsetRanges.zipWithIndex.map { case (o, i) =>
-        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset)
+      new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset)
     }.toArray
   }
 
-  override def count(): Long = offsetRanges.map(_.count).sum
+  // override def count(): Long = offsetRanges.map(_.count).sum
 
   override def countApprox(
       timeout: Long,
@@ -193,7 +196,7 @@ private[spark] class KafkaRDD[K, V](
    */
   private class KafkaRDDIterator(
       part: KafkaRDDPartition,
-      context: TaskContext) extends Iterator[ConsumerRecord[K, V]] {
+      context: TaskContext) extends NextIterator[ConsumerRecord[K, V]] {
 
     logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
       s"offsets ${part.fromOffset} -> ${part.untilOffset}")
@@ -215,19 +218,41 @@ private[spark] class KafkaRDD[K, V](
 
     var requestOffset = part.fromOffset
 
-    def closeIfNeeded(): Unit = {
+    override def close(): Unit = {
      if (!useConsumerCache && consumer != null) {
        consumer.close
      }
    }
 
-    override def hasNext(): Boolean = requestOffset < part.untilOffset
+    // override def hasNext(): Boolean = requestOffset < part.untilOffset
+    override def getNext(): ConsumerRecord[K, V] = {
+
+      @tailrec
+      def skipGapsAndGetNext: ConsumerRecord[K, V] = {
+        if (requestOffset < part.untilOffset) {
+          val r = consumer.get(requestOffset, pollTimeout)
+
+          requestOffset = if (r.offset() == 0) {part.untilOffset} else {r.offset() + 1}
+
+          if (null == r && r.offset() == 0) {
+            skipGapsAndGetNext
+          } else {
+            r
+          }
+        } else {
+          finished = true
+          null.asInstanceOf[ConsumerRecord[K, V]]
+        }
+      }
 
-    override def next(): ConsumerRecord[K, V] = {
-      assert(hasNext(), "Can't call getNext() once untilOffset has been reached")
-      val r = consumer.get(requestOffset, pollTimeout)
-      requestOffset += 1
-      r
+      skipGapsAndGetNext
    }
+
+    // override def next(): ConsumerRecord[K, V] = {
+    //   assert(hasNext, "Can't call getNext() once untilOffset has been reached")
+    //   val r = consumer.get(requestOffset, pollTimeout)
+    //   requestOffset += 1
+    //   r
+    // }
  }
}

external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/KafkaUtils.scala

Lines changed: 3 additions & 0 deletions

@@ -206,6 +206,9 @@ object KafkaUtils extends Logging {
    * Tweak kafka params to prevent issues on executors
    */
   private[kafka09] def fixKafkaParams(kafkaParams: ju.HashMap[String, Object]): Unit = {
+    logWarning(s"overriding ${ConsumerConfig.STREAMS_ZEROOFFSET_RECORD_ON_EOF_CONFIG} to true")
+    kafkaParams.put(ConsumerConfig.STREAMS_ZEROOFFSET_RECORD_ON_EOF_CONFIG, true: java.lang.Boolean)
+
     logWarning(s"overriding ${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG} to false for executor")
     kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)
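
The zero-offset-on-EOF record enabled here is what the KafkaRDD iterator above keys on (r.offset() == 0) to tell end-of-partition apart from a mere gap. The pattern fixKafkaParams follows, as a standalone sketch: copy the caller's params and force the settings the executor-side consumer depends on. The literal key string for STREAMS_ZEROOFFSET_RECORD_ON_EOF_CONFIG is an assumption here; real code should use the ConsumerConfig constant as above.

import java.{ util => ju }

// Sketch of the param-fixing pattern: take the user's consumer params and
// override the values this integration requires on executors.
def fixedParams(userParams: ju.Map[String, Object]): ju.HashMap[String, Object] = {
  val params = new ju.HashMap[String, Object](userParams)
  // Assumed key string; the commit uses the
  // ConsumerConfig.STREAMS_ZEROOFFSET_RECORD_ON_EOF_CONFIG constant instead.
  params.put("streams.zerooffsetrecord.on.eof", true: java.lang.Boolean)
  // Executors must never auto-commit; offsets are managed by the streaming job.
  params.put("enable.auto.commit", false: java.lang.Boolean)
  params
}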
