Skip to content

Commit

Permalink
[apache#1678] fix(server): disk size leak on removing resources by Ap…
Browse files Browse the repository at this point in the history
…pPurgeEvent
  • Loading branch information
zuston committed May 8, 2024
1 parent 917456c commit 4d3b2c1
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ public void markHugePartition(int shuffleId, int partitionId) {
}
}

public Set<Integer> getShuffleIds() {
return partitionDataSizes.keySet();
}

@Override
public String toString() {
return "ShuffleTaskInfo{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.uniffle.server;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -771,7 +772,9 @@ public void removeResources(String appId, boolean checkAppExpired) {
partitionsToBlockIds.remove(appId);
shuffleBufferManager.removeBuffer(appId);
shuffleFlushManager.removeResources(appId);
storageManager.removeResources(new AppPurgeEvent(appId, shuffleTaskInfo.getUser()));
storageManager.removeResources(
new AppPurgeEvent(
appId, shuffleTaskInfo.getUser(), new ArrayList<>(shuffleTaskInfo.getShuffleIds())));
if (shuffleTaskInfo.hasHugePartition()) {
ShuffleServerMetrics.gaugeAppWithHugePartitionNum.dec();
ShuffleServerMetrics.gaugeHugePartitionNum.dec();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,67 @@ public void afterEach() throws Exception {
ShuffleServerMetrics.clear();
}

/** Test the shuffleMeta's diskSize when app is removed. */
@Test
public void appPurgeWithLocalfileTest() throws Exception {
String confFile = ClassLoader.getSystemResource("server.conf").getFile();
ShuffleServerConf conf = new ShuffleServerConf(confFile);
conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 1000L);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 100000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);

conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), "LOCALFILE");
conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
conf.setString(
ShuffleServerConf.RSS_STORAGE_BASE_PATH.key(),
tempDir1.getAbsolutePath() + "," + tempDir2.getAbsolutePath());

shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();

String appId = "removeShuffleDataWithLocalfileTest";

int shuffleNum = 4;
for (int i = 0; i < shuffleNum; i++) {
shuffleTaskManager.registerShuffle(
appId,
i,
Lists.newArrayList(new PartitionRange(0, 1)),
RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
StringUtils.EMPTY);

ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
shuffleTaskManager.requireBuffer(35);
shuffleTaskManager.cacheShuffleData(appId, i, false, partitionedData0);
shuffleTaskManager.updateCachedBlockIds(appId, i, partitionedData0.getBlockList());
}

assertEquals(1, shuffleTaskManager.getAppIds().size());
for (int i = 0; i < shuffleNum; i++) {
shuffleTaskManager.commitShuffle(appId, i);
}

shuffleTaskManager.removeResources(appId, false);
for (String path : conf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH)) {
String appPath = path + "/" + appId;
assertFalse(new File(appPath).exists());
}

// once the app is removed. the disk size should be 0
LocalStorageManager localStorageManager =
(LocalStorageManager) shuffleServer.getStorageManager();
for (LocalStorage localStorage : localStorageManager.getStorages()) {
assertEquals(0, localStorage.getMetaData().getDiskSize().get());
}
}

@Test
public void hugePartitionMemoryUsageLimitTest() throws Exception {
String confFile = ClassLoader.getSystemResource("server.conf").getFile();
Expand Down

0 comments on commit 4d3b2c1

Please sign in to comment.