[SPARK-21492][SQL] Fix memory leak in SortMergeJoin #26164

Closed
wants to merge 8 commits

Conversation

xuanyuanking (Member) commented Oct 18, 2019

What changes were proposed in this pull request?

We propose a new mechanism by which downstream operators can notify their parents that the output data stream may be released. In this PR, we implement the mechanism as below (see the sketch after this list):

  • Add a function named cleanupResources to SparkPlan. By default it calls the children's cleanupResources; an operator that needs resource cleanup should override it with its own cleanup logic and also call super.cleanupResources, like SortExec in this PR.
  • Add supporting logic on the trigger side, which in this PR is SortMergeJoinExec: it makes sure cleanupResources is called to perform the cleanup for all of its upstream (children) operators.
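
For illustration, here is a minimal sketch of the two hooks described above. The method name cleanupResources comes from the description; the exact signatures, the rowSorter field, and the null guard are assumptions rather than the merged patch.

// In SparkPlan: the default implementation simply forwards the cleanup to all children.
protected[sql] def cleanupResources(): Unit = {
  children.foreach(_.cleanupResources())
}

// In SortExec: release the external sorter's memory first, then delegate to the default
// implementation so the call keeps propagating to this node's own children.
override protected[sql] def cleanupResources(): Unit = {
  if (rowSorter != null) {  // the sorter may not exist yet, e.g. for an empty input iterator
    rowSorter.cleanupResources()
  }
  super.cleanupResources()
}

// In SortMergeJoinExec (the trigger side): once one side of the join is exhausted,
// call cleanupResources() to eagerly free the children's sort buffers.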

Why are the changes needed?

This fixes the SortMergeJoin memory leak and implements a general framework for SparkPlan resource cleanup.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

UT: Added a new test suite, JoinWithResourceCleanSuite, to check both the standard and the code generation scenarios.

Integration test: Tested with the default driver/executor memory set to 1g, in local mode with 10 threads. The test below (thanks @taosaildrone for providing this test here) passes with this PR.

from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
# spark.conf.set("spark.sql.sortMergeJoinExec.eagerCleanupResources", "true")

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 1001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show()

@xuanyuanking (Member Author)

cc @cloud-fan @gatorsmile


SparkQA commented Oct 18, 2019

Test build #112276 has finished for PR 26164 at commit f9567d5.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -161,6 +162,10 @@ case class SortMergeJoinExec(
    sqlContext.conf.sortMergeJoinExecBufferInMemoryThreshold
  }

  private def needEagerCleanup: Boolean = {
    sqlContext.conf.getConf(SORT_MERGE_JOIN_EXEC_EAGER_CLEANUP_RESOURCES)
Member

Seems this only controls cleanup behavior in the codegen path?

Member Author

Ah yes, thanks for the reminder. Done in 631f3cb.
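
For reference, a sketch of how such a flag could be declared in SQLConf; the config key comes from the test script above, while the default value and doc text here are assumptions:

val SORT_MERGE_JOIN_EXEC_EAGER_CLEANUP_RESOURCES =
  buildConf("spark.sql.sortMergeJoinExec.eagerCleanupResources")
    .internal()
    .doc("When true, SortMergeJoinExec eagerly cleans up its children's resources " +
      "(e.g. the sorters' memory) as soon as one side of the join is exhausted.")
    .booleanConf
    .createWithDefault(true)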


SparkQA commented Oct 19, 2019

Test build #112308 has finished for PR 26164 at commit 631f3cb.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Oct 20, 2019

Test build #112328 has finished for PR 26164 at commit ec0f160.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Oct 20, 2019

Test build #112336 has finished for PR 26164 at commit 93815f8.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

viirya (Member) commented Oct 21, 2019

retest this please.


SparkQA commented Oct 21, 2019

Test build #112353 has finished for PR 26164 at commit 93815f8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Oct 21, 2019

Test build #112362 has finished for PR 26164 at commit defaaf2.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

  private def attachCleanupResourceChecker(plan: SparkPlan): Unit = {
    // SPARK-21492: Check cleanupResources are finally triggered in SortExec node for every
    // test case
    val sorts = new ArrayBuffer[SortExec]()
Contributor

super nit: now we don't need this array

plan.foreachUp {
  case s: SortExec =>
    // add spy
  case _ =>
}

Member Author

Yep, simplified it at the same time :) 6d6dd5a


SparkQA commented Oct 21, 2019

Test build #112370 has finished for PR 26164 at commit 7787d45.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Oct 21, 2019

Test build #112372 has finished for PR 26164 at commit 6d6dd5a.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

retest this please


SparkQA commented Oct 21, 2019

Test build #112389 has finished for PR 26164 at commit 6d6dd5a.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking (Member Author)

retest this please


SparkQA commented Oct 22, 2019

Test build #112431 has finished for PR 26164 at commit 6d6dd5a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

thanks, merging to master!

cloud-fan closed this in bb49c80 Oct 22, 2019

@cloud-fan (Contributor)

@xuanyuanking can you send a PR for 2.4 backport? thanks!

xuanyuanking deleted the SPARK-21492 branch October 22, 2019 11:53

@xuanyuanking (Member Author)

Sure, backport to 2.4 in #26210.

cloud-fan pushed a commit that referenced this pull request Oct 24, 2019
…database style iterator

### What changes were proposed in this pull request?
Reimplement the iterator in UnsafeExternalRowSorter in database style. This can be done by reusing the `RowIterator` in our code base.

### Why are the changes needed?
During the work in #26164, after introducing a var `isReleased` checked in `hasNext`, it is possible that `isReleased` is false when `hasNext` is called but becomes true before `next` is called. A safer way is a database-style iterator: `advanceNext` and `getRow`.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing UT.

Closes #26229 from xuanyuanking/SPARK-21492-follow-up.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
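
For context, here is a minimal sketch of the database-style iterator contract mentioned in the commit above, roughly the shape of Spark's RowIterator; the consumer loop is illustrative:

import org.apache.spark.sql.catalyst.InternalRow

// A single advanceNext() call both moves the cursor and reports whether a row exists,
// so the answer cannot go stale between separate hasNext and next calls (the race the
// isReleased flag ran into).
abstract class RowIterator {
  def advanceNext(): Boolean   // advance to the next row; returns false when exhausted
  def getRow: InternalRow      // the current row, valid until the next advanceNext()
}

// Illustrative consumer loop:
//   while (iter.advanceNext()) { process(iter.getRow) }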