Skip to content

[SPARK-21492][SQL][2.4] Fix memory leak in SortMergeJoin #26210

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed

Conversation

xuanyuanking
Copy link
Member

What changes were proposed in this pull request?

We shall have a new mechanism that the downstream operators may notify its parents that they may release the output data stream. In this PR, we implement the mechanism as below:

  • Add function named cleanupResources in SparkPlan, which default call children's cleanupResources function, the operator which need a resource cleanup should rewrite this with the self cleanup and also call super.cleanupResources, like SortExec in this PR.
  • Add logic support on the trigger side, in this PR is SortMergeJoinExec, which make sure and call the cleanupResources to do the cleanup job for all its upstream(children) operator.

Why are the changes needed?

Bugfix for SortMergeJoin memory leak, and implement a general framework for SparkPlan resource cleanup.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

UT: Add new test suite JoinWithResourceCleanSuite to check both standard and code generation scenario.

Integrate Test: Test with driver/executor default memory set 1g, local mode 10 thread. The below test(thanks taosaildrone for providing this test here) will pass with this PR.

from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 1001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show()

/### What changes were proposed in this pull request?
We shall have a new mechanism that the downstream operators may notify its parents that they may release the output data stream. In this PR, we implement the mechanism as below:
- Add function named `cleanupResources` in SparkPlan, which default call children's `cleanupResources` function, the operator which need a resource cleanup should rewrite this with the self cleanup and also call `super.cleanupResources`, like SortExec in this PR.
- Add logic support on the trigger side, in this PR is SortMergeJoinExec, which make sure and call the `cleanupResources` to do the cleanup job for all its upstream(children) operator.

/### Why are the changes needed?
Bugfix for SortMergeJoin memory leak, and implement a general framework for SparkPlan resource cleanup.

/### Does this PR introduce any user-facing change?
No.

/### How was this patch tested?
UT: Add new test suite JoinWithResourceCleanSuite to check both standard and code generation scenario.

Integrate Test: Test with driver/executor default memory set 1g, local mode 10 thread. The below test(thanks taosaildrone for providing this test  [here](apache#23762 (comment))) will pass with this PR.

```
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 1001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show()
```
@SparkQA
Copy link

SparkQA commented Oct 22, 2019

Test build #112460 has finished for PR 26210 at commit 814e64f.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Oct 22, 2019

Test build #112461 has finished for PR 26210 at commit a4422d6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Copy link
Member

viirya commented Oct 22, 2019

Please add [2.4] to PR title?

@xuanyuanking xuanyuanking changed the title [SPARK-21492][SQL] Fix memory leak in SortMergeJoin [SPARK-21492][SQL][2.4] Fix memory leak in SortMergeJoin Oct 23, 2019
@cloud-fan
Copy link
Contributor

thanks, merging to 2.4!

cloud-fan pushed a commit that referenced this pull request Oct 23, 2019
### What changes were proposed in this pull request?
We shall have a new mechanism that the downstream operators may notify its parents that they may release the output data stream. In this PR, we implement the mechanism as below:
- Add function named `cleanupResources` in SparkPlan, which default call children's `cleanupResources` function, the operator which need a resource cleanup should rewrite this with the self cleanup and also call `super.cleanupResources`, like SortExec in this PR.
- Add logic support on the trigger side, in this PR is SortMergeJoinExec, which make sure and call the `cleanupResources` to do the cleanup job for all its upstream(children) operator.

### Why are the changes needed?
Bugfix for SortMergeJoin memory leak, and implement a general framework for SparkPlan resource cleanup.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
UT: Add new test suite JoinWithResourceCleanSuite to check both standard and code generation scenario.

Integrate Test: Test with driver/executor default memory set 1g, local mode 10 thread. The below test(thanks taosaildrone for providing this test  [here](#23762 (comment))) will pass with this PR.

```
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 1001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show()
```

Closes #26210 from xuanyuanking/SPARK-21492-backport.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan cloud-fan closed this Oct 23, 2019
@xuanyuanking
Copy link
Member Author

Thanks!

@xuanyuanking xuanyuanking deleted the SPARK-21492-backport branch October 23, 2019 08:52
wangqia0309 pushed a commit to bigo-sg/spark that referenced this pull request Oct 30, 2019
### What changes were proposed in this pull request?
We shall have a new mechanism that the downstream operators may notify its parents that they may release the output data stream. In this PR, we implement the mechanism as below:
- Add function named `cleanupResources` in SparkPlan, which default call children's `cleanupResources` function, the operator which need a resource cleanup should rewrite this with the self cleanup and also call `super.cleanupResources`, like SortExec in this PR.
- Add logic support on the trigger side, in this PR is SortMergeJoinExec, which make sure and call the `cleanupResources` to do the cleanup job for all its upstream(children) operator.

### Why are the changes needed?
Bugfix for SortMergeJoin memory leak, and implement a general framework for SparkPlan resource cleanup.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
UT: Add new test suite JoinWithResourceCleanSuite to check both standard and code generation scenario.

Integrate Test: Test with driver/executor default memory set 1g, local mode 10 thread. The below test(thanks taosaildrone for providing this test  [here](apache#23762 (comment))) will pass with this PR.

```
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 1001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show()
```

Closes apache#26210 from xuanyuanking/SPARK-21492-backport.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
jstokes pushed a commit to amperity/spark that referenced this pull request Jul 31, 2020
### What changes were proposed in this pull request?
We shall have a new mechanism that the downstream operators may notify its parents that they may release the output data stream. In this PR, we implement the mechanism as below:
- Add function named `cleanupResources` in SparkPlan, which default call children's `cleanupResources` function, the operator which need a resource cleanup should rewrite this with the self cleanup and also call `super.cleanupResources`, like SortExec in this PR.
- Add logic support on the trigger side, in this PR is SortMergeJoinExec, which make sure and call the `cleanupResources` to do the cleanup job for all its upstream(children) operator.

### Why are the changes needed?
Bugfix for SortMergeJoin memory leak, and implement a general framework for SparkPlan resource cleanup.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
UT: Add new test suite JoinWithResourceCleanSuite to check both standard and code generation scenario.

Integrate Test: Test with driver/executor default memory set 1g, local mode 10 thread. The below test(thanks taosaildrone for providing this test  [here](apache#23762 (comment))) will pass with this PR.

```
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 1001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show()
```

Closes apache#26210 from xuanyuanking/SPARK-21492-backport.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants