
[SPARK-29223][SQL][SS] New option to specify timestamp on all subscribing topic-partitions in Kafka source #32609

Closed · wants to merge 5 commits
63 changes: 44 additions & 19 deletions docs/structured-streaming-kafka-integration.md
@@ -362,24 +362,33 @@ The following configurations are optional:

<table class="table">
<tr><th>Option</th><th>value</th><th>default</th><th>query type</th><th>meaning</th></tr>
<tr>
<td>startingTimestamp</td>
<td>timestamp string e.g. "1000"</td>
<td>none (next preference is <code>startingOffsetsByTimestamp</code>)</td>
<td>streaming and batch</td>
<td>The start point of a query by timestamp, a string specifying a single starting timestamp for
all partitions in the subscribed topics. Please refer to the details on timestamp offset options below. If Kafka doesn't return the matched offset,
the query fails immediately to prevent unintended reads from such partitions. (This is a limitation as of now, and will be addressed in the near future.)<p/>
<p/>
Note1: <code>startingTimestamp</code> takes precedence over <code>startingOffsetsByTimestamp</code> and <code>startingOffsets</code>.<p/>
Note2: For streaming queries, this only applies when a new query is started; resuming will
always pick up from where the query left off. Newly discovered partitions during a query will start at
earliest.</td>
</tr>
<tr>
<td>startingOffsetsByTimestamp</td>
<td>json string
""" {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """
</td>
<td>none (next preference is <code>startingOffsets</code>)</td>
<td>streaming and batch</td>
<td>The start point of a query by timestamp, a json string specifying a starting timestamp for
each TopicPartition. Please refer to the details on timestamp offset options below. If Kafka doesn't return the matched offset,
the query fails immediately to prevent unintended reads from such partitions. (This is a limitation as of now, and will be addressed in the near future.)<p/>
<p/>
Note1: <code>startingOffsetsByTimestamp</code> takes precedence over <code>startingOffsets</code>.<p/>
Note2: For streaming queries, this only applies when a new query is started; resuming will
always pick up from where the query left off. Newly discovered partitions during a query will start at
earliest.</td>
</tr>
@@ -398,23 +407,28 @@ The following configurations are optional:
always pick up from where the query left off. Newly discovered partitions during a query will start at
earliest.</td>
</tr>
<tr>
<td>endingTimestamp</td>
<td>timestamp string e.g. "1000"</td>
<td>none (next preference is <code>endingOffsetsByTimestamp</code>)</td>
<td>batch query</td>
<td>The end point of a batch query by timestamp, a string specifying an ending timestamp for
all partitions in the subscribed topics. Please refer to the details on timestamp offset options below.
If Kafka doesn't return the matched offset, the offset will be set to latest.<p/>
Note: <code>endingTimestamp</code> takes precedence over <code>endingOffsetsByTimestamp</code> and <code>endingOffsets</code>.<p/>
</td>
</tr>
<tr>
<td>endingOffsetsByTimestamp</td>
<td>json string
""" {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """
</td>
<td>none (next preference is <code>endingOffsets</code>)</td>
<td>batch query</td>
<td>The end point of a batch query by timestamp, a json string specifying an ending timestamp for each TopicPartition.
Please refer to the details on timestamp offset options below. If Kafka doesn't return the matched offset,
the offset will be set to latest.<p/>
Note: <code>endingOffsetsByTimestamp</code> takes precedence over <code>endingOffsets</code>.
</td>
</tr>
<tr>
@@ -512,6 +526,17 @@ The following configurations are optional:
</tr>
</table>

### Details on timestamp offset options

The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
If Kafka doesn't return the matched offset, the behavior varies across options; check the description of each option.

Spark simply passes the timestamp information to <code>KafkaConsumer.offsetsForTimes</code>, and doesn't interpret or reason about the value.
For more details on <code>KafkaConsumer.offsetsForTimes</code>, please refer to the <a href="http://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes(java.util.Map)">javadoc</a>.
Also, the meaning of <code>timestamp</code> here can vary according to the Kafka configuration (<code>log.message.timestamp.type</code>): please refer to the <a href="https://kafka.apache.org/documentation/">Kafka documentation</a> for further details.

Timestamp offset options require Kafka 0.10.1.0 or higher.
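
For illustration, a minimal sketch of how these options might be passed to the Kafka source (broker addresses, topic name, and timestamp values below are placeholders):

// Streaming query: start every subscribed partition at the first offset whose
// timestamp is greater than or equal to 1000.
val streamingDf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topicA")
  .option("startingTimestamp", "1000")
  .load()

// Batch query: bound both ends per TopicPartition using the JSON form.
val batchDf = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topicA")
  .option("startingOffsetsByTimestamp", """{"topicA": {"0": 1000, "1": 1000}}""")
  .option("endingOffsetsByTimestamp", """{"topicA": {"0": 2000, "1": 2000}}""")
  .load()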

### Offset fetching

In Spark 3.0 and before, Spark uses <code>KafkaConsumer</code> for offset fetching, which could cause infinite wait in the driver.
@@ -72,6 +72,8 @@ class KafkaContinuousStream(
case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss)
case SpecificTimestampRangeLimit(p) => offsetReader.fetchSpecificTimestampBasedOffsets(p,
failsOnNoMatchingOffset = true)
case GlobalTimestampRangeLimit(ts) => offsetReader.fetchGlobalTimestampBasedOffsets(
ts, failsOnNoMatchingOffset = true)
}
logInfo(s"Initial offsets: $offsets")
offsets
@@ -166,6 +166,8 @@ private[kafka010] class KafkaMicroBatchStream(
kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
case SpecificTimestampRangeLimit(p) =>
kafkaOffsetReader.fetchSpecificTimestampBasedOffsets(p, failsOnNoMatchingOffset = true)
case GlobalTimestampRangeLimit(ts) =>
kafkaOffsetReader.fetchGlobalTimestampBasedOffsets(ts, failsOnNoMatchingOffset = true)
}
metadataLog.add(0, offsets)
logInfo(s"Initial offsets: $offsets")
@@ -49,6 +49,13 @@ private[kafka010] case class SpecificOffsetRangeLimit(
private[kafka010] case class SpecificTimestampRangeLimit(
topicTimestamps: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit

/**
* Represents the desire to bind to the earliest offset whose timestamp is greater than or equal
* to the specified timestamp. The timestamp is applied to all subscribed topics/partitions.
*/
private[kafka010] case class GlobalTimestampRangeLimit(
timestamp: Long) extends KafkaOffsetRangeLimit
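
For illustration only (not part of this diff), the two timestamp-based limits differ only in scope: GlobalTimestampRangeLimit carries a single timestamp applied to every subscribed topic-partition, while SpecificTimestampRangeLimit carries one timestamp per TopicPartition. Topic names and timestamps are placeholders.

// Single timestamp for all subscribed topic-partitions.
GlobalTimestampRangeLimit(1000L)
// One timestamp per TopicPartition.
SpecificTimestampRangeLimit(Map(
  new TopicPartition("topicA", 0) -> 1000L,
  new TopicPartition("topicA", 1) -> 1000L))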

private[kafka010] object KafkaOffsetRangeLimit {
/**
* Used to denote offset range limits that are resolved via Kafka
@@ -77,6 +77,20 @@ private[kafka010] trait KafkaOffsetReader {
partitionTimestamps: Map[TopicPartition, Long],
failsOnNoMatchingOffset: Boolean): KafkaSourceOffset

/**
* Resolves the specific offsets based on timestamp for all subscribed topic-partitions.
* The returned offset for each partition is the earliest offset whose timestamp is greater
* than or equal to the given timestamp in the corresponding partition. If the matched offset
* doesn't exist, depending on the `failsOnNoMatchingOffset` parameter, the offset will be set
* to latest or this method throws an error.
*
* @param timestamp the timestamp.
* @param failsOnNoMatchingOffset whether to fail the query when no matched offset can be found.
*/
def fetchGlobalTimestampBasedOffsets(
    timestamp: Long,
    failsOnNoMatchingOffset: Boolean): KafkaSourceOffset

Contributor: Nit: "If the matched offset doesn't exist" is a bit of an odd construct. Either it matched or it doesn't exist.

Contributor Author: Let me refine the words. Basically it queries Kafka and we are explaining the mechanism, so it seems OK to say "If Kafka doesn't return the matched offset".

/**
* Fetch the earliest offsets for the topic partitions that are indicated
* in the [[ConsumerStrategy]].
@@ -138,6 +138,9 @@ private[kafka010] class KafkaOffsetReaderAdmin(
case SpecificTimestampRangeLimit(partitionTimestamps) =>
fetchSpecificTimestampBasedOffsets(partitionTimestamps,
failsOnNoMatchingOffset = isStartingOffsets).partitionToOffsets
case GlobalTimestampRangeLimit(timestamp) =>
fetchGlobalTimestampBasedOffsets(timestamp,
failsOnNoMatchingOffset = isStartingOffsets).partitionToOffsets
}
}

@@ -193,6 +196,37 @@
fetchSpecificOffsets0(fnAssertParametersWithPartitions, fnRetrievePartitionOffsets)
}


override def fetchGlobalTimestampBasedOffsets(
timestamp: Long,
failsOnNoMatchingOffset: Boolean): KafkaSourceOffset = {
val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions =>
logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $timestamp")
}

val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { tps =>
val listOffsetsParams = tps.asScala.map { tp =>
tp -> OffsetSpec.forTimestamp(timestamp)
}.toMap.asJava
admin.listOffsets(listOffsetsParams, listOffsetsOptions).all().get().asScala.map {
case (tp, offsetSpec) =>
if (failsOnNoMatchingOffset) {
assert(offsetSpec.offset() != OffsetFetchResponse.INVALID_OFFSET, "No offset " +
s"matched from request of topic-partition $tp and timestamp " +
s"$timestamp.")
}

if (offsetSpec.offset() == OffsetFetchResponse.INVALID_OFFSET) {
tp -> KafkaOffsetRangeLimit.LATEST
} else {
tp -> offsetSpec.offset()
}
}.toMap
}

fetchSpecificOffsets0(fnAssertParametersWithPartitions, fnRetrievePartitionOffsets)
}
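
For reference, a minimal sketch of the raw Admin call this helper builds on (topic name and timestamp are placeholders): when a partition has no record with a timestamp at or after the requested one, the returned offset is OffsetFetchResponse.INVALID_OFFSET (-1), which the code above either rejects via the assertion or maps to KafkaOffsetRangeLimit.LATEST.

// Resolve "earliest offset with timestamp >= 1000" for a single partition via the Admin API.
val tp = new TopicPartition("topicA", 0)
val resultInfos = admin.listOffsets(
  ju.Collections.singletonMap(tp, OffsetSpec.forTimestamp(1000L))).all().get()
val offset = resultInfos.get(tp).offset()  // -1 when Kafka has no matching offset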

private def fetchSpecificOffsets0(
fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit,
fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long]
@@ -160,6 +160,9 @@ private[kafka010] class KafkaOffsetReaderConsumer(
case SpecificTimestampRangeLimit(partitionTimestamps) =>
fetchSpecificTimestampBasedOffsets(partitionTimestamps,
failsOnNoMatchingOffset = isStartingOffsets).partitionToOffsets
case GlobalTimestampRangeLimit(timestamp) =>
fetchGlobalTimestampBasedOffsets(timestamp,
failsOnNoMatchingOffset = isStartingOffsets).partitionToOffsets
}
}

@@ -234,6 +237,39 @@
fnAssertFetchedOffsets)
}

override def fetchGlobalTimestampBasedOffsets(
timestamp: Long,
failsOnNoMatchingOffset: Boolean): KafkaSourceOffset = {
val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions =>
logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $timestamp")
}

val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { tps =>
val converted = tps.asScala.map(_ -> java.lang.Long.valueOf(timestamp)).toMap.asJava

val offsetForTime: ju.Map[TopicPartition, OffsetAndTimestamp] =
consumer.offsetsForTimes(converted)

offsetForTime.asScala.map { case (tp, offsetAndTimestamp) =>
if (failsOnNoMatchingOffset) {
assert(offsetAndTimestamp != null, "No offset matched from request of " +
s"topic-partition $tp and timestamp $timestamp.")
}

if (offsetAndTimestamp == null) {
tp -> KafkaOffsetRangeLimit.LATEST
} else {
tp -> offsetAndTimestamp.offset()
}
}.toMap
}

val fnAssertFetchedOffsets: Map[TopicPartition, Long] => Unit = { _ => }

fetchSpecificOffsets0(fnAssertParametersWithPartitions, fnRetrievePartitionOffsets,
fnAssertFetchedOffsets)
}
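
For reference, a minimal sketch of the raw consumer call this helper builds on (topic name and timestamp are placeholders): KafkaConsumer.offsetsForTimes returns a null entry for a partition with no record at or after the requested timestamp, which the code above either rejects via the assertion or maps to KafkaOffsetRangeLimit.LATEST.

// Resolve "earliest offset with timestamp >= 1000" for a single partition via the consumer.
val tp = new TopicPartition("topicA", 0)
val offsetsForTime: ju.Map[TopicPartition, OffsetAndTimestamp] =
  consumer.offsetsForTimes(ju.Collections.singletonMap(tp, java.lang.Long.valueOf(1000L)))
val offset = Option(offsetsForTime.get(tp)).map(_.offset())  // None when Kafka has no matching offset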

private def fetchSpecificOffsets0(
fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit,
fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long],
@@ -104,6 +104,8 @@ private[kafka010] class KafkaSource(
case SpecificOffsetRangeLimit(p) => kafkaReader.fetchSpecificOffsets(p, reportDataLoss)
case SpecificTimestampRangeLimit(p) =>
kafkaReader.fetchSpecificTimestampBasedOffsets(p, failsOnNoMatchingOffset = true)
case GlobalTimestampRangeLimit(ts) =>
kafkaReader.fetchGlobalTimestampBasedOffsets(ts, failsOnNoMatchingOffset = true)
}
metadataLog.add(0, offsets)
logInfo(s"Initial offsets: $offsets")