[SPARK-21492][SQL] Fix memory leak in SortMergeJoin #26164
Test build #112276 has finished for PR 26164 at commit
sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -161,6 +162,10 @@ case class SortMergeJoinExec(
    sqlContext.conf.sortMergeJoinExecBufferInMemoryThreshold
  }

  private def needEagerCleanup: Boolean = {
    sqlContext.conf.getConf(SORT_MERGE_JOIN_EXEC_EAGER_CLEANUP_RESOURCES)
Seems this only controls cleanup behavior in the codegen path?
Ah yes, thanks for the reminder. Done in 631f3cb
Force-pushed from f9567d5 to 631f3cb.
Test build #112308 has finished for PR 26164 at commit
Test build #112328 has finished for PR 26164 at commit
Test build #112336 has finished for PR 26164 at commit
retest this please.
Test build #112353 has finished for PR 26164 at commit
Test build #112362 has finished for PR 26164 at commit
sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
private def attachCleanupResourceChecker(plan: SparkPlan): Unit = {
  // SPARK-21492: Check cleanupResources are finally triggered in SortExec node for every
  // test case
  val sorts = new ArrayBuffer[SortExec]()
super nit: now we don't need this array
plan.foreachUp {
case s: SortExec =>
// add spy
case _ =>
}
Yep, simplified it at the same time :) 6d6dd5a
Test build #112370 has finished for PR 26164 at commit
Test build #112372 has finished for PR 26164 at commit
retest this please
Test build #112389 has finished for PR 26164 at commit
retest this please
Test build #112431 has finished for PR 26164 at commit
thanks, merging to master!
@xuanyuanking can you send a PR for 2.4 backport? thanks!
Sure, backport to 2.4 in #26210.
…database style iterator

### What changes were proposed in this pull request?
Reimplement the iterator in UnsafeExternalRowSorter in database style. This can be done by reusing the `RowIterator` in our code base.

### Why are the changes needed?
During the work on #26164, after introducing a var `isReleased` into `hasNext`, it is possible that `isReleased` is false when `hasNext` is called but becomes true before `next` is called. A safer way is to use a database-style iterator: `advanceNext` and `getRow`.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing UT.

Closes #26229 from xuanyuanking/SPARK-21492-follow-up.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…database style iterator
(cherry picked from commit 9e77d48)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
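To illustrate the point the follow-up commit message makes about `hasNext`/`next`, here is a minimal sketch of a database-style iterator. It is not the code from the follow-up PR: `DbStyleIterator` and `ReleasableBuffer` are hypothetical names, and the trait only mimics the `advanceNext`/`getRow` shape mentioned above. The idea is that the release check and the fetch happen in one call, so a release between two calls cannot invalidate a row the caller was already promised.

```scala
// Hypothetical, simplified stand-in for a database-style iterator.
// It only mimics the advanceNext/getRow shape named in the commit message.
trait DbStyleIterator[T] {
  // Moves to the next element; returns false when there is none.
  def advanceNext(): Boolean
  // Returns the element selected by the last successful advanceNext() call.
  def getRow: T
}

// Hypothetical buffer whose resources may be released between calls.
class ReleasableBuffer[T](rows: Seq[T]) extends DbStyleIterator[T] {
  private var released = false
  private var index = -1
  private var current: Option[T] = None

  def release(): Unit = { released = true }

  override def advanceNext(): Boolean = {
    // The released check and the fetch happen in the same call, so there is
    // no window between "is there a row?" and "give me the row".
    if (!released && index + 1 < rows.length) {
      index += 1
      current = Some(rows(index))
      true
    } else {
      current = None
      false
    }
  }

  override def getRow: T =
    current.getOrElse(throw new NoSuchElementException("no current row"))
}
```

With a `hasNext`/`next` pair, a release that lands after `hasNext` returned true would leave `next` with nothing valid to return. In this sketch, a release that lands after `advanceNext` returned true still leaves the already fetched row available through `getRow`, and the following `advanceNext` simply returns false.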
What changes were proposed in this pull request?
We need a new mechanism that lets a downstream operator notify its upstream operators (its children in the plan) that they may release their output data stream. In this PR, we implement the mechanism as follows:
- `cleanupResources` in SparkPlan, which by default calls the children's `cleanupResources` function. An operator that needs resource cleanup should override it with its own cleanup and also call `super.cleanupResources`, like SortExec in this PR.
- SortMergeJoinExec calls `cleanupResources` to do the cleanup job for all its upstream (children) operators.
Why are the changes needed?
Fix the SortMergeJoin memory leak and implement a general framework for SparkPlan resource cleanup.
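The cleanup mechanism described in this PR can be sketched roughly as follows. This is a simplified illustration, not the actual Spark code: `PlanNode`, `ScanNode`, `SortNode`, `JoinNode`, and `finishEarly` are hypothetical stand-ins for SparkPlan, a leaf operator, SortExec, SortMergeJoinExec, and the point at which the join decides it needs no more input.

```scala
// Simplified stand-ins for physical operators. The names echo the PR
// description, but none of this is the real Spark API.
abstract class PlanNode {
  def children: Seq[PlanNode]

  // Default behaviour: forward the cleanup request to every child.
  def cleanupResources(): Unit = children.foreach(_.cleanupResources())
}

// Leaf operator with nothing to release.
class ScanNode extends PlanNode {
  override def children: Seq[PlanNode] = Nil
}

// An operator that holds a resource (assumed here to be a sorter buffer).
class SortNode(child: PlanNode) extends PlanNode {
  var released: Boolean = false

  override def children: Seq[PlanNode] = Seq(child)

  override def cleanupResources(): Unit = {
    released = true          // release this node's own resource first
    super.cleanupResources() // then let the children release theirs
  }
}

// The trigger side: once the join needs no more input, it asks its
// upstream (children) operators to release their resources.
class JoinNode(left: PlanNode, right: PlanNode) extends PlanNode {
  override def children: Seq[PlanNode] = Seq(left, right)

  def finishEarly(): Unit = cleanupResources()
}
```

Because the default implementation recurses into the children, an operator in the middle of the plan that holds no resources needs no changes for the request to reach a sort node further upstream.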
Does this PR introduce any user-facing change?
No.
How was this patch tested?
UT: Add a new test suite, JoinWithResourceCleanSuite, to check both the standard and the code generation scenarios.
Integration test: Run with the default driver/executor memory set to 1g, in local mode with 10 threads. The test below (thanks @taosaildrone for providing this test here) passes with this PR.