Skip to content

Commit

Permalink
[CELEBORN-1047] Remove conf `celeborn.worker.sortPartition.eagerlyRem…
Browse files Browse the repository at this point in the history
…oveOriginalFiles.enabled`

### What changes were proposed in this pull request?

As title

### Why are the changes needed?

The config key `celeborn.worker.sortPartition.eagerlyRemoveOriginalFiles.enabled` has become unnecessary as a result of apache#1932

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass GA

Closes apache#1999 from cfmcgrady/celeborn-1047.

Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
  • Loading branch information
cfmcgrady authored and waitinfuture committed Oct 18, 2023
1 parent 69defca commit 8bf7e52
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -660,8 +660,6 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
if (hasHDFSStorage) Math.max(128, get(WORKER_COMMIT_THREADS)) else get(WORKER_COMMIT_THREADS)
def workerShuffleCommitTimeout: Long = get(WORKER_SHUFFLE_COMMIT_TIMEOUT)
def minPartitionSizeToEstimate: Long = get(ESTIMATED_PARTITION_SIZE_MIN_SIZE)
def partitionSorterEagerlyRemoveOriginalFilesEnabled: Boolean =
get(PARTITION_SORTER_EAGERLY_REMOVE_ORIGINAL_FILES_ENABLED)
def partitionSorterSortPartitionTimeout: Long = get(PARTITION_SORTER_SORT_TIMEOUT)
def partitionSorterReservedMemoryPerPartition: Long =
get(WORKER_PARTITION_SORTER_PER_PARTITION_RESERVED_MEMORY)
Expand Down Expand Up @@ -2292,23 +2290,6 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("120s")

val PARTITION_SORTER_EAGERLY_REMOVE_ORIGINAL_FILES_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.sortPartition.eagerlyRemoveOriginalFiles.enabled")
.categories("worker")
.doc("When set to true, the PartitionSorter immediately removes the original file once " +
"its partition has been successfully sorted. It is important to note that this behavior " +
"may result in a potential issue with the ReusedExchange operation when it triggers both " +
"non-range and range fetch requests simultaneously. When set to false, the " +
"PartitionSorter will retain the original unsorted file. However, it's essential to be " +
"aware that enabling this option may lead to an increase in storage space usage during " +
"the range fetch phase, as both the original and sorted files will be retained until the " +
"shuffle is finished. Note that the default value is configured as 'false' as a " +
"temporary workaround for CELEBORN-980. see CELEBORN-980 for more details.")
.version("0.3.1")
.internal
.booleanConf
.createWithDefault(false)

val PARTITION_SORTER_SORT_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.sortPartition.timeout")
.withAlternative("celeborn.worker.partitionSorter.sort.timeout")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {

private final AtomicInteger sortedFileCount = new AtomicInteger();
private final AtomicLong sortedFilesSize = new AtomicLong();
protected final boolean eagerlyRemoveOriginalFilesEnabled;
protected final long sortTimeout;
protected final long shuffleChunkSize;
protected final long reservedMemoryPerPartition;
Expand All @@ -97,8 +96,6 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {

public PartitionFilesSorter(
MemoryManager memoryManager, CelebornConf conf, AbstractSource source) {
this.eagerlyRemoveOriginalFilesEnabled =
conf.partitionSorterEagerlyRemoveOriginalFilesEnabled();
this.sortTimeout = conf.partitionSorterSortPartitionTimeout();
this.shuffleChunkSize = conf.shuffleChunkSize();
this.reservedMemoryPerPartition = conf.partitionSorterReservedMemoryPerPartition();
Expand Down

0 comments on commit 8bf7e52

Please sign in to comment.