-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-40925][SQL][SS] Fix stateful operator late record filtering #38405
[SPARK-40925][SQL][SS] Fix stateful operator late record filtering #38405
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks OK in overall. Left comments.
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala
Show resolved
Hide resolved
@@ -68,11 +68,29 @@ case class SessionWindow(timeColumn: Expression, gapDuration: Expression) extend | |||
with Unevaluable | |||
with NonSQLExpression { | |||
|
|||
private def inputTypeOnTimeColumn: AbstractDataType = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Oh I missed that the change from rule side has already merged as a part of introduction of window_time
.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, are you asking for anything actionable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope. Just for future visibility.
…ng/OffsetSeqLog.scala Co-authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
…balikov/spark into multiple_stateful-ops-base
Can one of the admins verify this patch? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
I'll leave this PR in a couple of days to see whether we can get another eyes of review. I'll merge this in early next week there is no outstanding comment.
OK, no outstanding comment so far. Thanks! Merging to master. |
…ueries ### What changes were proposed in this pull request? As a followup to [SPARK-40925], [github PR](#38405), Remove corresponding checks in UnsupportedOperationChecker so that customers don't have to explicitly add new conf withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") to use the new multi-stateful operators. In other words we are enabling multi-stateful operators by default. As a side effect, the API of `checkStreamingQueryGlobalWatermarkLimit(LogicalPlan, OutputMode)` is also changed to `checkStreamingQueryGlobalWatermarkLimit(LogicalPlan)` New tests are added to `MultiStatefulOperatorsSuite.scala`, but I could also add equivalent ones to `UnsupportedOperationsSuite.scala` if requested. ### Why are the changes needed? To enable new multiple-stateful operators by default. Right now users need to set SQL conf `unsupportedOperationCheck` to false explicitly, which also disables many other useful checks. ### Does this PR introduce _any_ user-facing change? No. All current running queries won't be impacted. But new queries could use chained stateful operators. ### How was this patch tested? Unit Tests. Closes #38503 from WweiL/SPARK-40940-multi-state-checkers. Authored-by: Wei Liu <wei.liu@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
### What changes were proposed in this pull request? This PR fixes the input late record filtering done by stateful operators to allow for chaining of stateful operators. Currently stateful operators are initialized with the current microbatch watermark and perform both input late record filtering and state eviction (e.g. producing aggregations) using the same watermark value. The state evicted (or aggregates produced) due to watermark advancing is behind the watermark and thus effectively late - if a following stateful operator consumes the output of the previous one, the input records will be filtered as late. This PR provides two watermark values to the stateful operators - one from the previous microbatch to be used for late record filtering and the one from the current microbatch (as in the existing code) to be used for state eviction. This solves the above problem of the broken late record filtering. Note that this PR still does not solve the issue of time-interval stream join producing records delayed against the watermark. Therefore time-interval streaming join followed by stateful operators is still not supported. That will be fixed in a follow up PR (and a SPIP) effectively replacing the single global watermark with conceptually watermarks per operator. Also, the stateful operator chains unblocked by this PR (e.g. a chain of window aggregations) are still blocked by the unsupported operations checker. The new test for these scenarios - MultiStatefulOperatorsSuite has to explicitly disable the unsupported ops check. This again will be fixed in a follow-up PR. ### Why are the changes needed? The PR allows Spark Structured Streaming to support chaining of stateful operators e.g. chaining of time window aggregations which is a meaningful streaming scenario. ### Does this PR introduce _any_ user-facing change? With this PR, chains of stateful operators will be supported in Spark Structured Streaming. ### How was this patch tested? Added a new test suite - MultiStatefulOperatorsSuite Closes apache#38405 from alex-balikov/multiple_stateful-ops-base. Lead-authored-by: Alex Balikov <91913242+alex-balikov@users.noreply.github.com> Co-authored-by: Alex Balikov <alex.balikov@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
…ueries ### What changes were proposed in this pull request? As a followup to [SPARK-40925], [github PR](apache#38405), Remove corresponding checks in UnsupportedOperationChecker so that customers don't have to explicitly add new conf withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") to use the new multi-stateful operators. In other words we are enabling multi-stateful operators by default. As a side effect, the API of `checkStreamingQueryGlobalWatermarkLimit(LogicalPlan, OutputMode)` is also changed to `checkStreamingQueryGlobalWatermarkLimit(LogicalPlan)` New tests are added to `MultiStatefulOperatorsSuite.scala`, but I could also add equivalent ones to `UnsupportedOperationsSuite.scala` if requested. ### Why are the changes needed? To enable new multiple-stateful operators by default. Right now users need to set SQL conf `unsupportedOperationCheck` to false explicitly, which also disables many other useful checks. ### Does this PR introduce _any_ user-facing change? No. All current running queries won't be impacted. But new queries could use chained stateful operators. ### How was this patch tested? Unit Tests. Closes apache#38503 from WweiL/SPARK-40940-multi-state-checkers. Authored-by: Wei Liu <wei.liu@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
…ueries ### What changes were proposed in this pull request? As a followup to [SPARK-40925], [github PR](apache#38405), Remove corresponding checks in UnsupportedOperationChecker so that customers don't have to explicitly add new conf withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") to use the new multi-stateful operators. In other words we are enabling multi-stateful operators by default. As a side effect, the API of `checkStreamingQueryGlobalWatermarkLimit(LogicalPlan, OutputMode)` is also changed to `checkStreamingQueryGlobalWatermarkLimit(LogicalPlan)` New tests are added to `MultiStatefulOperatorsSuite.scala`, but I could also add equivalent ones to `UnsupportedOperationsSuite.scala` if requested. ### Why are the changes needed? To enable new multiple-stateful operators by default. Right now users need to set SQL conf `unsupportedOperationCheck` to false explicitly, which also disables many other useful checks. ### Does this PR introduce _any_ user-facing change? No. All current running queries won't be impacted. But new queries could use chained stateful operators. ### How was this patch tested? Unit Tests. Closes apache#38503 from WweiL/SPARK-40940-multi-state-checkers. Authored-by: Wei Liu <wei.liu@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
…ueries ### What changes were proposed in this pull request? As a followup to [SPARK-40925], [github PR](apache#38405), Remove corresponding checks in UnsupportedOperationChecker so that customers don't have to explicitly add new conf withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") to use the new multi-stateful operators. In other words we are enabling multi-stateful operators by default. As a side effect, the API of `checkStreamingQueryGlobalWatermarkLimit(LogicalPlan, OutputMode)` is also changed to `checkStreamingQueryGlobalWatermarkLimit(LogicalPlan)` New tests are added to `MultiStatefulOperatorsSuite.scala`, but I could also add equivalent ones to `UnsupportedOperationsSuite.scala` if requested. ### Why are the changes needed? To enable new multiple-stateful operators by default. Right now users need to set SQL conf `unsupportedOperationCheck` to false explicitly, which also disables many other useful checks. ### Does this PR introduce _any_ user-facing change? No. All current running queries won't be impacted. But new queries could use chained stateful operators. ### How was this patch tested? Unit Tests. Closes apache#38503 from WweiL/SPARK-40940-multi-state-checkers. Authored-by: Wei Liu <wei.liu@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
What changes were proposed in this pull request?
This PR fixes the input late record filtering done by stateful operators to allow for chaining of stateful operators. Currently stateful operators are initialized with the current microbatch watermark and perform both input late record filtering and state eviction (e.g. producing aggregations) using the same watermark value. The state evicted (or aggregates produced) due to watermark advancing is behind the watermark and thus effectively late - if a following stateful operator consumes the output of the previous one, the input records will be filtered as late.
This PR provides two watermark values to the stateful operators - one from the previous microbatch to be used for late record filtering and the one from the current microbatch (as in the existing code) to be used for state eviction. This solves the above problem of the broken late record filtering.
Note that this PR still does not solve the issue of time-interval stream join producing records delayed against the watermark. Therefore time-interval streaming join followed by stateful operators is still not supported. That will be fixed in a follow up PR (and a SPIP) effectively replacing the single global watermark with conceptually watermarks per operator.
Also, the stateful operator chains unblocked by this PR (e.g. a chain of window aggregations) are still blocked by the unsupported operations checker. The new test for these scenarios - MultiStatefulOperatorsSuite has to explicitly disable the unsupported ops check. This again will be fixed in a follow-up PR.
Why are the changes needed?
The PR allows Spark Structured Streaming to support chaining of stateful operators e.g. chaining of time window aggregations which is a meaningful streaming scenario.
Does this PR introduce any user-facing change?
With this PR, chains of stateful operators will be supported in Spark Structured Streaming.
How was this patch tested?
Added a new test suite - MultiStatefulOperatorsSuite