-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-29223][SQL][SS] New option to specify timestamp on all subscribing topic-partitions in Kafka source #32609
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…bing topic-partitions in Kafka source
Test build #138756 has finished for PR 32609 at commit
|
Kubernetes integration test unable to build dist. exiting with code: 1 |
Test build #138786 has finished for PR 32609 at commit
|
Kubernetes integration test starting |
Kubernetes integration test status success |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mainly looks good, left some questions/nits.
case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json)) | ||
case None => defaultOffsets | ||
} | ||
// The order below represents "preferences" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
specifying both global timestamp and specific timestamp for partition
added to test case 1. vs 2. fallback which is good. In order to cover all "preferences" maybe we can add a case where all config options added.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I changed the test a bit to cover two cases: all options / timestamp per partition vs offset. Thanks for the suggestion.
val tsStr = params(globalOffsetTimestampOptionKey).trim | ||
try { | ||
val ts = tsStr.toLong | ||
return GlobalTimestampRangeLimit(ts) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Super nit: If we put case 2 and 3 into else then we don't need return statement.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I intended to avoid deeper level of indentations, but if ~ else if ~ else would achieve the same without return. Will address.
/** | ||
* Resolves the specific offsets based on timestamp per all topic-partitions being subscribed. | ||
* The returned offset for each partition is the earliest offset whose timestamp is greater | ||
* than or equal to the given timestamp in the corresponding partition. If the matched offset |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: If the matched offset doesn't exist
is a bit odd construct. Either matched or doesn't exist.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me refine the words. Basically it queries to Kafka and we are explaining the mechanism, it seems OK to say If Kafka doesn't return the matched offset
.
### Details on timestamp offset options | ||
|
||
The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. | ||
The behavior varies across options if the matched offset doesn't exist - check the description of each option. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as down below in the file.
The behavior varies across options if the matched offset doesn't exist - check the description of each option. | ||
|
||
Spark simply passes the timestamp information to <code>KafkaConsumer.offsetsForTimes</code>, and doesn't interpret or reason about the value. | ||
For more details on <code>KafkaConsumer.offsetsForTimes</code>, please refer <a href="https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes-java.util.Map-">javadoc</a> for details. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Putting exact version information into the doc needs time to time attention.
https://kafka.apache.org/21/...
If we assume Kafka is not breaking API then we can put latest
instead of 21
. Though not sure Kafka has such link.
BTW why 21
, we're on <kafka.version>2.8.0</kafka.version>
and feature requires minimum 0.10.1.0
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand correctly, there's no notion of "latest" so we picked the version we used at that time. (Worth noting that the content was added when we added timestamp offset.)
I'm OK to either raising the version to the one we use in 3.2 or lowering the version to minimum.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've double checked and no "latest" found. I think from maintenance perspective it would be the best to lower the version to the minimum. Such case the API must remain the same and we don't have to touch it time to time. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense. Hopefully Kafka community looks to maintain all versions of doc so the risk of broken link is relatively low. I'll update the link to 0.10.1.x. Thanks!
@@ -370,16 +384,11 @@ The following configurations are optional: | |||
<td>none (the value of <code>startingOffsets</code> will apply)</td> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change to "next preference is ..." for consistency?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice finding!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks okay overall.
Test build #138897 has finished for PR 32609 at commit
|
Kubernetes integration test unable to build dist. exiting with code: 1 |
When the hardcoded version discussion is resolved then it's good to go from my perspective. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
Test build #138917 has finished for PR 32609 at commit
|
Kubernetes integration test starting |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
* @param timestamp the timestamp. | ||
* @param failsOnNoMatchingOffset whether to fail the query when no matched offset can be found. | ||
*/ | ||
def fetchGlobalTimestampBasedOffsets(timestamp: Long, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
super nit for the code style, maybe a new line for the params?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice finding! Fixed it.
Kubernetes integration test status success |
Test build #138924 has finished for PR 32609 at commit
|
Kubernetes integration test starting |
I'm merging this as I got approvals with comments for a few nits, and I addressed all of them. Thanks all for the reviewing! |
Kubernetes integration test status success |
lgtm too |
@HeartSaVioR Thanks for this option. It makes sense to use the same timestamp across the subscribed topics. @viirya Will it be a part of Spark 3.2.0 release? |
Yes. This was merged before 3.2 branch cut. |
What changes were proposed in this pull request?
This patch is a follow-up of SPARK-26848 (#23747). In SPARK-26848, we decided to open possibility to let end users set individual timestamp per partition. But in many cases, specifying timestamp represents the intention that we would want to go back to specific timestamp and reprocess records, which should be applied to all topics and partitions.
This patch proposes to provide a way to set a global timestamp across topic-partitions which the source is subscribing to, so that end users can set all offsets by specific timestamp easily. To provide the way to config the timestamp easier, the new options only receive "a" timestamp for start/end timestamp.
New options introduced in this PR:
All two options receive timestamp as string.
There're priorities for options regarding starting/ending offset as we will have three options for start offsets and another three options for end offsets. Priorities are following:
Why are the changes needed?
Existing option to specify timestamp as offset is quite verbose if there're a lot of partitions across topics. Suppose there're 100s of partitions in a topic, the json should contain 100s of times of the same timestamp.
Also, the number of partitions can also change, which requires either:
Both approaches are even not "acceptable" if we're dealing with ad-hoc query; anyone doesn't want to write the code more complicated than the query itself. Flink provides the option to specify a timestamp for all topic-partitions like this PR, and even doesn't provide the option to specify the timestamp per topic-partition.
With this PR, end users are only required to provide a single timestamp value. No more complicated JSON format end users need to know about the structure.
Does this PR introduce any user-facing change?
Yes, this PR introduces two new options, described in above section.
Doc changes are following:
How was this patch tested?
New UTs covering new functionalities. Also manually tested via simple batch & streaming queries.