-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-21492][SQL][2.4] Fix memory leak in SortMergeJoin #26210
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/### What changes were proposed in this pull request? We shall have a new mechanism that the downstream operators may notify its parents that they may release the output data stream. In this PR, we implement the mechanism as below: - Add function named `cleanupResources` in SparkPlan, which default call children's `cleanupResources` function, the operator which need a resource cleanup should rewrite this with the self cleanup and also call `super.cleanupResources`, like SortExec in this PR. - Add logic support on the trigger side, in this PR is SortMergeJoinExec, which make sure and call the `cleanupResources` to do the cleanup job for all its upstream(children) operator. /### Why are the changes needed? Bugfix for SortMergeJoin memory leak, and implement a general framework for SparkPlan resource cleanup. /### Does this PR introduce any user-facing change? No. /### How was this patch tested? UT: Add new test suite JoinWithResourceCleanSuite to check both standard and code generation scenario. Integrate Test: Test with driver/executor default memory set 1g, local mode 10 thread. The below test(thanks taosaildrone for providing this test [here](apache#23762 (comment))) will pass with this PR. ``` from pyspark.sql.functions import rand, col spark.conf.set("spark.sql.join.preferSortMergeJoin", "true") spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) r1 = spark.range(1, 1001).select(col("id").alias("timestamp1")) r1 = r1.withColumn('value', rand()) r2 = spark.range(1000, 1001).select(col("id").alias("timestamp2")) r2 = r2.withColumn('value2', rand()) joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner") joined = joined.coalesce(1) joined.explain() joined.show() ```
Test build #112460 has finished for PR 26210 at commit
|
Test build #112461 has finished for PR 26210 at commit
|
Please add [2.4] to PR title? |
thanks, merging to 2.4! |
cloud-fan
pushed a commit
that referenced
this pull request
Oct 23, 2019
### What changes were proposed in this pull request? We shall have a new mechanism that the downstream operators may notify its parents that they may release the output data stream. In this PR, we implement the mechanism as below: - Add function named `cleanupResources` in SparkPlan, which default call children's `cleanupResources` function, the operator which need a resource cleanup should rewrite this with the self cleanup and also call `super.cleanupResources`, like SortExec in this PR. - Add logic support on the trigger side, in this PR is SortMergeJoinExec, which make sure and call the `cleanupResources` to do the cleanup job for all its upstream(children) operator. ### Why are the changes needed? Bugfix for SortMergeJoin memory leak, and implement a general framework for SparkPlan resource cleanup. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? UT: Add new test suite JoinWithResourceCleanSuite to check both standard and code generation scenario. Integrate Test: Test with driver/executor default memory set 1g, local mode 10 thread. The below test(thanks taosaildrone for providing this test [here](#23762 (comment))) will pass with this PR. ``` from pyspark.sql.functions import rand, col spark.conf.set("spark.sql.join.preferSortMergeJoin", "true") spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) r1 = spark.range(1, 1001).select(col("id").alias("timestamp1")) r1 = r1.withColumn('value', rand()) r2 = spark.range(1000, 1001).select(col("id").alias("timestamp2")) r2 = r2.withColumn('value2', rand()) joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner") joined = joined.coalesce(1) joined.explain() joined.show() ``` Closes #26210 from xuanyuanking/SPARK-21492-backport. Authored-by: Yuanjian Li <xyliyuanjian@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Thanks! |
wangqia0309
pushed a commit
to bigo-sg/spark
that referenced
this pull request
Oct 30, 2019
### What changes were proposed in this pull request? We shall have a new mechanism that the downstream operators may notify its parents that they may release the output data stream. In this PR, we implement the mechanism as below: - Add function named `cleanupResources` in SparkPlan, which default call children's `cleanupResources` function, the operator which need a resource cleanup should rewrite this with the self cleanup and also call `super.cleanupResources`, like SortExec in this PR. - Add logic support on the trigger side, in this PR is SortMergeJoinExec, which make sure and call the `cleanupResources` to do the cleanup job for all its upstream(children) operator. ### Why are the changes needed? Bugfix for SortMergeJoin memory leak, and implement a general framework for SparkPlan resource cleanup. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? UT: Add new test suite JoinWithResourceCleanSuite to check both standard and code generation scenario. Integrate Test: Test with driver/executor default memory set 1g, local mode 10 thread. The below test(thanks taosaildrone for providing this test [here](apache#23762 (comment))) will pass with this PR. ``` from pyspark.sql.functions import rand, col spark.conf.set("spark.sql.join.preferSortMergeJoin", "true") spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) r1 = spark.range(1, 1001).select(col("id").alias("timestamp1")) r1 = r1.withColumn('value', rand()) r2 = spark.range(1000, 1001).select(col("id").alias("timestamp2")) r2 = r2.withColumn('value2', rand()) joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner") joined = joined.coalesce(1) joined.explain() joined.show() ``` Closes apache#26210 from xuanyuanking/SPARK-21492-backport. Authored-by: Yuanjian Li <xyliyuanjian@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
jstokes
pushed a commit
to amperity/spark
that referenced
this pull request
Jul 31, 2020
### What changes were proposed in this pull request? We shall have a new mechanism that the downstream operators may notify its parents that they may release the output data stream. In this PR, we implement the mechanism as below: - Add function named `cleanupResources` in SparkPlan, which default call children's `cleanupResources` function, the operator which need a resource cleanup should rewrite this with the self cleanup and also call `super.cleanupResources`, like SortExec in this PR. - Add logic support on the trigger side, in this PR is SortMergeJoinExec, which make sure and call the `cleanupResources` to do the cleanup job for all its upstream(children) operator. ### Why are the changes needed? Bugfix for SortMergeJoin memory leak, and implement a general framework for SparkPlan resource cleanup. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? UT: Add new test suite JoinWithResourceCleanSuite to check both standard and code generation scenario. Integrate Test: Test with driver/executor default memory set 1g, local mode 10 thread. The below test(thanks taosaildrone for providing this test [here](apache#23762 (comment))) will pass with this PR. ``` from pyspark.sql.functions import rand, col spark.conf.set("spark.sql.join.preferSortMergeJoin", "true") spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) r1 = spark.range(1, 1001).select(col("id").alias("timestamp1")) r1 = r1.withColumn('value', rand()) r2 = spark.range(1000, 1001).select(col("id").alias("timestamp2")) r2 = r2.withColumn('value2', rand()) joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner") joined = joined.coalesce(1) joined.explain() joined.show() ``` Closes apache#26210 from xuanyuanking/SPARK-21492-backport. Authored-by: Yuanjian Li <xyliyuanjian@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
We shall have a new mechanism that the downstream operators may notify its parents that they may release the output data stream. In this PR, we implement the mechanism as below:
cleanupResources
in SparkPlan, which default call children'scleanupResources
function, the operator which need a resource cleanup should rewrite this with the self cleanup and also callsuper.cleanupResources
, like SortExec in this PR.cleanupResources
to do the cleanup job for all its upstream(children) operator.Why are the changes needed?
Bugfix for SortMergeJoin memory leak, and implement a general framework for SparkPlan resource cleanup.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
UT: Add new test suite JoinWithResourceCleanSuite to check both standard and code generation scenario.
Integrate Test: Test with driver/executor default memory set 1g, local mode 10 thread. The below test(thanks taosaildrone for providing this test here) will pass with this PR.