[SPARK-49386][CORE][SQL] Add memory based thresholds for shuffle spill #47856
Conversation
Let's probably file a new JIRA.

I am a bit swamped unfortunately, and I don't think I will be able to ensure this gets merged before next Monday @dongjoon-hyun - sorry about that :-( @cxzl25, will try to get around to reviewing this soon - apologies for the delay.

+CC @Ngone51 as well.

Thank you for letting me know, @mridulm ~ No problem at all.

Kindly ping @mridulm, do you have a chance to take another look? I also found this PR helpful for the stability of jobs that spill huge amounts of data.
mridulm left a comment
Just a few comments, mostly looks good to me.
Thanks for working on this @cxzl25, and apologies for the delay in getting to this!
+CC @HyukjinKwon, @cloud-fan as well for review.
core/src/main/scala/org/apache/spark/internal/config/package.scala (review thread: outdated, resolved)
By moving _elementsRead > numElementsForceSpillThreshold here, we would actually reduce some unnecessary allocations... nice!
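One plausible reading of this comment, as a minimal self-contained sketch (the class, field names, and initial threshold below are illustrative assumptions, not the actual Spillable code): checking the cheap record-count threshold first lets the sorter skip the memory-manager request entirely once that threshold has been crossed.

```scala
// Illustrative sketch only; the real Spillable mixin carries more state.
class SpillCheckSketch(
    forceSpillAfterElements: Long,
    acquireMemory: Long => Long) {

  // Assumed initial memory threshold (the real default is configurable).
  private var memoryThreshold: Long = 5L * 1024 * 1024

  def maybeSpill(currentMemory: Long, elementsRead: Long): Boolean = {
    // Cheap count-based check first: if it fires we never touch the memory
    // manager, avoiding the extra allocation attempt the comment mentions.
    if (elementsRead > forceSpillAfterElements) return true
    if (currentMemory >= memoryThreshold) {
      // Only now try to grow the threshold by requesting more execution memory.
      memoryThreshold += acquireMemory(2 * currentMemory - memoryThreshold)
      currentMemory >= memoryThreshold
    } else {
      false
    }
  }
}
```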
core/src/main/scala/org/apache/spark/util/collection/Spillable.scala (review thread: outdated, resolved)
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala (review thread: outdated, resolved)
The config name is a bit confusing.
spark.sql.windowExec.buffer.spill.threshold vs spark.sql.windowExec.buffer.spill.size.threshold.
Same for the others introduced.
I will let @HyukjinKwon or @cloud-fan comment better though.
I am not super used to this area. I would rather follow the suggestions from you / others.
Thanks @HyukjinKwon !
+CC @dongjoon-hyun as well.
I am planning to merge this next week if there are no concerns @cloud-fan, @dongjoon-hyun. I am not super keen on the naming of some of the sql configs, so I would appreciate your thoughts on that (as well as the rest of the PR). Also, +CC @attilapiros for feedback as well.
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java (review thread: outdated, resolved)
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java (review thread: outdated, resolved)
core/src/main/scala/org/apache/spark/util/collection/Spillable.scala (review thread: outdated, resolved)
@mridulm @cxzl25 @attilapiros @HyukjinKwon @pan3793 Hi all, just curious whether there are any issues with this PR or if it will be merged into OSS Spark sometime soon? Thanks again for making this change!

I did not merge it given @attilapiros was actively reviewing it.

checking
attilapiros left a comment
LGTM after the code duplicate is resolved.
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactory.scala (review thread: outdated, resolved)
When running large shuffles (700TB input data, 200k map tasks, 50k reducers on a 300-node cluster), the job regularly OOMs in the map and reduce phases. IIUC, ShuffleExternalSorter (map side) and ExternalAppendOnlyMap / ExternalSorter (reduce side) try to max out the available execution memory. This in turn doesn't play nicely with the garbage collector, and executors fail with OutOfMemoryError when the memory allocated by these in-memory structures maxes out the available heap size (in our case we run with 9 cores/executor, 32G per executor).

To mitigate this, one can set spark.shuffle.spill.numElementsForceSpillThreshold to force the spill to disk. While this config works, it is not flexible enough, as it is expressed in number of elements; in our case we run multiple shuffles in a single job, and element size differs from one stage to another.

This patch extends the spill threshold behaviour and adds two new parameters to control the spill based on memory usage:
- spark.shuffle.spill.map.maxRecordsSizeForSpillThreshold
- spark.shuffle.spill.reduce.maxRecordsSizeForSpillThreshold
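For illustration, a sketch of how these thresholds might be combined on a SparkConf. The size values are arbitrary, and the exact config names and accepted value formats in the merged change may differ from the ones proposed here, so treat this as an assumption rather than the final API:

```scala
import org.apache.spark.SparkConf

// Hypothetical tuning for a job whose row sizes vary a lot between stages.
val conf = new SparkConf()
  // Existing knob: force a spill after this many records, regardless of their size.
  .set("spark.shuffle.spill.numElementsForceSpillThreshold", "5000000")
  // Proposed knobs from this PR: force a spill once the in-memory records
  // exceed a size limit, on the map side and the reduce side respectively.
  .set("spark.shuffle.spill.map.maxRecordsSizeForSpillThreshold", "2g")
  .set("spark.shuffle.spill.reduce.maxRecordsSizeForSpillThreshold", "2g")
```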
If the current changes look good, can you merge it please @attilapiros?

Thank you! @mridulm @attilapiros @cxzl25, looking forward to this change in the coming Spark release.

Merged to master.

@HyukjinKwon Can I close the old JIRA (https://issues.apache.org/jira/browse/SPARK-27734) as a duplicate, or what was your plan when you asked for a new ticket?
```
initialSize,
pageSizeBytes,
numRowsSpillThreshold,
maxSizeSpillThreshold,
```
Looking at this `add` method, it triggers spilling only if the number of rows exceeds numRowsInMemoryBufferThreshold. IIUC we are not using a memory-based threshold here, as we keep appending data to a memory buffer based on the num-rows threshold.
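To illustrate the point being made, a simplified sketch of the buffering behaviour described above (names and structure are illustrative assumptions, not the actual ExternalAppendOnlyUnsafeRowArray code): rows accumulate in a plain in-memory buffer until a row-count threshold is hit, and only then does the spill-aware sorter take over, so a size-based threshold has no effect during that initial buffering phase.

```scala
import scala.collection.mutable.ArrayBuffer

// Illustrative sketch only.
class RowBufferSketch[T](
    numRowsInMemoryBufferThreshold: Int,
    appendToSpillableSorter: T => Unit) {

  private val inMemoryBuffer = ArrayBuffer.empty[T]
  private var spillableSorterInUse = false

  def add(row: T): Unit = {
    if (!spillableSorterInUse && inMemoryBuffer.length < numRowsInMemoryBufferThreshold) {
      // Only the *count* of buffered rows is checked here; their size in bytes
      // is not, which is the gap the review comment points out.
      inMemoryBuffer += row
    } else {
      if (!spillableSorterInUse) {
        // Switch over: hand everything buffered so far to the spill-aware sorter.
        inMemoryBuffer.foreach(appendToSpillableSorter)
        inMemoryBuffer.clear()
        spillableSorterInUse = true
      }
      appendToSpillableSorter(row)
    }
  }
}
```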
…memory based spill threshold

### What changes were proposed in this pull request?

This is a followup of #47856. It makes the memory tracking more accurate in several places:
1. In `ShuffleExternalSorter`/`UnsafeExternalSorter`, memory is used by both the sorter itself and its underlying in-memory sorter (for sorting shuffle partition ids). We need to add them up to calculate the current memory usage.
2. In `ExternalAppendOnlyUnsafeRowArray`, records are inserted into an in-memory buffer first. If the buffer gets too large (currently based on num records), we switch to `UnsafeExternalSorter`. The in-memory buffer also needs a memory-based threshold.

### Why are the changes needed?

More accurate memory tracking results in better spill decisions.

### Does this PR introduce _any_ user-facing change?

No, the feature is not released yet.

### How was this patch tested?

existing tests

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #52190 from cloud-fan/spill.

Lead-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Yi Wu <yi.wu@databricks.com>
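A small sketch of point 1 above, purely to illustrate the accounting (it is not the actual ShuffleExternalSorter/UnsafeExternalSorter code; class and method names are made up): both the record pages held by the sorter and the pointer array held by its in-memory sorter count against the size-based spill threshold, so only their sum reflects the true usage.

```scala
// Illustrative sketch only.
class SorterMemoryAccounting(maxSizeForSpillBytes: Long) {
  private var dataPageBytes = 0L        // bytes held in allocated record pages
  private var inMemorySorterBytes = 0L  // bytes held by the in-memory pointer sorter

  def recordInserted(pageGrowth: Long, pointerArrayGrowth: Long): Unit = {
    dataPageBytes += pageGrowth
    inMemorySorterBytes += pointerArrayGrowth
  }

  // Comparing only one component against the threshold would under-count the
  // real footprint and delay the spill; the follow-up adds the two up instead.
  def shouldSpill: Boolean = (dataPageBytes + inMemorySorterBytes) > maxSizeForSpillBytes
}
```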
### What changes were proposed in this pull request?

This PR aims to document newly added `core` module configurations as a part of Apache Spark 4.1.0 preparation.

### Why are the changes needed?

To help the users use new features easily.
- #47856
- #51130
- #51163
- #51604
- #51630
- #51708
- #51885
- #52091
- #52382

### Does this PR introduce _any_ user-facing change?

No behavior change because this is a documentation update.

### How was this patch tested?

Manual review.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52626 from dongjoon-hyun/SPARK-53926.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Original author: @amuraru
What changes were proposed in this pull request?
This PR aims to add memory-based thresholds for shuffle spill.
It introduces configurations for the map-side and reduce-side size-based spill thresholds (spark.shuffle.spill.map.maxRecordsSizeForSpillThreshold and spark.shuffle.spill.reduce.maxRecordsSizeForSpillThreshold, described above).
Why are the changes needed?
#24618
We can only determine when to spill by configuring spark.shuffle.spill.numElementsForceSpillThreshold, which counts elements. In some scenarios, the size of a single row in memory may be very large.

Does this PR introduce any user-facing change?
No
How was this patch tested?
GA
Verified in the production environment: task time is shortened, the number of disk spills is reduced, there is a better chance to compress the shuffle data, and the size of the data spilled to disk is also significantly reduced.
Screenshots comparing spill metrics before (Current) and after (PR) the change:

Was this patch authored or co-authored using generative AI tooling?
No