Skip to content

Commit

Permalink
[CELEBORN-988] Add config option to control original unsorted file de…
Browse files Browse the repository at this point in the history
…letion in `PartitionFilesSorter`

### What changes were proposed in this pull request?

This PR adds a new configuration option, `celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled`, allowing users to control whether the `PartitionFilesSorter` deletes the original unsorted file.

### Why are the changes needed?

apache#1907 (comment)

### Does this PR introduce _any_ user-facing change?

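Users can control whether the `PartitionFilesSorter` deletes the original unsorted file: with `celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled = true` (the default) the original file is retained after sorting, while setting it to `false` makes the sorter delete the original file immediately once the partition has been sorted.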

### How was this patch tested?

Pass GA

Closes apache#1922 from cfmcgrady/make-delete-configurable.

Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
  • Loading branch information
cfmcgrady authored and waitinfuture committed Sep 19, 2023
1 parent beed2a8 commit 1e49ff7
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
if (hasHDFSStorage) Math.max(128, get(WORKER_COMMIT_THREADS)) else get(WORKER_COMMIT_THREADS)
/** Returns the value configured for `WORKER_SHUFFLE_COMMIT_TIMEOUT` (time unit not visible here — TODO confirm). */
def workerShuffleCommitTimeout: Long = get(WORKER_SHUFFLE_COMMIT_TIMEOUT)
/** Returns the value configured for `ESTIMATED_PARTITION_SIZE_MIN_SIZE` (presumably a size in bytes — TODO confirm). */
def minPartitionSizeToEstimate: Long = get(ESTIMATED_PARTITION_SIZE_MIN_SIZE)
/**
 * Whether the partition sorter keeps the original unsorted file after a successful sort.
 *
 * Reads `celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled` (default `true`).
 * When `false`, the sorter deletes the original file immediately after sorting.
 */
def partitionSorterLazyRemovalOfOriginalFilesEnabled: Boolean =
get(PARTITION_SORTER_LAZY_REMOVAL_OF_ORIGINAL_FILES_ENABLED)
/**
 * Returns the value of `PARTITION_SORTER_SORT_TIMEOUT`
 * (`celeborn.worker.sortPartition.timeout`); time unit not visible here — TODO confirm.
 */
def partitionSorterSortPartitionTimeout: Long = get(PARTITION_SORTER_SORT_TIMEOUT)
/**
 * Returns the value configured for `WORKER_PARTITION_SORTER_PER_PARTITION_RESERVED_MEMORY`.
 * NOTE(review): presumably the memory reserved per partition while sorting a shuffle file,
 * in bytes — confirm against the config entry definition.
 */
def partitionSorterReservedMemoryPerPartition: Long =
get(WORKER_PARTITION_SORTER_PER_PARTITION_RESERVED_MEMORY)
Expand Down Expand Up @@ -2209,6 +2211,21 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("120s")

/**
 * Controls whether the partition sorter defers removal of the original unsorted file.
 *
 *  - `false`: the original file is removed as soon as its partition has been sorted.
 *  - `true` (default): the original file is retained alongside the sorted one.
 *
 * The `.doc` text is user-facing, so each concatenated fragment must end with a
 * trailing space to keep sentences from running together.
 */
val PARTITION_SORTER_LAZY_REMOVAL_OF_ORIGINAL_FILES_ENABLED: ConfigEntry[Boolean] =
  buildConf("celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled")
    .categories("worker")
    .doc("When set to false, the PartitionSorter immediately removes the original file once " +
      "its partition has been successfully sorted. It is important to note that this behavior " +
      "may result in a potential issue with the ReusedExchange operation when it triggers both " +
      "non-range and range fetch requests simultaneously. See CELEBORN-980 for more details. " +
      "When set to true, the PartitionSorter will retain the original unsorted file. However, " +
      "it's essential to be aware that enabling this option may lead to an increase in storage " +
      "space usage during the range fetch phase, as both the original and sorted files will be " +
      "retained until the shuffle is finished.")
    .version("0.3.2")
    .booleanConf
    .createWithDefault(true)

val PARTITION_SORTER_SORT_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.sortPartition.timeout")
.withAlternative("celeborn.worker.partitionSorter.sort.timeout")
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ license: |
| celeborn.worker.shuffle.partitionSplit.enabled | true | enable the partition split on worker side | 0.3.0 |
| celeborn.worker.shuffle.partitionSplit.max | 2g | Specify the maximum partition size for splitting, and ensure that individual partition files are always smaller than this limit. | 0.3.0 |
| celeborn.worker.shuffle.partitionSplit.min | 1m | Min size for a partition to split | 0.3.0 |
| celeborn.worker.sortPartition.lazyRemovalOfOriginalFiles.enabled | true | When set to false, the PartitionSorter immediately removes the original file once its partition has been successfully sorted. It is important to note that this behavior may result in a potential issue with the ReusedExchange operation when it triggers both non-range and range fetch requests simultaneously. See CELEBORN-980 for more details. When set to true, the PartitionSorter will retain the original unsorted file. However, it's essential to be aware that enabling this option may lead to an increase in storage space usage during the range fetch phase, as both the original and sorted files will be retained until the shuffle is finished. | 0.3.2 |
| celeborn.worker.sortPartition.reservedMemoryPerPartition | 1mb | Reserved memory when sorting a shuffle file off-heap. | 0.3.0 |
| celeborn.worker.sortPartition.threads | &lt;undefined&gt; | PartitionSorter's thread counts. It's recommended to set at least `64` when `HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 |
| celeborn.worker.sortPartition.timeout | 220s | Timeout for a shuffle file to sort. | 0.3.0 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {

// NOTE(review): presumably the number of sorted files tracked by this sorter; updates are not visible here — confirm.
private final AtomicInteger sortedFileCount = new AtomicInteger();
// NOTE(review): presumably the accumulated size of sorted files; updates are not visible here — confirm.
private final AtomicLong sortedFilesSize = new AtomicLong();
// Read from conf.partitionSorterLazyRemovalOfOriginalFilesEnabled() in the constructor.
// When false, sort() calls deleteOriginFiles() right after a successful sort; when true, it does not.
protected final boolean lazyRemovalOfOriginalFilesEnabled;
// Read from conf.partitionSorterSortPartitionTimeout() in the constructor.
protected final long sortTimeout;
// Read from conf.shuffleChunkSize() in the constructor.
protected final long shuffleChunkSize;
// Read from conf.partitionSorterReservedMemoryPerPartition() in the constructor.
protected final long reservedMemoryPerPartition;
Expand All @@ -92,6 +93,8 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {

public PartitionFilesSorter(
MemoryManager memoryManager, CelebornConf conf, AbstractSource source) {
this.lazyRemovalOfOriginalFilesEnabled =
conf.partitionSorterLazyRemovalOfOriginalFilesEnabled();
this.sortTimeout = conf.partitionSorterSortPartitionTimeout();
this.shuffleChunkSize = conf.shuffleChunkSize();
this.reservedMemoryPerPartition = conf.partitionSorterReservedMemoryPerPartition();
Expand Down Expand Up @@ -593,7 +596,9 @@ public void sort() throws InterruptedException {

writeIndex(sortedBlockInfoMap, indexFilePath, isHdfs);
updateSortedShuffleFiles(shuffleKey, fileId, originFileLen);
deleteOriginFiles();
if (!lazyRemovalOfOriginalFilesEnabled) {
deleteOriginFiles();
}
logger.debug("sort complete for {} {}", shuffleKey, originFilePath);
} catch (Exception e) {
logger.error(
Expand Down

0 comments on commit 1e49ff7

Please sign in to comment.