[SPARK-29223][SQL][SS] New option to specify timestamp on all subscribing topic-partitions in Kafka source #32609


Closed · wants to merge 5 commits

Conversation

@HeartSaVioR (Contributor) commented May 20, 2021

What changes were proposed in this pull request?

This patch is a follow-up of SPARK-26848 (#23747). In SPARK-26848, we opened up the possibility for end users to set an individual timestamp per partition. In many cases, however, specifying a timestamp expresses the intent to go back to a specific point in time and reprocess records, which should apply to all topics and partitions.

This patch proposes a way to set a global timestamp across all topic-partitions the source is subscribing to, so that end users can easily set all offsets to a specific timestamp. To keep configuration simple, each new option receives a single timestamp for the start/end position.

New options introduced in this PR:

  • startingTimestamp
  • endingTimestamp

Both options receive the timestamp as a string.

There are priorities among these options, as we now have three options for start offsets and another three for end offsets. The priorities are as follows (see the usage sketch below):

  • starting offsets: startingTimestamp -> startingOffsetsByTimestamp -> startingOffsets
  • ending offsets: endingTimestamp -> endingOffsetsByTimestamp -> endingOffsets
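For illustration, a minimal usage sketch of the new option (broker address, topic name, and timestamp value are made-up, not taken from this PR):

```scala
// startingTimestamp applies one timestamp to every subscribed partition,
// replacing the per-partition JSON required by startingOffsetsByTimestamp.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "events")
  .option("startingTimestamp", "1621468800000") // epoch millis, as a string
  .load()
```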

Why are the changes needed?

The existing option to specify a timestamp as an offset is quite verbose when there are many partitions across topics. If a topic has hundreds of partitions, the JSON has to repeat the same timestamp hundreds of times.

Moreover, the number of partitions can change, which requires either:

  • fixing the code if the JSON is statically created, or
  • taking a dependency on the Kafka client and dealing with the Kafka API to craft the JSON programmatically.

Neither approach is acceptable for an ad-hoc query; nobody wants to write code more complicated than the query itself. Flink provides an option to specify a timestamp for all topic-partitions, as this PR does, and does not even provide an option to specify the timestamp per topic-partition.

With this PR, end users only need to provide a single timestamp value; there is no longer a complicated JSON structure to learn.
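To make the difference concrete, a hypothetical before/after comparison (topic name, partition count, and timestamp are illustrative only):

```scala
// Before: startingOffsetsByTimestamp repeats the same timestamp per partition
// (three partitions shown; with hundreds, the JSON grows accordingly).
val before = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "events")
  .option("startingOffsetsByTimestamp",
    """{"events": {"0": 1621468800000, "1": 1621468800000, "2": 1621468800000}}""")
  .load()

// After: a single global timestamp covers every subscribed partition.
val after = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "events")
  .option("startingTimestamp", "1621468800000")
  .load()
```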

Does this PR introduce any user-facing change?

Yes, this PR introduces two new options, described in the section above.

Doc changes are as follows:

(Four screenshots of the updated documentation, taken 2021-05-21.)

How was this patch tested?

New UTs covering the new functionality. Also manually tested via simple batch & streaming queries.

@SparkQA commented May 20, 2021

Test build #138756 has finished for PR 32609 at commit f8da576.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 20, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43278/

@HyukjinKwon requested a review from gaborgsomogyi, May 21, 2021 01:20
@SparkQA commented May 21, 2021

Test build #138786 has finished for PR 32609 at commit ec1f662.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 21, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43309/

@SparkQA commented May 21, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43309/

@gaborgsomogyi (Contributor) left a comment

Mainly looks good, left some questions/nits.

case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json))
case None => defaultOffsets
}
// The order below represents "preferences"
Contributor:

Specifying both the global timestamp and a per-partition timestamp was added to test the case 1 vs. case 2 fallback, which is good. To cover all "preferences", maybe we can add a case where all config options are set.

Contributor Author:

Yeah, I changed the test a bit to cover two cases: all options set, and timestamp per partition vs. offset. Thanks for the suggestion.
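For reference, a hypothetical sketch of such a priority test (the helper's name and parameters are guessed from the diff context, not copied from the PR):

```scala
// When all three start options are set, startingTimestamp should win,
// per the priority order startingTimestamp -> startingOffsetsByTimestamp
// -> startingOffsets.
val params = Map(
  "startingtimestamp" -> "1621468800000",
  "startingoffsetsbytimestamp" -> """{"events": {"0": 1621468800000}}""",
  "startingoffsets" -> "earliest")
val limit = KafkaSourceProvider.getKafkaOffsetRangeLimit(
  params, "startingtimestamp", "startingoffsetsbytimestamp",
  "startingoffsets", LatestOffsetRangeLimit)
assert(limit === GlobalTimestampRangeLimit(1621468800000L))
```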

val tsStr = params(globalOffsetTimestampOptionKey).trim
try {
val ts = tsStr.toLong
return GlobalTimestampRangeLimit(ts)
Contributor:

Super nit: if we put cases 2 and 3 into an else branch, then we don't need the return statement.

Contributor Author:

I intended to avoid a deeper level of indentation, but if ~ else if ~ else would achieve the same without return. Will address.
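A rough sketch of the refactor being discussed (option-key names are taken from the diff context where visible, otherwise approximated; this is not the exact merged code):

```scala
// Chaining if / else if / else makes each branch the expression's value,
// so no `return` is needed and the indentation stays flat.
if (params.contains(globalOffsetTimestampOptionKey)) {
  val ts = params(globalOffsetTimestampOptionKey).trim.toLong
  GlobalTimestampRangeLimit(ts)
} else if (params.contains(offsetByTimestampOptionKey)) {
  val json = params(offsetByTimestampOptionKey).trim
  SpecificTimestampRangeLimit(JsonUtils.partitionTimestamps(json))
} else {
  params.get(offsetOptionKey).map(_.trim) match {
    case Some(offset) if offset.equalsIgnoreCase("latest") => LatestOffsetRangeLimit
    case Some(offset) if offset.equalsIgnoreCase("earliest") => EarliestOffsetRangeLimit
    case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json))
    case None => defaultOffsets
  }
}
```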

/**
* Resolves the specific offsets based on timestamp per all topic-partitions being subscribed.
* The returned offset for each partition is the earliest offset whose timestamp is greater
* than or equal to the given timestamp in the corresponding partition. If the matched offset
Contributor:

Nit: "If the matched offset doesn't exist" is a bit of an odd construct. It's either matched or it doesn't exist.

Contributor Author:

Let me refine the wording. Basically it queries Kafka, and since we are explaining the mechanism, it seems OK to say "If Kafka doesn't return the matched offset".

### Details on timestamp offset options

The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
The behavior varies across options if the matched offset doesn't exist - check the description of each option.
@gaborgsomogyi (Contributor), May 24, 2021:

Same as down below in the file.

The behavior varies across options if the matched offset doesn't exist - check the description of each option.

Spark simply passes the timestamp information to <code>KafkaConsumer.offsetsForTimes</code>, and doesn't interpret or reason about the value.
For more details on <code>KafkaConsumer.offsetsForTimes</code>, please refer to the <a href="https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes-java.util.Map-">javadoc</a>.
@gaborgsomogyi (Contributor), May 24, 2021:

Putting exact version information into the doc needs attention from time to time.
https://kafka.apache.org/21/...
If we assume Kafka is not breaking the API, then we can put latest instead of 21, though I'm not sure Kafka has such a link.
BTW, why 21? We're on <kafka.version>2.8.0</kafka.version> and the feature requires a minimum of 0.10.1.0.

Contributor Author:

If I understand correctly, there's no notion of "latest" so we picked the version we used at that time. (Worth noting that the content was added when we added timestamp offset.)

I'm OK with either raising the version to the one we use in 3.2 or lowering it to the minimum.

Contributor:

I've double checked and found no "latest". From a maintenance perspective, I think it would be best to lower the version to the minimum. In that case the API must remain the same, so we don't have to touch it from time to time. WDYT?

Contributor Author:

That makes sense. Hopefully the Kafka community maintains all versions of the docs, so the risk of a broken link is relatively low. I'll update the link to 0.10.1.x. Thanks!
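For reference, a minimal sketch of the underlying Kafka API call that Spark delegates to (broker address, topic, and timestamp are hypothetical):

```scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Spark hands the per-partition timestamps straight to offsetsForTimes;
// a null entry in the returned map means Kafka found no matching offset.
val props = new Properties()
props.put("bootstrap.servers", "host1:9092")
props.put("key.deserializer",
  "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer",
  "org.apache.kafka.common.serialization.ByteArrayDeserializer")
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
val query = Collections.singletonMap(
  new TopicPartition("events", 0), java.lang.Long.valueOf(1621468800000L))
val offsets = consumer.offsetsForTimes(query) // TopicPartition -> OffsetAndTimestamp
consumer.close()
```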

@@ -370,16 +384,11 @@ The following configurations are optional:
<td>none (the value of <code>startingOffsets</code> will apply)</td>
Member:

Change to "next preference is ..." for consistency?

Contributor Author:

Nice finding!

@viirya (Member) left a comment

looks okay overall.

@SparkQA commented May 25, 2021

Test build #138897 has finished for PR 32609 at commit afd6c11.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 25, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43418/

@gaborgsomogyi (Contributor):

When the hardcoded version discussion is resolved, it's good to go from my perspective.

@gaborgsomogyi (Contributor) left a comment

LGTM.

@SparkQA commented May 25, 2021

Test build #138917 has finished for PR 32609 at commit 20b276d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 25, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43438/

@xuanyuanking (Member) left a comment

LGTM

* @param timestamp the timestamp.
* @param failsOnNoMatchingOffset whether to fail the query when no matched offset can be found.
*/
def fetchGlobalTimestampBasedOffsets(timestamp: Long,
Member:

Super nit for the code style: maybe a new line for the params?

Contributor Author:

Nice finding! Fixed it.
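For clarity, the style being suggested, sketched from the two parameters visible in the diff context (the return type is assumed for illustration):

```scala
// When the parameter list wraps, start the parameters on a new line
// with 4-space indentation, per the usual Spark Scala style.
def fetchGlobalTimestampBasedOffsets(
    timestamp: Long,
    failsOnNoMatchingOffset: Boolean): KafkaSourceOffset = {
  ??? // body elided; only the signature formatting matters here
}
```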

@SparkQA commented May 25, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43438/

@SparkQA commented May 25, 2021

Test build #138924 has finished for PR 32609 at commit b72f590.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 25, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43445/

@HeartSaVioR (Contributor Author):

I'm merging this as I got approvals, with comments for only a few nits, and I've addressed all of them. Thanks all for reviewing!

@SparkQA commented May 25, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43445/

@viirya (Member) commented May 25, 2021

lgtm too

@rishabhsairawat:

@HeartSaVioR Thanks for this option. It makes sense to use the same timestamp across the subscribed topics.

@viirya Will it be a part of Spark 3.2.0 release?

@viirya (Member) commented Jul 22, 2021

> @viirya Will it be a part of Spark 3.2.0 release?

Yes. This was merged before 3.2 branch cut.
