Skip to content

Commit

Permalink
[#1209] improvement(server): Speed up cleanupStorageSelectionCache me…
Browse files Browse the repository at this point in the history
…thod in LocalStorageManager. (#1210)

### What changes were proposed in this pull request?

Optimize cleanupStorageSelectionCache method in LocalStorageManager.

### Why are the changes needed?

Fix: #1209

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?
Existing UTs.
  • Loading branch information
zhuyaogai authored Sep 25, 2023
1 parent a1d6d50 commit b4adaa5
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -50,7 +52,6 @@
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.common.storage.StorageMedia;
import org.apache.uniffle.common.storage.StorageStatus;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.server.Checker;
Expand Down Expand Up @@ -81,7 +82,7 @@ public class LocalStorageManager extends SingleStorageManager {
private final List<String> storageBasePaths;
private final LocalStorageChecker checker;

private final Map<String, LocalStorage> partitionsOfStorage;
private final ConcurrentSkipListMap<String, LocalStorage> sortedPartitionsOfStorageMap;
private final List<StorageMediaProvider> typeProviders = Lists.newArrayList();

@VisibleForTesting
Expand All @@ -91,7 +92,7 @@ public class LocalStorageManager extends SingleStorageManager {
if (CollectionUtils.isEmpty(storageBasePaths)) {
throw new IllegalArgumentException("Base path dirs must not be empty");
}
this.partitionsOfStorage = JavaUtils.newConcurrentMap();
this.sortedPartitionsOfStorageMap = new ConcurrentSkipListMap<>();
long capacity = conf.getSizeAsBytes(ShuffleServerConf.DISK_CAPACITY);
double ratio = conf.getDouble(ShuffleServerConf.DISK_CAPACITY_RATIO);
double highWaterMarkOfWrite = conf.get(ShuffleServerConf.HIGH_WATER_MARK_OF_WRITE);
Expand Down Expand Up @@ -182,7 +183,7 @@ public Storage selectStorage(ShuffleDataFlushEvent event) {
int partitionId = event.getStartPartition();

LocalStorage storage =
partitionsOfStorage.get(UnionKey.buildKey(appId, shuffleId, partitionId));
sortedPartitionsOfStorageMap.get(UnionKey.buildKey(appId, shuffleId, partitionId));
if (storage != null) {
if (storage.isCorrupted()) {
if (storage.containsWriteHandler(appId, shuffleId, partitionId)) {
Expand Down Expand Up @@ -210,7 +211,7 @@ public Storage selectStorage(ShuffleDataFlushEvent event) {
final LocalStorage selectedStorage =
candidates.get(
ShuffleStorageUtils.getStorageIndex(candidates.size(), appId, shuffleId, partitionId));
return partitionsOfStorage.compute(
return sortedPartitionsOfStorageMap.compute(
UnionKey.buildKey(appId, shuffleId, partitionId),
(key, localStorage) -> {
// If this is the first time to select storage or existing storage is corrupted,
Expand All @@ -231,7 +232,7 @@ public Storage selectStorage(ShuffleDataReadEvent event) {
int shuffleId = event.getShuffleId();
int partitionId = event.getStartPartition();

return partitionsOfStorage.get(UnionKey.buildKey(appId, shuffleId, partitionId));
return sortedPartitionsOfStorageMap.get(UnionKey.buildKey(appId, shuffleId, partitionId));
}

@Override
Expand Down Expand Up @@ -301,24 +302,39 @@ public void removeResources(PurgeEvent event) {

private void cleanupStorageSelectionCache(PurgeEvent event) {
Function<String, Boolean> deleteConditionFunc = null;
String prefixKey = null;
if (event instanceof AppPurgeEvent) {
prefixKey = UnionKey.buildKey(event.getAppId());
deleteConditionFunc =
partitionUnionKey -> UnionKey.startsWith(partitionUnionKey, event.getAppId());
} else if (event instanceof ShufflePurgeEvent) {
int shuffleId = event.getShuffleIds().get(0);
prefixKey = UnionKey.buildKey(event.getAppId(), shuffleId);
deleteConditionFunc =
partitionUnionKey ->
UnionKey.startsWith(partitionUnionKey, event.getAppId(), event.getShuffleIds());
partitionUnionKey -> UnionKey.startsWith(partitionUnionKey, event.getAppId(), shuffleId);
}
if (prefixKey == null) {
throw new RssException("Prefix key is null when handles event: " + event);
}
long startTime = System.currentTimeMillis();
deleteElement(partitionsOfStorage, deleteConditionFunc);
deleteElement(sortedPartitionsOfStorageMap.tailMap(prefixKey), deleteConditionFunc);
LOG.info(
"Cleaning the storage selection cache costs: {}(ms) for event: {}",
System.currentTimeMillis() - startTime,
event);
}

private <K, V> void deleteElement(Map<K, V> map, Function<K, Boolean> deleteConditionFunc) {
map.entrySet().removeIf(entry -> deleteConditionFunc.apply(entry.getKey()));
private <K, V> void deleteElement(
Map<K, V> sortedPartitionsOfStorageMap, Function<K, Boolean> deleteConditionFunc) {
Iterator<Map.Entry<K, V>> iterator = sortedPartitionsOfStorageMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<K, V> entry = iterator.next();
if (deleteConditionFunc.apply(entry.getKey())) {
iterator.remove();
} else {
break;
}
}
}

@Override
Expand Down Expand Up @@ -379,4 +395,10 @@ public Map<String, StorageInfo> getStorageInfo() {
public List<LocalStorage> getStorages() {
return localStorages;
}

// Only for test.
@VisibleForTesting
public Map<String, LocalStorage> getSortedPartitionsOfStorageMap() {
return sortedPartitionsOfStorageMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.apache.uniffle.server.event.AppPurgeEvent;
import org.apache.uniffle.server.event.ShufflePurgeEvent;
import org.apache.uniffle.server.storage.HadoopStorageManager;
import org.apache.uniffle.server.storage.HybridStorageManager;
import org.apache.uniffle.server.storage.LocalStorageManager;
Expand Down Expand Up @@ -467,30 +468,43 @@ public void clearLocalTest(@TempDir File tempDir) throws Exception {
manager.addToFlushQueue(event1);
ShuffleDataFlushEvent event2 = createShuffleDataFlushEvent(appId2, 1, 0, 1, null);
manager.addToFlushQueue(event2);
ShuffleDataFlushEvent event3 = createShuffleDataFlushEvent(appId2, 2, 0, 1, null);
manager.addToFlushQueue(event3);
assertEquals(storageManager.selectStorage(event1), storageManager.selectStorage(event2));
final AbstractStorage storage = (AbstractStorage) storageManager.selectStorage(event1);
waitForFlush(manager, appId1, 1, 5);
waitForFlush(manager, appId2, 1, 5);
waitForFlush(manager, appId2, 2, 5);
assertEquals(5, manager.getCommittedBlockIds(appId1, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 2).getLongCardinality());
assertEquals(2, storage.getHandlerSize());
File file = new File(tempDir, appId1);
assertTrue(file.exists());
storageManager.removeResources(
new ShufflePurgeEvent(appId1, StringUtils.EMPTY, Lists.newArrayList(1)));
ShuffleDataFlushEvent event4 = createShuffleDataFlushEvent(appId1, 1, 0, 1, () -> false);
manager.addToFlushQueue(event4);
Thread.sleep(1000);
storageManager.removeResources(
new AppPurgeEvent(appId1, StringUtils.EMPTY, Lists.newArrayList(1)));
manager.removeResources(appId1);
assertFalse(file.exists());
ShuffleDataFlushEvent event3 = createShuffleDataFlushEvent(appId1, 1, 0, 1, () -> false);
manager.addToFlushQueue(event3);
Thread.sleep(1000);
assertEquals(0, manager.getCommittedBlockIds(appId1, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 2).getLongCardinality());
assertEquals(1, storage.getHandlerSize());
manager.removeResources(appId2);
storageManager.removeResources(
new ShufflePurgeEvent(appId2, StringUtils.EMPTY, Lists.newArrayList(1)));
storageManager.removeResources(
new ShufflePurgeEvent(appId2, StringUtils.EMPTY, Lists.newArrayList(2)));
storageManager.removeResources(
new AppPurgeEvent(appId2, StringUtils.EMPTY, Lists.newArrayList(1)));
assertEquals(0, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
assertEquals(0, storage.getHandlerSize());
assertEquals(
0, ((LocalStorageManager) storageManager).getSortedPartitionsOfStorageMap().size());
}

private void waitForMetrics(Gauge.Child gauge, double expected, double delta) throws Exception {
Expand Down

0 comments on commit b4adaa5

Please sign in to comment.