Conversation

HeartSaVioR
Contributor

What changes were proposed in this pull request?

This patch fixes the odd join results produced by stream-stream join with state store format V2.

There are some spots on the V2 path which leverage UnsafeProjection. As the result row is reused, the row should be copied so that its value doesn't change while being read (or the caller must be confirmed to be unaffected by such behavior), but SymmetricHashJoinStateManager.removeByValueCondition violates this contract.

This patch makes KeyWithIndexToValueRowConverterV2.convertValue copy the row by itself so that callers don't need to take care of it. This patch doesn't change the behavior of KeyWithIndexToValueRowConverterV2.convertToValueRow, to avoid double copying: its caller is expected to store the row, and the state store implementation will call copy() when storing it.

This patch adds this behavior to each method doc in KeyWithIndexToValueRowConverter, so that future contributors can read through it and make sure a change or new addition doesn't break the contract.
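For illustration only (this is not code from the patch), here is a minimal sketch of the reuse hazard: an UnsafeProjection returns the same mutable row instance on every call, so a reference held across calls silently changes unless the row is copied.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val schema = StructType(Seq(StructField("value", IntegerType)))
val proj = UnsafeProjection.create(schema)

val first: UnsafeRow = proj(InternalRow(1))  // backed by the projection's reused buffer
proj(InternalRow(2))                         // the next call overwrites that buffer...
assert(first.getInt(0) == 2)                 // ...so `first` has silently changed

val safe: UnsafeRow = proj(InternalRow(3)).copy()  // copy() detaches the data
proj(InternalRow(4))
assert(safe.getInt(0) == 3)                  // the copied row keeps its value
```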

Why are the changes needed?

Stream-stream join with state store format V2 (newly added in Spark 3.0.0) has a serious correctness bug which produces nondeterministic results.

Does this PR introduce any user-facing change?

Yes. Spark 3.0.0 users using stream-stream join with new checkpoints (the bug exists only on the V2 format path) may encounter wrong join results. This patch fixes that.

How was this patch tested?

The reported case was converted into a new UT, which was confirmed to pass. All UTs in StreamingInnerJoinSuite and StreamingOuterJoinSuite pass as well.

@HeartSaVioR
Contributor Author

It was reported as left-outer-join specific, but given the possible scope of impact I describe it as "stream-stream join" rather than "left/right outer stream-stream join".

@HeartSaVioR
Contributor Author

@SparkQA

SparkQA commented Jul 2, 2020

Test build #124869 has finished for PR 28975 at commit 82e5a76.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Jul 2, 2020

Test build #124875 has finished for PR 28975 at commit 82e5a76.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


// NOTE: we need to make sure `outerOutputIter` is evaluated "after" exhausting all of
// elements in `innerOutputIter`, because evaluation of `innerOutputIter` may update
// the match flag which the logic for outer join is relying on.
Contributor

Just to clarify: this comment is not related to the bug and just to document an existing assumption?

Contributor Author

Yes, right.

TBH I suspected this first and crafted a patch in which a new iterator explicitly runs the logic after evaluating innerOutputIter, but later realized the current logic already deals with this properly, because removeOldState() doesn't materialize the candidates and evaluates lazily. This patch contains the minimal change.

It's worth mentioning how this works for anyone who may need to touch this code.
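To illustrate the assumption with plain Scala (again, not patch code): a lazily evaluated iterator observes flag updates made while an earlier iterator is consumed, which is exactly why the evaluation order matters.

```scala
// `matched` stands in for the per-row match flag kept in the state store.
var matched = false
val innerOutputIter = Iterator(1).map { x => matched = true; x } // consuming it updates the flag
val outerOutputIter = Iterator(2).filter(_ => !matched)          // built now, evaluated lazily

innerOutputIter.foreach(_ => ())  // exhaust the inner side first, as the NOTE requires
assert(outerOutputIter.isEmpty)   // the outer side sees the updated flag
```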

@SparkQA

SparkQA commented Jul 6, 2020

Test build #124932 has finished for PR 28975 at commit be34258.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Jul 6, 2020

Test build #125017 has finished for PR 28975 at commit be34258.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -259,6 +269,9 @@ class SymmetricHashJoinStateManager(
return null
}

// Make a copy on value row, as below cleanup logic may update the value row silently.
currentValue = currentValue.copy(value = currentValue.value.copy())
Contributor

So this is the only place that needs the copy?

Contributor Author

Yes. That wasn't necessary for format V1, as the original row was stored into the state store, and the state store (strictly speaking, the implementation of the HDFS state store provider) makes sure these rows are copied versions.

For other places, the row can propagate to callers outside of the state manager, and it looks like those callers don't need to copy the row. (It's quite tricky for me to determine whether the copy is necessary or not when the code is not in a simple loop or stream.)
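To make the contrast concrete, here is a rough sketch with simplified names (the real converters live in SymmetricHashJoinStateManager; the stand-in case class and the matched-bit position are approximations):

```scala
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}

// Simplified stand-in for the state manager's ValueAndMatchPair.
case class ValueAndMatchPair(value: UnsafeRow, matched: Boolean)

// V1-like: the stored row itself is the value. The state store already
// persisted a stable copy, so returning it directly is safe.
def convertValueV1(stored: UnsafeRow): ValueAndMatchPair =
  ValueAndMatchPair(stored, matched = false)

// V2-like: the value is extracted through a projection whose output buffer is
// reused across calls, so it must be copied before it escapes -- the fix here.
def convertValueV2(valueProj: UnsafeProjection, stored: UnsafeRow): ValueAndMatchPair =
  ValueAndMatchPair(valueProj(stored).copy(), stored.getBoolean(stored.numFields - 1))
```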

@cloud-fan (Contributor) Jul 6, 2020

After seeing the new changes, I think the first version looks better. The caller side is nested, and we still have unnecessary copies for the V1 format. What do you think? @viirya

Member

Yep, I also prefer the first approach personally. As the issue was in the V2 format, the first version is the straightforward way.
@cloud-fan typo? ... unnecessary copies for v1 format

Contributor Author

OK, I'll roll back the change. I'll also leave the commit SHA so we can go back and forth depending on the decision.

Contributor Author

Just reverted the latest commit to keep the history, so we can pick the commit selectively according to the decision.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Jul 6, 2020

Test build #125051 has finished for PR 28975 at commit be34258.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Jul 6, 2020

Test build #125121 has finished for PR 28975 at commit be34258.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@dongjoon-hyun
Member

dongjoon-hyun commented Jul 6, 2020

#29017 was created based on @HeartSaVioR's report in #28975 (comment).

@SparkQA

SparkQA commented Jul 6, 2020

Test build #125126 has finished for PR 28975 at commit be34258.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Jul 7, 2020

Test build #125151 has finished for PR 28975 at commit be34258.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking
Member

retest this please

@HeartSaVioR
Contributor Author

retest this please

@SparkQA

SparkQA commented Jul 7, 2020

Test build #125257 has finished for PR 28975 at commit be34258.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 7, 2020

Test build #125258 has finished for PR 28975 at commit e2201ef.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this please

@SparkQA

SparkQA commented Jul 8, 2020

Test build #125271 has finished for PR 28975 at commit e2201ef.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@apache deleted a comment from AmplabJenkins Jul 8, 2020
@HeartSaVioR
Contributor Author

retest this, please

@cloud-fan
Contributor

retest this please

* Convert the value row to (actual value, match) pair.
*
* NOTE: implementations should ensure the result row is NOT reused during execution, as
* caller may use the value to store without copy().
@cloud-fan (Contributor) Jul 8, 2020

We need to update the comment. It's not because of storing; it's that the caller side updates the row.

@HeartSaVioR (Contributor Author) Jul 8, 2020

I'm not sure I get it. The problem occurs when the caller reads the value later and there's "another" interaction with the method in the meantime. I agree the sentence in the source code comment is not clear, though.

Would it be better to rephrase it as "... during execution, so that the caller can safely read the value at any time"?
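A sketch of how the reworded note could read in the method doc (signature approximated):

```scala
/**
 * Convert the value row to a (actual value, match) pair.
 *
 * NOTE: implementations should ensure the result row is NOT reused during execution,
 * so that the caller can safely read the value at any time.
 */
def convertValue(value: UnsafeRow): ValueAndMatchPair
```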

Contributor

SGTM. I was referring to #28975 (comment)

@SparkQA

SparkQA commented Jul 8, 2020

Test build #125332 has finished for PR 28975 at commit 1c011ab.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking
Member

retest this please

@SparkQA

SparkQA commented Jul 8, 2020

Test build #125317 has finished for PR 28975 at commit e2201ef.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi (Contributor) left a comment

The change itself looks good; I found some minor issues.

test("SPARK-32148 stream-stream join regression on Spark 3.0.0") {
val input1 = MemoryStream[(Timestamp, String, String)]
val df1 = input1.toDF
.selectExpr("_1 as eventTime", "_2 as id", "_3 as comment")
Contributor

Any specific reason not to use select? I don't see any expression here.

Contributor Author

I guess it's simpler and more readable than select('_1.as("eventTime"), '_2.as("id"), '_3.as("comment")) (or even with col(...) if the ' notation doesn't work for _1, _2, _3).
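For comparison, the two forms side by side (assuming the suite's implicits for $ and toDF are in scope):

```scala
// selectExpr form used in the test:
val viaSelectExpr = input1.toDF.selectExpr("_1 as eventTime", "_2 as id", "_3 as comment")

// Equivalent select form suggested above:
val viaSelect = input1.toDF.select($"_1".as("eventTime"), $"_2".as("id"), $"_3".as("comment"))
```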


val input2 = MemoryStream[(Timestamp, String, String)]
val df2 = input2.toDF
.selectExpr("_1 as eventTime", "_2 as id", "_3 as name")
Contributor

Same here.

Contributor Author

Same here as well.

val joined = df1.as("left")
.join(df2.as("right"),
expr(s"""
|left.id = right.id AND left.eventTime BETWEEN
Contributor

Nit: indent

Contributor Author

The indentation of """ varies across the codebase, and I can find the same indentation elsewhere in it.


val joined = df1.as("left")
.join(df2.as("right"),
expr(s"""
Contributor

Why is string interpolation needed?

Contributor Author

Ah that's not necessary. Will remove.
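For reference, a minimal sketch of the non-interpolated form, keeping only the visible part of the condition and assuming the stripMargin implied by the | margins:

```scala
import org.apache.spark.sql.functions.expr

// A plain string literal suffices; the `s` interpolator is only needed when
// something is actually interpolated.
val joinCondition = expr(
  """
    |left.id = right.id
    |""".stripMargin)
```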

@gaborgsomogyi
Contributor

I'm also suffering from flaky executions; hope this round will pass.

@SparkQA

SparkQA commented Jul 8, 2020

Test build #125352 has finished for PR 28975 at commit 1c011ab.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Jul 8, 2020

Test build #125389 has started for PR 28975 at commit 1c011ab.

dongjoon-hyun added a commit that referenced this pull request Jul 8, 2020
### What changes were proposed in this pull request?

This PR aims to disable SBT `unidoc` generation testing in the Jenkins environment, because it's flaky there and not used for the official documentation generation. GitHub Actions, meanwhile, has the correct test coverage for the official documentation generation.

- #28848 (comment) (amp-jenkins-worker-06)
- #28926 (comment) (amp-jenkins-worker-06)
- #28969 (comment) (amp-jenkins-worker-06)
- #28975 (comment) (amp-jenkins-worker-05)
- #28986 (comment)  (amp-jenkins-worker-05)
- #28992 (comment) (amp-jenkins-worker-06)
- #28993 (comment) (amp-jenkins-worker-05)
- #28999 (comment) (amp-jenkins-worker-04)
- #29010 (comment) (amp-jenkins-worker-03)
- #29013 (comment) (amp-jenkins-worker-04)
- #29016 (comment) (amp-jenkins-worker-05)
- #29025 (comment) (amp-jenkins-worker-04)
- #29042 (comment) (amp-jenkins-worker-03)

### Why are the changes needed?

Apache Spark's `release-build.sh` generates the official documentation using the following command.
- https://github.com/apache/spark/blob/master/dev/create-release/release-build.sh#L341

```bash
PRODUCTION=1 RELEASE_VERSION="$SPARK_VERSION" jekyll build
```

This, in turn, executes the following `unidoc` command for the Scala/Java API docs.
- https://github.com/apache/spark/blob/master/docs/_plugins/copy_api_dirs.rb#L30

```ruby
system("build/sbt -Pkinesis-asl clean compile unidoc") || raise("Unidoc generation failed")
```

However, the PR builder disables the `Jekyll build` and instead has different test coverage.
```python
# determine if docs were changed and if we're inside the amplab environment
# note - the below commented out until *all* Jenkins workers can get `jekyll` installed
# if "DOCS" in changed_modules and test_env == "amplab_jenkins":
#    build_spark_documentation()
```

```
Building Unidoc API Documentation
========================================================================
[info] Building Spark unidoc using SBT with these arguments:
-Phadoop-3.2 -Phive-2.3 -Pspark-ganglia-lgpl -Pkubernetes -Pmesos
-Phadoop-cloud -Phive -Phive-thriftserver -Pkinesis-asl -Pyarn unidoc
```

### Does this PR introduce _any_ user-facing change?

No. (This is used only for testing and not used in the official doc generation.)

### How was this patch tested?

Passes Jenkins without invoking doc generation.

Closes #29017 from dongjoon-hyun/SPARK-DOC-GEN.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
@SparkQA

SparkQA commented Jul 9, 2020

Test build #125403 has finished for PR 28975 at commit fb63d7e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking (Member) left a comment

LGTM

@gaborgsomogyi (Contributor) left a comment

LGTM

@cloud-fan
Contributor

thanks, merging to master/3.0!

@cloud-fan closed this in 526cb2d Jul 9, 2020
cloud-fan pushed a commit that referenced this pull request Jul 9, 2020
…sed unsafe row

### What changes were proposed in this pull request?

This patch fixes the odd join results produced by stream-stream join with state store format V2.

There are some spots on the V2 path which leverage UnsafeProjection. As the result row is reused, the row should be copied so that its value doesn't change while being read (or the caller must be confirmed to be unaffected by such behavior), but `SymmetricHashJoinStateManager.removeByValueCondition` violates this contract.

This patch makes `KeyWithIndexToValueRowConverterV2.convertValue` copy the row by itself so that callers don't need to take care of it. This patch doesn't change the behavior of `KeyWithIndexToValueRowConverterV2.convertToValueRow`, to avoid double copying: its caller is expected to store the row, and the state store implementation will call `copy()` when storing it.

This patch adds this behavior to each method doc in `KeyWithIndexToValueRowConverter`, so that future contributors can read through it and make sure a change or new addition doesn't break the contract.

### Why are the changes needed?

Stream-stream join with state store format V2 (newly added in Spark 3.0.0) has a serious correctness bug which produces nondeterministic results.

### Does this PR introduce _any_ user-facing change?

Yes. Spark 3.0.0 users using stream-stream join with new checkpoints (the bug exists only on the V2 format path) may encounter wrong join results. This patch fixes that.

### How was this patch tested?

The reported case was converted into a new UT, which was confirmed to pass. All UTs in StreamingInnerJoinSuite and StreamingOuterJoinSuite pass as well.

Closes #28975 from HeartSaVioR/SPARK-32148.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 526cb2d)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@HeartSaVioR
Contributor Author

Thanks all for reviewing and merging!

@HeartSaVioR deleted the SPARK-32148 branch July 9, 2020 10:40