Commit d7fa3e3

[SPARK-17834][SQL] Fetch the earliest offsets manually in KafkaSource instead of counting on KafkaConsumer
## What changes were proposed in this pull request?

Because `KafkaConsumer.poll(0)` may update the partition offsets, this PR just calls `seekToBeginning` to manually set the earliest offsets for the KafkaSource initial offsets.

## How was this patch tested?

Existing tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15397 from zsxwing/SPARK-17834.

(cherry picked from commit 08eac35)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
1 parent d38f38a commit d7fa3e3
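
To make the mechanism concrete, here is a minimal, standalone sketch of what "fetch the earliest offsets manually" means at the `KafkaConsumer` level, mirroring the calls the new `fetchEarliestOffsets` method makes: `poll(0)` only to obtain the assignment, `pause`, `seekToBeginning`, then read positions. This assumes kafka-clients 0.10; the broker address, group id, and topic name are placeholders, not values from the patch.

```scala
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

object EarliestOffsetsSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative consumer setup; broker address, group id, and topic are placeholders.
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-probe")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      consumer.subscribe(java.util.Arrays.asList("topic1"))

      // poll(0) is used only to trigger partition assignment; pause so no records are fetched.
      consumer.poll(0)
      val partitions = consumer.assignment()
      consumer.pause(partitions)

      // Seek explicitly instead of relying on auto.offset.reset during poll.
      consumer.seekToBeginning(partitions)
      val earliest: Map[TopicPartition, Long] =
        partitions.asScala.map(p => p -> consumer.position(p)).toMap
      println(s"Earliest offsets: $earliest")
    } finally {
      consumer.close()
    }
  }
}
```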

2 files changed: +48 −26 lines changed

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala

Lines changed: 35 additions & 20 deletions
@@ -82,6 +82,7 @@ private[kafka010] case class KafkaSource(
     executorKafkaParams: ju.Map[String, Object],
     sourceOptions: Map[String, String],
     metadataPath: String,
+    startFromEarliestOffset: Boolean,
     failOnDataLoss: Boolean)
   extends Source with Logging {

@@ -109,7 +110,11 @@ private[kafka010] case class KafkaSource(
   private lazy val initialPartitionOffsets = {
     val metadataLog = new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath)
     metadataLog.get(0).getOrElse {
-      val offsets = KafkaSourceOffset(fetchPartitionOffsets(seekToEnd = false))
+      val offsets = if (startFromEarliestOffset) {
+        KafkaSourceOffset(fetchEarliestOffsets())
+      } else {
+        KafkaSourceOffset(fetchLatestOffsets())
+      }
       metadataLog.add(0, offsets)
       logInfo(s"Initial offsets: $offsets")
       offsets
@@ -123,7 +128,7 @@ private[kafka010] case class KafkaSource(
     // Make sure initialPartitionOffsets is initialized
     initialPartitionOffsets

-    val offset = KafkaSourceOffset(fetchPartitionOffsets(seekToEnd = true))
+    val offset = KafkaSourceOffset(fetchLatestOffsets())
     logDebug(s"GetOffset: ${offset.partitionToOffsets.toSeq.map(_.toString).sorted}")
     Some(offset)
   }
@@ -227,26 +232,34 @@ private[kafka010] case class KafkaSource(
   override def toString(): String = s"KafkaSource[$consumerStrategy]"

   /**
-   * Fetch the offset of a partition, either seek to the latest offsets or use the current offsets
-   * in the consumer.
+   * Fetch the earliest offsets of partitions.
    */
-  private def fetchPartitionOffsets(
-      seekToEnd: Boolean): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
-    // Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
-    assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
+  private def fetchEarliestOffsets(): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
     // Poll to get the latest assigned partitions
     consumer.poll(0)
     val partitions = consumer.assignment()
     consumer.pause(partitions)
-    logDebug(s"Partitioned assigned to consumer: $partitions")
+    logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the beginning")

-    // Get the current or latest offset of each partition
-    if (seekToEnd) {
-      consumer.seekToEnd(partitions)
-      logDebug("Seeked to the end")
-    }
+    consumer.seekToBeginning(partitions)
+    val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
+    logDebug(s"Got earliest offsets for partition : $partitionOffsets")
+    partitionOffsets
+  }
+
+  /**
+   * Fetch the latest offset of partitions.
+   */
+  private def fetchLatestOffsets(): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
+    // Poll to get the latest assigned partitions
+    consumer.poll(0)
+    val partitions = consumer.assignment()
+    consumer.pause(partitions)
+    logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the end.")
+
+    consumer.seekToEnd(partitions)
     val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
-    logDebug(s"Got offsets for partition : $partitionOffsets")
+    logDebug(s"Got latest offsets for partition : $partitionOffsets")
     partitionOffsets
   }

@@ -256,22 +269,21 @@ private[kafka010] case class KafkaSource(
    */
   private def fetchNewPartitionEarliestOffsets(
       newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
-    // Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
-    assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
     // Poll to get the latest assigned partitions
     consumer.poll(0)
     val partitions = consumer.assignment()
+    consumer.pause(partitions)
     logDebug(s"\tPartitioned assigned to consumer: $partitions")

     // Get the earliest offset of each partition
     consumer.seekToBeginning(partitions)
-    val partitionToOffsets = newPartitions.filter { p =>
+    val partitionOffsets = newPartitions.filter { p =>
       // When deleting topics happen at the same time, some partitions may not be in `partitions`.
       // So we need to ignore them
       partitions.contains(p)
     }.map(p => p -> consumer.position(p)).toMap
-    logDebug(s"Got offsets for new partitions: $partitionToOffsets")
-    partitionToOffsets
+    logDebug(s"Got earliest offsets for new partitions: $partitionOffsets")
+    partitionOffsets
   }

   /**
@@ -284,6 +296,9 @@ private[kafka010] case class KafkaSource(
    */
   private def withRetriesWithoutInterrupt(
       body: => Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
+    // Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
+    assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
+
     synchronized {
       var result: Option[Map[TopicPartition, Long]] = None
       var attempt = 1

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala

Lines changed: 13 additions & 6 deletions
@@ -77,10 +77,15 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
     // id. Hence, we should generate a unique id for each query.
     val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"

-    val autoOffsetResetValue = caseInsensitiveParams.get(STARTING_OFFSET_OPTION_KEY) match {
-      case Some(value) => value.trim() // same values as those supported by auto.offset.reset
-      case None => "latest"
-    }
+    val startFromEarliestOffset =
+      caseInsensitiveParams.get(STARTING_OFFSET_OPTION_KEY).map(_.trim.toLowerCase) match {
+        case Some("latest") => false
+        case Some("earliest") => true
+        case Some(pos) =>
+          // This should not happen since we have already checked the options.
+          throw new IllegalStateException(s"Invalid $STARTING_OFFSET_OPTION_KEY: $pos")
+        case None => false
+      }

     val kafkaParamsForStrategy =
       ConfigUpdater("source", specifiedKafkaParams)
@@ -90,8 +95,9 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
         // So that consumers in Kafka source do not mess with any existing group id
         .set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-driver")

-        // So that consumers can start from earliest or latest
-        .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetValue)
+        // Set to "latest" to avoid exceptions. However, KafkaSource will fetch the initial offsets
+        // by itself instead of counting on KafkaConsumer.
+        .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

         // So that consumers in the driver does not commit offsets unnecessarily
         .set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
@@ -147,6 +153,7 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
       kafkaParamsForExecutors,
       parameters,
       metadataPath,
+      startFromEarliestOffset,
       failOnDataLoss)
   }
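
For context, here is how a query would opt into the new behavior end to end. This is a hedged, spark-shell-style sketch: the option key is taken from STARTING_OFFSET_OPTION_KEY and assumed to be spelled "startingOffset" in this version of the source, and the broker and topic values are placeholders.

```scala
import org.apache.spark.sql.SparkSession

// Assumes a running Kafka broker at localhost:9092 and a topic named "topic1".
val spark = SparkSession.builder().appName("kafka-earliest-demo").getOrCreate()

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  // "earliest" maps to startFromEarliestOffset = true in KafkaSourceProvider;
  // the exact option key (STARTING_OFFSET_OPTION_KEY) is assumed to be "startingOffset" here.
  .option("startingOffset", "earliest")
  .load()
```

With this change, the initial offsets come from KafkaSource's own `seekToBeginning` call rather than from the consumer's `auto.offset.reset` handling, which is now pinned to "latest" purely to avoid exceptions.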
