Skip to content

Commit

Permalink
[#1678] fix(server): disk size leak on removing resources by AppPurge…
Browse files Browse the repository at this point in the history
…Event (#1679) (#1689)

### What changes were proposed in this pull request?

Descrease the disk size  that calculated by local storage self on removing resources with AppPurgeEvent

### Why are the changes needed?

Fix: #1678

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests
  • Loading branch information
zuston authored May 11, 2024
1 parent ee3633b commit cf84420
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ public void markHugePartition(int shuffleId, int partitionId) {
}
}

public Set<Integer> getShuffleIds() {
return partitionDataSizes.keySet();
}

@Override
public String toString() {
return "ShuffleTaskInfo{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.uniffle.server;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -771,7 +772,9 @@ public void removeResources(String appId, boolean checkAppExpired) {
partitionsToBlockIds.remove(appId);
shuffleBufferManager.removeBuffer(appId);
shuffleFlushManager.removeResources(appId);
storageManager.removeResources(new AppPurgeEvent(appId, shuffleTaskInfo.getUser()));
storageManager.removeResources(
new AppPurgeEvent(
appId, shuffleTaskInfo.getUser(), new ArrayList<>(shuffleTaskInfo.getShuffleIds())));
if (shuffleTaskInfo.hasHugePartition()) {
ShuffleServerMetrics.gaugeAppWithHugePartitionNum.dec();
ShuffleServerMetrics.gaugeHugePartitionNum.dec();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,71 @@ public void afterEach() throws Exception {
ShuffleServerMetrics.clear();
}

private ShuffleServerConf constructServerConfWithLocalfile() {
String confFile = ClassLoader.getSystemResource("server.conf").getFile();
ShuffleServerConf conf = new ShuffleServerConf(confFile);
conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 1000L);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 100000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);

conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name());
conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
conf.setString(
ShuffleServerConf.RSS_STORAGE_BASE_PATH.key(),
tempDir1.getAbsolutePath() + "," + tempDir2.getAbsolutePath());
return conf;
}

/** Test the shuffleMeta's diskSize when app is removed. */
@Test
public void appPurgeWithLocalfileTest() throws Exception {
ShuffleServerConf conf = constructServerConfWithLocalfile();
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();

String appId = "removeShuffleDataWithLocalfileTest";

int shuffleNum = 4;
for (int i = 0; i < shuffleNum; i++) {
shuffleTaskManager.registerShuffle(
appId,
i,
Lists.newArrayList(new PartitionRange(0, 1)),
RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
StringUtils.EMPTY);

ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
shuffleTaskManager.requireBuffer(35);
shuffleTaskManager.cacheShuffleData(appId, i, false, partitionedData0);
shuffleTaskManager.updateCachedBlockIds(appId, i, partitionedData0.getBlockList());
}

assertEquals(1, shuffleTaskManager.getAppIds().size());
for (int i = 0; i < shuffleNum; i++) {
shuffleTaskManager.commitShuffle(appId, i);
}

shuffleTaskManager.removeResources(appId, false);
for (String path : conf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH)) {
String appPath = path + "/" + appId;
assertFalse(new File(appPath).exists());
}

// once the app is removed. the disk size should be 0
LocalStorageManager localStorageManager =
(LocalStorageManager) shuffleServer.getStorageManager();
for (LocalStorage localStorage : localStorageManager.getStorages()) {
assertEquals(0, localStorage.getMetaData().getDiskSize().get());
}
}

@Test
public void hugePartitionMemoryUsageLimitTest() throws Exception {
String confFile = ClassLoader.getSystemResource("server.conf").getFile();
Expand Down Expand Up @@ -479,25 +544,7 @@ public void removeShuffleDataWithHdfsTest() throws Exception {

@Test
public void removeShuffleDataWithLocalfileTest() throws Exception {
String confFile = ClassLoader.getSystemResource("server.conf").getFile();
ShuffleServerConf conf = new ShuffleServerConf(confFile);
conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 1000L);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 100000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);

conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), "LOCALFILE");
conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
conf.setString(
ShuffleServerConf.RSS_STORAGE_BASE_PATH.key(),
tempDir1.getAbsolutePath() + "," + tempDir2.getAbsolutePath());

ShuffleServerConf conf = constructServerConfWithLocalfile();
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();

Expand Down

0 comments on commit cf84420

Please sign in to comment.