[SPARK-32148][SS] Fix stream-stream join issue on missing to copy reused unsafe row #28975
Conversation
It's reported as left-outer join specific, but given the possible scope of impact I used the wording "stream-stream join" instead of "left/right outer stream-stream join".
Test build #124869 has finished for PR 28975 at commit
retest this, please
Test build #124875 has finished for PR 28975 at commit
// NOTE: we need to make sure `outerOutputIter` is evaluated "after" exhausting all of
// elements in `innerOutputIter`, because evaluation of `innerOutputIter` may update
// the match flag which the logic for outer join is relying on.
Just to clarify: this comment is not related to the bug and just to document an existing assumption?
Yes, right.
TBH I suspected this first and crafted a patch that added a new iterator to explicitly run the logic after evaluating innerOutputIter, but later realized the current logic already deals with this properly, because removeOldState() doesn't materialize the candidates and evaluates them lazily. This patch contains the minimal change.
It's worth mentioning how it works for anyone who may need to touch this code.
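To make the lazy-evaluation point concrete, here is a minimal standalone sketch (toy names and data, not the actual join code): the second iterator's predicate only runs while that iterator is drained, which in a concatenation happens after the first iterator is exhausted.

```scala
object LazyOuterIterDemo extends App {
  var matched = Set.empty[Int] // stands in for the per-row match flag

  // Draining the "inner join" output sets match flags as a side effect.
  val innerOutputIter: Iterator[Int] = Iterator(1, 2, 3).map { key =>
    matched += key
    key
  }

  // Lazily filtered: the predicate reads `matched` only when elements are
  // pulled, i.e. after innerOutputIter is exhausted in the concatenation.
  val outerOutputIter: Iterator[Int] =
    Iterator(2, 3, 4).filterNot(key => matched.contains(key))

  // Prints List(1, 2, 3, 4): keys 2 and 3 were matched while draining the
  // inner output, so only the unmatched key 4 survives as an "outer" row.
  println((innerOutputIter ++ outerOutputIter).toList)
}
```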
...ain/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
Test build #124932 has finished for PR 28975 at commit
retest this, please
Test build #125017 has finished for PR 28975 at commit
@@ -259,6 +269,9 @@ class SymmetricHashJoinStateManager(
        return null
      }

      // Make a copy on value row, as below cleanup logic may update the value row silently.
      currentValue = currentValue.copy(value = currentValue.value.copy())
so this is the only place to do copy?
Yes. That wasn't necessary for format V1, as the original row was stored into the state store, and the state store (strictly speaking, the implementation of the HDFS state store provider) makes sure these rows are copied.
For other places, the row can propagate to callers outside of the state manager, and it looks like those callers don't need to copy the row. (It's super tricky for me to determine whether the copy is necessary or not when the code is not in a simple loop or stream.)
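For context, a minimal sketch of the reuse behavior under discussion (illustrative only, using Spark's internal catalyst API, not the patched code): an UnsafeProjection returns the same mutable UnsafeRow instance on every call, so a reference held across calls silently observes later values unless copy() is taken.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val schema = StructType(Seq(StructField("v", IntegerType)))
val proj = UnsafeProjection.create(schema)

val first: UnsafeRow = proj(InternalRow(1)) // holds the reused result row
proj(InternalRow(2))                        // same instance, now overwritten
println(first.getInt(0))                    // prints 2, not 1

val safe = proj(InternalRow(3)).copy()      // defensive copy detaches the value
proj(InternalRow(4))
println(safe.getInt(0))                     // prints 3, as expected
```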
After seeing the new changes, I think the first version looks better. The caller side is nested and we still have unnecessary copies for v1 format. What do you think? @viirya
Yep, I also prefer the first approach personally. As the issue was in the v2 format, the first version is the straightforward way.
@cloud-fan typo? ... unnecessary copies for v1 format
OK, I'll roll back the change. I'll also leave a commit sha so we can go back and forth depending on the decision.
Just reverted the latest commit to preserve the history, so we can pick the commit selectively according to the decision.
retest this, please
Test build #125051 has finished for PR 28975 at commit
retest this, please
Test build #125121 has finished for PR 28975 at commit
retest this, please
#29017 was created based on @HeartSaVioR's report in #28975 (comment).
Test build #125126 has finished for PR 28975 at commit
retest this, please
Test build #125151 has finished for PR 28975 at commit
retest this please
…ts to all callers" This reverts commit be34258.
Test build #125257 has finished for PR 28975 at commit
Test build #125258 has finished for PR 28975 at commit
retest this please
Test build #125271 has finished for PR 28975 at commit
retest this, please
retest this please
 * Convert the value row to (actual value, match) pair.
 *
 * NOTE: implementations should ensure the result row is NOT reused during execution, as
 * caller may use the value to store without copy().
We need to update the comment. It's not because of storing; it's because the caller side updates the row.
I'm not sure I get it. The problem occurs when the caller reads the value late and there is "another" interaction with the method in the middle. I agree the sentence in the source code comment is not clear as well, though.
Would it be better to rephrase it as "... during execution, so that the caller can safely read the value at any time"?
SGTM. I was referring to #28975 (comment)
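As a side note, the hazard the reworded comment guards against can be shown with a tiny toy (hypothetical converter, not Spark code): a caller that reads a returned value "late", after another call to the same method in between, sees the wrong data when the method hands out a reused buffer.

```scala
// Hypothetical converter that reuses one buffer, like a reused UnsafeRow.
class ReusingConverter {
  private val buffer = new Array[Int](1)
  def convertValue(v: Int): Array[Int] = { buffer(0) = v; buffer }
}

val converter = new ReusingConverter
val first = converter.convertValue(1)
converter.convertValue(2) // "another interaction in the middle"
println(first(0))         // prints 2: the late read observes the newer value
// Copying inside convertValue (the approach this patch takes for the V2
// converter) makes reads safe at any time.
```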
Test build #125332 has finished for PR 28975 at commit
retest this please
Test build #125317 has finished for PR 28975 at commit
The change itself looks good; some minor issues found.
test("SPARK-32148 stream-stream join regression on Spark 3.0.0") { | ||
val input1 = MemoryStream[(Timestamp, String, String)] | ||
val df1 = input1.toDF | ||
.selectExpr("_1 as eventTime", "_2 as id", "_3 as comment") |
Any specific reason why not use select? I don't see any expression here.
I guess it's much simpler and more readable than select('_1.as("eventTime"), '_2.as("id"), '_3.as("comment")) (or even with col(...) if the ' notation doesn't work for _1, _2, _3).
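For reference, the two spellings are equivalent; a sketch below, assuming the test's input1 MemoryStream and a started SparkSession with its implicits in scope:

```scala
import org.apache.spark.sql.functions.col

// SQL-expression flavor, as used in the test:
val viaSelectExpr = input1.toDF
  .selectExpr("_1 as eventTime", "_2 as id", "_3 as comment")

// Column-API flavor; col("_1") sidesteps any issue with the '_1 symbol notation:
val viaSelect = input1.toDF
  .select(col("_1").as("eventTime"), col("_2").as("id"), col("_3").as("comment"))
```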
val input2 = MemoryStream[(Timestamp, String, String)]
val df2 = input2.toDF
  .selectExpr("_1 as eventTime", "_2 as id", "_3 as name")
Same here.
Same here as well.
val joined = df1.as("left")
  .join(df2.as("right"),
    expr(s"""
      |left.id = right.id AND left.eventTime BETWEEN
Nit: indent
The indentation of """ varies across the codebase, and I can find the same indentation elsewhere in it.
val joined = df1.as("left")
  .join(df2.as("right"),
    expr(s"""
Why is string interpolation needed here?
Ah that's not necessary. Will remove.
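After dropping the interpolator, the condition becomes a plain multi-line string. A sketch below; the interval bounds and join type are hypothetical, since the rest of the condition is elided in the excerpt above:

```scala
import org.apache.spark.sql.functions.expr

val joined = df1.as("left")
  .join(df2.as("right"),
    expr("""
      |left.id = right.id AND left.eventTime BETWEEN
      |  right.eventTime - INTERVAL 30 seconds AND
      |  right.eventTime + INTERVAL 30 seconds
      """.stripMargin),
    "leftOuter") // hypothetical join type
```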
I'm also suffering from flaky executions; I hope this round will pass.
Test build #125352 has finished for PR 28975 at commit
retest this please
Test build #125389 has started for PR 28975 at commit
### What changes were proposed in this pull request?
This PR aims to disable SBT `unidoc` generation testing in the Jenkins environment because it's flaky there and not used for the official documentation generation. Also, GitHub Action has the correct test coverage for the official documentation generation.
- #28848 (comment) (amp-jenkins-worker-06)
- #28926 (comment) (amp-jenkins-worker-06)
- #28969 (comment) (amp-jenkins-worker-06)
- #28975 (comment) (amp-jenkins-worker-05)
- #28986 (comment) (amp-jenkins-worker-05)
- #28992 (comment) (amp-jenkins-worker-06)
- #28993 (comment) (amp-jenkins-worker-05)
- #28999 (comment) (amp-jenkins-worker-04)
- #29010 (comment) (amp-jenkins-worker-03)
- #29013 (comment) (amp-jenkins-worker-04)
- #29016 (comment) (amp-jenkins-worker-05)
- #29025 (comment) (amp-jenkins-worker-04)
- #29042 (comment) (amp-jenkins-worker-03)

### Why are the changes needed?
Apache Spark `release-build.sh` generates the official document by using the following command.
- https://github.com/apache/spark/blob/master/dev/create-release/release-build.sh#L341

```bash
PRODUCTION=1 RELEASE_VERSION="$SPARK_VERSION" jekyll build
```

And, this is executed by the following `unidoc` command for Scala/Java API doc.
- https://github.com/apache/spark/blob/master/docs/_plugins/copy_api_dirs.rb#L30

```ruby
system("build/sbt -Pkinesis-asl clean compile unidoc") || raise("Unidoc generation failed")
```

However, the PR builder disabled `Jekyll build` and instead has a different test coverage.

```python
# determine if docs were changed and if we're inside the amplab environment
# note - the below commented out until *all* Jenkins workers can get `jekyll` installed
# if "DOCS" in changed_modules and test_env == "amplab_jenkins":
#     build_spark_documentation()
```

```
Building Unidoc API Documentation
========================================================================
[info] Building Spark unidoc using SBT with these arguments: -Phadoop-3.2 -Phive-2.3 -Pspark-ganglia-lgpl -Pkubernetes -Pmesos -Phadoop-cloud -Phive -Phive-thriftserver -Pkinesis-asl -Pyarn unidoc
```

### Does this PR introduce _any_ user-facing change?
No. (This is used only for testing and not used in the official doc generation.)

### How was this patch tested?
Pass the Jenkins without doc generation invocation.

Closes #29017 from dongjoon-hyun/SPARK-DOC-GEN.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Test build #125403 has finished for PR 28975 at commit
LGTM
LGTM
thanks, merging to master/3.0!
…sed unsafe row

### What changes were proposed in this pull request?
This patch fixes the odd join results occurring in stream-stream join for state store format V2.

There are some spots on the V2 path which leverage UnsafeProjection. As the result row is reused, the row should be copied to avoid its value changing while being read (or it should be verified that the caller is not affected by such behavior), but `SymmetricHashJoinStateManager.removeByValueCondition` violates this.

This patch makes `KeyWithIndexToValueRowConverterV2.convertValue` copy the row by itself so that callers don't need to take care of it. This patch doesn't change the behavior of `KeyWithIndexToValueRowConverterV2.convertToValueRow`, to avoid double-copying, as the caller is expected to store the row, on which the state store implementation will call `copy()`.

This patch adds such behavior into each method doc in `KeyWithIndexToValueRowConverter`, so that further contributors can read through and make sure a change or new addition doesn't break the contract.

### Why are the changes needed?
Stream-stream join with state store format V2 (newly added in Spark 3.0.0) has a serious correctness bug which produces nondeterministic results.

### Does this PR introduce _any_ user-facing change?
Yes, some Spark 3.0.0 users using stream-stream join from a new checkpoint (as the bug exists only in the v2 format path) may encounter wrong join results. This patch will fix it.

### How was this patch tested?
The reported case was converted to a new UT, which is confirmed to pass. All UTs in StreamingInnerJoinSuite and StreamingOuterJoinSuite passed as well.

Closes #28975 from HeartSaVioR/SPARK-32148.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 526cb2d)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Thanks all for reviewing and merging!
What changes were proposed in this pull request?
This patch fixes the odd join results occurring in stream-stream join for state store format V2.

There are some spots on the V2 path which leverage UnsafeProjection. As the result row is reused, the row should be copied to avoid its value changing while being read (or it should be verified that the caller is not affected by such behavior), but SymmetricHashJoinStateManager.removeByValueCondition violates this.

This patch makes KeyWithIndexToValueRowConverterV2.convertValue copy the row by itself so that callers don't need to take care of it. This patch doesn't change the behavior of KeyWithIndexToValueRowConverterV2.convertToValueRow, to avoid double-copying, as the caller is expected to store the row, on which the state store implementation will call copy().

This patch adds such behavior into each method doc in KeyWithIndexToValueRowConverter, so that further contributors can read through and make sure a change or new addition doesn't break the contract.

Why are the changes needed?
Stream-stream join with state store format V2 (newly added in Spark 3.0.0) has a serious correctness bug which produces nondeterministic results.

Does this PR introduce any user-facing change?
Yes, some Spark 3.0.0 users using stream-stream join from a new checkpoint (as the bug exists only in the v2 format path) may encounter wrong join results. This patch will fix it.

How was this patch tested?
The reported case was converted to a new UT, which is confirmed to pass. All UTs in StreamingInnerJoinSuite and StreamingOuterJoinSuite passed as well.