
[SPARK-26848][SQL][SS] Introduce new option to Kafka source: offset by timestamp (starting/ending) #23747

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed · wants to merge 3 commits
40 changes: 40 additions & 0 deletions docs/structured-streaming-kafka-integration.md
@@ -362,6 +362,27 @@ The following configurations are optional:

<table class="table">
<tr><th>Option</th><th>value</th><th>default</th><th>query type</th><th>meaning</th></tr>
<tr>
<td>startingOffsetsByTimestamp</td>
<td>json string
""" {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """
</td>
  <td>none (the value of <code>startingOffsets</code> will apply)</td>
<td>streaming and batch</td>
  <td>The start point in time when a query is started, a json string specifying a starting timestamp for
  each TopicPartition. The returned offset for each partition is the earliest offset whose timestamp is greater than or
  equal to the given timestamp in the corresponding partition. If no matching offset exists,
  the query will fail immediately to prevent unintended reads from that partition. (This is a limitation for now; it will be addressed in the near future.)<p/>
<p/>
Spark simply passes the timestamp information to <code>KafkaConsumer.offsetsForTimes</code>, and doesn't interpret or reason about the value. <p/>
  For more details on <code>KafkaConsumer.offsetsForTimes</code>, please refer to its <a href="https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes-java.util.Map-">javadoc</a>.<p/>
  Also, the meaning of <code>timestamp</code> here can vary according to the Kafka configuration (<code>log.message.timestamp.type</code>); please refer to the <a href="https://kafka.apache.org/documentation/">Kafka documentation</a> for further details.<p/>
Note: This option requires Kafka 0.10.1.0 or higher.<p/>
Note2: <code>startingOffsetsByTimestamp</code> takes precedence over <code>startingOffsets</code>.<p/>
  Note3: For streaming queries, this only applies when a new query is started; resuming will
  always pick up from where the query left off. Newly discovered partitions during a query will start at
  the earliest offsets.</td>
</tr>
<tr>
<td>startingOffsets</td>
<td>"earliest", "latest" (streaming only), or json string
@@ -377,6 +398,25 @@ The following configurations are optional:
always pick up from where the query left off. Newly discovered partitions during a query will start at
earliest.</td>
</tr>
<tr>
<td>endingOffsetsByTimestamp</td>
<td>json string
""" {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """
</td>
<td>latest</td>
<td>batch query</td>
  <td>The end point in time when a batch query is ended, a json string specifying an ending timestamp for each TopicPartition.
  The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to
  the given timestamp in the corresponding partition. If no matching offset exists, the offset will
  be set to latest.<p/>
<p/>
Spark simply passes the timestamp information to <code>KafkaConsumer.offsetsForTimes</code>, and doesn't interpret or reason about the value. <p/>
  For more details on <code>KafkaConsumer.offsetsForTimes</code>, please refer to its <a href="https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes-java.util.Map-">javadoc</a>.<p/>
  Also, the meaning of <code>timestamp</code> here can vary according to the Kafka configuration (<code>log.message.timestamp.type</code>); please refer to the <a href="https://kafka.apache.org/documentation/">Kafka documentation</a> for further details.<p/>
Note: This option requires Kafka 0.10.1.0 or higher.<p/>
Note2: <code>endingOffsetsByTimestamp</code> takes precedence over <code>endingOffsets</code>.
</td>
</tr>
<tr>
<td>endingOffsets</td>
<td>latest or json string
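Putting the two new options together, a minimal batch-read sketch might look like the following. This is illustrative only: the broker address, topic names, and timestamps are placeholders rather than values from this PR.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-offsets-by-timestamp").getOrCreate()

// Batch query: read only the records whose offsets fall between the offsets that
// Kafka resolves from the given per-partition timestamps (milliseconds since epoch).
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")   // placeholder broker list
  .option("subscribe", "topicA,topicB")              // placeholder topics
  .option("startingOffsetsByTimestamp",
    """{"topicA": {"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}}""")
  .option("endingOffsetsByTimestamp",
    """{"topicA": {"0": 5000, "1": 5000}, "topicB": {"0": 6000, "1": 6000}}""")
  .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show()
```

For streaming queries only `startingOffsetsByTimestamp` applies (via `spark.readStream`), and as the table notes, it only takes effect when the query starts without a checkpoint.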
@@ -68,20 +68,35 @@ private object JsonUtils {
partOffsets.map { case (part, offset) =>
new TopicPartition(topic, part) -> offset
}
}.toMap
}
} catch {
case NonFatal(x) =>
throw new IllegalArgumentException(
s"""Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got $str""")
}
}

def partitionTimestamps(str: String): Map[TopicPartition, Long] = {
try {
Serialization.read[Map[String, Map[Int, Long]]](str).flatMap { case (topic, partTimestamps) =>
partTimestamps.map { case (part, timestamp) =>
new TopicPartition(topic, part) -> timestamp
}
}
} catch {
case NonFatal(x) =>
throw new IllegalArgumentException(
s"""Expected e.g. {"topicA": {"0": 123456789, "1": 123456789},
|"topicB": {"0": 123456789, "1": 123456789}}, got $str""".stripMargin)
}
}

/**
* Write per-TopicPartition offsets as json string
*/
def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
val result = new HashMap[String, HashMap[Int, Long]]()
implicit val ordering = new Ordering[TopicPartition] {
implicit val order = new Ordering[TopicPartition] {
override def compare(x: TopicPartition, y: TopicPartition): Int = {
Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
}
@@ -95,4 +110,9 @@
}
Serialization.write(result)
}

def partitionTimestamps(topicTimestamps: Map[TopicPartition, Long]): String = {
Contributor

At first glance I don't see the difference compared to partitionOffsets (apart from the incoming parameter name).

Contributor Author

They are the same for now but carry different meanings, so I wouldn't remove the method. I'll call partitionOffsets here for now and change it if we ever need them to differ.

// For now this is the same as partitionOffsets
partitionOffsets(topicTimestamps)
}
}
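For reference, an illustrative snippet (not part of this PR) of the JSON shape `partitionTimestamps` accepts and the per-partition map it is expected to produce; the topic names and timestamps are arbitrary.

```scala
import org.apache.kafka.common.TopicPartition

// Input for JsonUtils.partitionTimestamps: topic -> (partition -> timestamp in ms since epoch).
val json = """{"topicA": {"0": 1000, "1": 1000}, "topicB": {"0": 2000}}"""

// Parsing the string above is expected to yield these per-partition timestamps.
val expected: Map[TopicPartition, Long] = Map(
  new TopicPartition("topicA", 0) -> 1000L,
  new TopicPartition("topicA", 1) -> 1000L,
  new TopicPartition("topicB", 0) -> 2000L)
```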
@@ -59,8 +59,8 @@ private[kafka010] class KafkaBatch(
// Leverage the KafkaReader to obtain the relevant partition offsets
val (fromPartitionOffsets, untilPartitionOffsets) = {
try {
(kafkaOffsetReader.fetchPartitionOffsets(startingOffsets),
kafkaOffsetReader.fetchPartitionOffsets(endingOffsets))
(kafkaOffsetReader.fetchPartitionOffsets(startingOffsets, isStartingOffsets = true),
kafkaOffsetReader.fetchPartitionOffsets(endingOffsets, isStartingOffsets = false))
} finally {
kafkaOffsetReader.close()
}
@@ -69,6 +69,8 @@ class KafkaContinuousStream(
case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss)
case SpecificTimestampRangeLimit(p) => offsetReader.fetchSpecificTimestampBasedOffsets(p,
failsOnNoMatchingOffset = true)
}
logInfo(s"Initial offsets: $offsets")
offsets
@@ -192,6 +192,8 @@ private[kafka010] class KafkaMicroBatchStream(
KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets(None))
case SpecificOffsetRangeLimit(p) =>
kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
case SpecificTimestampRangeLimit(p) =>
kafkaOffsetReader.fetchSpecificTimestampBasedOffsets(p, failsOnNoMatchingOffset = true)
}
metadataLog.add(0, offsets)
logInfo(s"Initial offsets: $offsets")
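Both streaming paths above resolve the initial offsets from `SpecificTimestampRangeLimit` only when a query starts fresh; on restart the checkpoint wins. A minimal hypothetical streaming sketch using the new option (broker, topic, checkpoint path, and timestamps are placeholders):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-start-by-timestamp").getOrCreate()

// Streaming query: initial offsets are resolved from the per-partition timestamps;
// a restarted query resumes from the checkpoint instead.
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")   // placeholder broker list
  .option("subscribe", "topicA")                     // placeholder topic
  .option("startingOffsetsByTimestamp", """{"topicA": {"0": 1000, "1": 1000}}""")
  .load()

val query = stream.selectExpr("CAST(value AS STRING)")
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/kafka-ts")   // placeholder path
  .start()

query.awaitTermination()
```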
@@ -42,6 +42,13 @@ private[kafka010] case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit
private[kafka010] case class SpecificOffsetRangeLimit(
partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit

/**
 * Represents the desire to bind to the earliest offset whose timestamp is equal to or
 * greater than the specified timestamp.
*/
private[kafka010] case class SpecificTimestampRangeLimit(
topicTimestamps: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit

private[kafka010] object KafkaOffsetRangeLimit {
/**
* Used to denote offset range limits that are resolved via Kafka
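As the documentation above notes, the timestamps carried by `SpecificTimestampRangeLimit` are ultimately resolved through `KafkaConsumer.offsetsForTimes`. Below is a rough, hypothetical sketch of that lookup with a plain Kafka consumer, independent of this PR's internal code; the broker address, topic, and timestamps are placeholders.

```scala
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "host1:9092")   // placeholder broker list
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
try {
  // Per-partition timestamps (ms since epoch), mirroring a SpecificTimestampRangeLimit.
  val timestamps: Map[TopicPartition, java.lang.Long] = Map(
    new TopicPartition("topicA", 0) -> java.lang.Long.valueOf(1000L),
    new TopicPartition("topicA", 1) -> java.lang.Long.valueOf(1000L))

  // offsetsForTimes returns, per partition, the earliest offset whose timestamp is greater
  // than or equal to the requested timestamp, or null when no such offset exists.
  val resolved = consumer.offsetsForTimes(timestamps.asJava).asScala
  resolved.foreach { case (tp, offsetAndTimestamp) =>
    println(s"$tp -> ${Option(offsetAndTimestamp).map(_.offset())}")
  }
} finally {
  consumer.close()
}
```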