-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-40925][SQL][SS] Fix stateful operator late record filtering #38405
Changes from 20 commits
acc76dc
8831691
59002a2
3f1c322
179f422
6262312
7c1f066
d031e32
c789d19
4e12828
36b6826
764be57
5947fc9
bee6185
0e96fae
565f8af
24f1b61
9515f10
03cbdd6
572263d
68b14e3
6a023ae
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1941,6 +1941,22 @@ object SQLConf { | |
.booleanConf | ||
.createWithDefault(true) | ||
|
||
val STATEFUL_OPERATOR_ALLOW_MULTIPLE = | ||
buildConf("spark.sql.streaming.statefulOperator.allowMultiple") | ||
.internal() | ||
.doc("When true, multiple stateful operators are allowed to be present in a streaming " + | ||
"pipeline. The support for multiple stateful operators introduces a minor (semantically " + | ||
"correct) change in respect to late record filtering - late records are detected and " + | ||
"filtered in respect to the watermark from the previous microbatch instead of the " + | ||
"current one. This is a behavior change for Spark streaming pipelines and we allow " + | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This implies that we apply the same watermark for all stateful operators, right? I'd expect existing tests to be failing since we introduce a behavioral change, but given existing tests all pass, looks like it is due to no-data batch which effectively makes previous watermark to be caught up with next watermark. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Currently it is the same watermark passed to all operators. The issue is if anyone has nit tests which check exactly what records are filtered with carefully constructed batches and Trigger.Once - such tests can detect the change in behavior and fail. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, we will have a bunch of errors in test suites when we disable no-data batch. Most test cases are assuming that no-data batch always happens. |
||
"users to revert to the previous behavior of late record filtering (late records are " + | ||
"detected and filtered by comparing with the current microbatch watermark) by setting " + | ||
"the flag value to false. In this mode, only a single stateful operator will be allowed " + | ||
"in a streaming pipeline.") | ||
.version("3.4.0") | ||
.booleanConf | ||
.createWithDefault(true) | ||
|
||
val STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION = | ||
buildConf("spark.sql.streaming.statefulOperator.useStrictDistribution") | ||
.internal() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Oh I missed that the change from rule side has already merged as a part of introduction of
window_time
.)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, are you asking for anything actionable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope. Just for future visibility.