
Conversation

@cxzl25
Contributor

@cxzl25 cxzl25 commented Aug 23, 2024

Original author: @amuraru

What changes were proposed in this pull request?

This PR aims to add memory-based thresholds for shuffle spill.

It introduces the following configurations (a usage sketch follows the list):

  • spark.shuffle.spill.maxRecordsSizeForSpillThreshold
  • spark.sql.windowExec.buffer.spill.size.threshold
  • spark.sql.sessionWindow.buffer.spill.size.threshold
  • spark.sql.sortMergeJoinExec.buffer.spill.size.threshold
  • spark.sql.cartesianProductExec.buffer.spill.size.threshold
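A minimal usage sketch, assuming the configuration names listed above are used as-is; the threshold values are purely illustrative, not recommendations:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: force size-based spills once buffered records exceed
// the given number of bytes, in addition to the existing count-based limits.
val spark = SparkSession.builder()
  .appName("memory-based-spill-thresholds")
  .config("spark.shuffle.spill.maxRecordsSizeForSpillThreshold", 256L * 1024 * 1024)
  .config("spark.sql.windowExec.buffer.spill.size.threshold", 64L * 1024 * 1024)
  .config("spark.sql.sortMergeJoinExec.buffer.spill.size.threshold", 64L * 1024 * 1024)
  .getOrCreate()
```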

Why are the changes needed?

#24618

Currently, spills can only be forced based on a record count, via spark.shuffle.spill.numElementsForceSpillThreshold. In some scenarios a single row can be very large in memory, so a count-based threshold alone cannot bound the memory used before spilling.

Does this PR introduce any user-facing change?

No

How was this patch tested?

GA (GitHub Actions CI).

Verified in a production environment: task time is shortened, the number of spills to disk is reduced, there is a better chance to compress the shuffle data, and the size of the data spilled to disk is also significantly reduced.

Current

24/08/19 07:02:54,947 [Executor task launch worker for task 0.0 in stage 53.0 (TID 1393)] INFO ShuffleExternalSorter: Thread 126 spilling sort data of 62.0 MiB to disk (11490  times so far)
24/08/19 07:02:55,029 [Executor task launch worker for task 0.0 in stage 53.0 (TID 1393)] INFO ShuffleExternalSorter: Thread 126 spilling sort data of 62.0 MiB to disk (11491  times so far)
24/08/19 07:02:55,093 [Executor task launch worker for task 0.0 in stage 53.0 (TID 1393)] INFO ShuffleExternalSorter: Thread 126 spilling sort data of 62.0 MiB to disk (11492  times so far)
24/08/19 07:08:59,894 [Executor task launch worker for task 0.0 in stage 53.0 (TID 1393)] INFO Executor: Finished task 0.0 in stage 53.0 (TID 1393). 7409 bytes result sent to driver

PR

Was this patch authored or co-authored using generative AI tooling?

No

@HyukjinKwon
Member

Let's probably file a new JIRA

@cxzl25 cxzl25 changed the title [SPARK-27734][CORE][SQL] Add memory based thresholds for shuffle spill [SPARK-49386][SPARK-27734][CORE][SQL] Add memory based thresholds for shuffle spill Aug 26, 2024
@dongjoon-hyun
Member

Gentle ping, @cxzl25 and @mridulm .

Although we have enough time until Feature Freeze, I'm wondering if we can deliver this via Apache Spark 4.0.0-preview2 RC1 (next Monday). WDYT?

@mridulm
Contributor

mridulm commented Sep 12, 2024

I am a bit swamped unfortunately, and I don't think I will be able to ensure this gets merged before next Monday @dongjoon-hyun - sorry about that :-(

@cxzl25, I will try to get around to reviewing this soon - apologies for the delay.

@mridulm
Contributor

mridulm commented Sep 12, 2024

+CC @Ngone51 as well.

@dongjoon-hyun
Member

Thank you for letting me know, @mridulm ~ No problem at all.

@pan3793
Member

pan3793 commented Apr 18, 2025

Kindly ping @mridulm, do you have a chance to take another look? I also found this PR helpful for the stability of jobs that spill huge amounts of data.


@mridulm mridulm left a comment


Just a few comments, mostly looks good to me.
Thanks for working on this @cxzl25, and apologies for the delay in getting to this!

+CC @HyukjinKwon, @cloud-fan as well for review.

Contributor


By moving `_elementsRead > numElementsForceSpillThreshold` here, we would actually reduce some unnecessary allocations... nice!
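A simplified sketch of the idea behind this comment, under assumed names (not the actual ShuffleExternalSorter code): the count- and size-based force-spill checks run before any memory is grown for the incoming record, so a spill is not preceded by an allocation that would immediately be thrown away.

```scala
// Hypothetical sketch, not Spark's implementation.
class SketchSorter(numElementsForceSpillThreshold: Long,
                   maxRecordsSizeForSpillThreshold: Long) {
  private var elementsRead = 0L
  private var recordsSize = 0L

  def insertRecord(recordSizeBytes: Long): Unit = {
    // Spill first, grow buffers after: avoids allocating memory we would drop.
    if (elementsRead > numElementsForceSpillThreshold ||
        recordsSize > maxRecordsSizeForSpillThreshold) {
      spill()
    }
    growBuffersIfNeeded(recordSizeBytes)
    elementsRead += 1
    recordsSize += recordSizeBytes
  }

  private def spill(): Unit = { elementsRead = 0L; recordsSize = 0L }
  private def growBuffersIfNeeded(bytes: Long): Unit = () // placeholder
}
```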

Contributor


The config name is a bit confusing: `spark.sql.windowExec.buffer.spill.threshold` vs. `spark.sql.windowExec.buffer.spill.size.threshold`.

Same for the others introduced.

I will let @HyukjinKwon or @cloud-fan comment better though.

Member


I am not super familiar with this area. I would rather follow the suggestions from you / others.

Contributor


Thanks @HyukjinKwon !
+CC @dongjoon-hyun as well.

@mridulm
Contributor

mridulm commented May 3, 2025

I am planning to merge this next week if there are no concerns @cloud-fan , @dongjoon-hyun.
It has been open for quite a while, and is a very helpful fix to mitigate memory issues.

I am not super keen on the naming of some of the SQL configs; what are your thoughts on that (as well as the rest of the PR)?

Also, +CC @attilapiros for feedback as well.

@rahil-c

rahil-c commented Jun 24, 2025

@mridulm @cxzl25 @attilapiros @HyukjinKwon @pan3793

Hi all, I was just curious whether there are any issues with this PR or if it will be merged into OSS Spark sometime soon? Thanks again for making this change!

@mridulm
Contributor

mridulm commented Jun 25, 2025

I did not merge it given @attilapiros was actively reviewing it.
Are there any other concerns/comments on this, Attila?

@attilapiros
Contributor

checking


@attilapiros attilapiros left a comment


LGTM after the code duplicate is resolved.

amuraru and others added 3 commits June 26, 2025 15:40
When running large shuffles (700 TB of input data, 200k map tasks, 50k reducers on a 300-node cluster), the job regularly OOMs in the map and reduce phases.

IIUC, ShuffleExternalSorter (map side) and ExternalAppendOnlyMap / ExternalSorter (reduce side) try to max out the available execution memory. This in turn doesn't play nicely with the garbage collector, and executors fail with OutOfMemoryError when the memory allocated by these in-memory structures maxes out the available heap size (in our case we run with 9 cores per executor and 32G per executor).

To mitigate this, one can set spark.shuffle.spill.numElementsForceSpillThreshold to force the spill to disk. While this config works, it is not flexible enough, as it is expressed as a number of elements; in our case we run multiple shuffles in a single job and the element size differs from one stage to another.

This patch extends the spill threshold behaviour and adds two new parameters to control the spill based on memory usage (see the sketch after this list):

- spark.shuffle.spill.map.maxRecordsSizeForSpillThreshold
- spark.shuffle.spill.reduce.maxRecordsSizeForSpillThreshold
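A hedged configuration sketch using the parameter names as written in this original commit (the configuration names in the merged PR description above differ slightly); the values are illustrative only:

```scala
import org.apache.spark.SparkConf

// Illustrative thresholds; tune per workload.
val conf = new SparkConf()
  .set("spark.shuffle.spill.map.maxRecordsSizeForSpillThreshold",
    (512L * 1024 * 1024).toString) // map-side spill once ~512 MiB of records is buffered
  .set("spark.shuffle.spill.reduce.maxRecordsSizeForSpillThreshold",
    (512L * 1024 * 1024).toString) // reduce-side equivalent
```
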
@mridulm
Contributor

mridulm commented Jul 3, 2025

If the current changes look good, can you merge it please, @attilapiros?
I am travelling and don't have access to my desktop :)

@rahil-c

rahil-c commented Jul 3, 2025

Thank you! @mridulm @attilapiros @cxzl25 , looking forward to this change in coming spark release.

@attilapiros attilapiros changed the title [SPARK-49386][SPARK-27734][CORE][SQL] Add memory based thresholds for shuffle spill [SPARK-49386][CORE][SQL] Add memory based thresholds for shuffle spill Jul 4, 2025
@attilapiros
Contributor

Merged to master.

@attilapiros
Contributor

> Let's probably file a new JIRA

@HyukjinKwon Can I close the old JIRA (https://issues.apache.org/jira/browse/SPARK-27734) as a duplicate, or what was your plan when you asked for a new ticket?

initialSize,
pageSizeBytes,
numRowsSpillThreshold,
maxSizeSpillThreshold,
Contributor


Looking at this `add` method, it triggers spilling only if the number of rows exceeds `numRowsInMemoryBufferThreshold`. IIUC we are not using a memory-based threshold here, as we keep appending data to an in-memory buffer based on the num-rows threshold alone.
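A hypothetical sketch of the concern, under assumed names (not the actual ExternalAppendOnlyUnsafeRowArray code): the switch from the in-memory buffer to the spill-capable sorter is driven only by a row count, so very large rows can accumulate in the buffer regardless of how much memory they take.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical illustration of a row-count-only switch to a spillable store.
class SketchRowArray(numRowsInMemoryBufferThreshold: Int) {
  private val inMemoryBuffer = ArrayBuffer.empty[Array[Byte]]
  private var switchedToSpillable = false

  def add(row: Array[Byte]): Unit = {
    if (!switchedToSpillable && inMemoryBuffer.length < numRowsInMemoryBufferThreshold) {
      inMemoryBuffer += row          // the size of `row` is never consulted here
    } else {
      switchedToSpillable = true     // only the row count triggers the switch
      // ... hand `row` to a sorter that can spill to disk ...
    }
  }
}
```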

Ngone51 pushed a commit that referenced this pull request Sep 4, 2025
…memory based spill threshold

### What changes were proposed in this pull request?

This is a followup of #47856 . It makes the memory tracking more accurate in several places:
1. In `ShuffleExternalSorter`/`UnsafeExternalSorter`, memory is used both by the sorter itself and by its underlying in-memory sorter (for sorting shuffle partition ids). We need to add them up to calculate the current memory usage (a small sketch of this follows the list).
2. In `ExternalAppendOnlyUnsafeRowArray`, records are inserted into an in-memory buffer first. If the buffer gets too large (currently based on the number of records), we switch to `UnsafeExternalSorter`. The in-memory buffer also needs a memory-based threshold.
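A minimal sketch of point 1, under assumed names (not the actual Spark classes): the sorter reports its own page memory plus the memory held by its in-memory sorter when checking the size-based spill threshold.

```scala
// Hypothetical sketch: total usage = sorter pages + in-memory sorter buffer.
class SketchExternalSorter(inMemorySorterBytes: () => Long, maxSizeForSpill: Long) {
  private var pageBytes = 0L
  def allocatePage(bytes: Long): Unit = pageBytes += bytes
  def currentMemoryUsage: Long = pageBytes + inMemorySorterBytes()
  def shouldSpill: Boolean = currentMemoryUsage > maxSizeForSpill
}
```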

### Why are the changes needed?

More accurate memory tracking results in better spill decisions.

### Does this PR introduce _any_ user-facing change?

No, the feature is not released yet.

### How was this patch tested?

existing tests

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #52190 from cloud-fan/spill.

Lead-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Yi Wu <yi.wu@databricks.com>
dongjoon-hyun added a commit that referenced this pull request Oct 16, 2025
### What changes were proposed in this pull request?

This PR aims to document newly added `core` module configurations as a part of Apache Spark 4.1.0 preparation.

### Why are the changes needed?

To help the users use new features easily.

- #47856
- #51130
- #51163
- #51604
- #51630
- #51708
- #51885
- #52091
- #52382

### Does this PR introduce _any_ user-facing change?

No behavior change because this is a documentation update.

### How was this patch tested?

Manual review.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52626 from dongjoon-hyun/SPARK-53926.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
huangxiaopingRD pushed a commit to huangxiaopingRD/spark that referenced this pull request Nov 25, 2025
huangxiaopingRD pushed a commit to huangxiaopingRD/spark that referenced this pull request Nov 25, 2025
