Skip to content

Commit

Permalink
[apache#1571] fix(server): Memory may leak when EventInvalidException…
Browse files Browse the repository at this point in the history
… occurs
  • Loading branch information
leslizhang committed Mar 15, 2024
1 parent 9737d57 commit 0904f87
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
Expand Down Expand Up @@ -50,17 +51,20 @@ public class DefaultFlushEventHandler implements FlushEventHandler {
private final StorageType storageType;
protected final BlockingQueue<ShuffleDataFlushEvent> flushQueue = Queues.newLinkedBlockingQueue();
private ConsumerWithException<ShuffleDataFlushEvent> eventConsumer;
private final ShuffleServer shuffleServer;

private volatile boolean stopped = false;

public DefaultFlushEventHandler(
ShuffleServerConf conf,
StorageManager storageManager,
ShuffleServer shuffleServer,
ConsumerWithException<ShuffleDataFlushEvent> eventConsumer) {
this.shuffleServerConf = conf;
this.storageType =
StorageType.valueOf(shuffleServerConf.get(RssBaseConf.RSS_STORAGE_TYPE).name());
this.storageManager = storageManager;
this.shuffleServer = shuffleServer;
this.eventConsumer = eventConsumer;
initFlushEventExecutor();
}
Expand All @@ -83,8 +87,18 @@ public void handle(ShuffleDataFlushEvent event) {
*/
private void handleEventAndUpdateMetrics(ShuffleDataFlushEvent event, Storage storage) {
long start = System.currentTimeMillis();
String appId = event.getAppId();

ReentrantReadWriteLock.ReadLock readLock =
shuffleServer.getShuffleTaskManager().getAppReadLock(appId);
try {
eventConsumer.accept(event);
readLock.lock();
try {
eventConsumer.accept(event);
} finally {
readLock.unlock();
}

if (storage != null) {
ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
}
Expand Down Expand Up @@ -124,8 +138,7 @@ private void handleEventAndUpdateMetrics(ShuffleDataFlushEvent event, Storage st
}

if (e instanceof EventInvalidException) {
// Invalid events have already been released / cleaned up
// so no need to call event.doCleanup() here
event.doCleanup();
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class ShuffleDataFlushEvent {
private final long size;
private final List<ShufflePartitionedBlock> shuffleBlocks;
private final Supplier<Boolean> valid;

private final ShuffleBuffer shuffleBuffer;
private final AtomicInteger retryTimes = new AtomicInteger();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ public ShuffleFlushManager(
storageBasePaths = RssUtils.getConfiguredLocalDirs(shuffleServerConf);
pendingEventTimeoutSec = shuffleServerConf.getLong(ShuffleServerConf.PENDING_EVENT_TIMEOUT_SEC);
eventHandler =
new DefaultFlushEventHandler(shuffleServerConf, storageManager, this::processFlushEvent);
new DefaultFlushEventHandler(
shuffleServerConf, storageManager, shuffleServer, this::processFlushEvent);
}

public void addToFlushQueue(ShuffleDataFlushEvent event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
Expand Down Expand Up @@ -112,7 +112,7 @@ public class ShuffleTaskManager {
private Map<Long, PreAllocatedBufferInfo> requireBufferIds = JavaUtils.newConcurrentMap();
private Thread clearResourceThread;
private BlockingQueue<PurgeEvent> expiredAppIdQueue = Queues.newLinkedBlockingQueue();
private final Cache<String, Lock> appLocks;
private final Cache<String, ReentrantReadWriteLock> appLocks;

public ShuffleTaskManager(
ShuffleServerConf conf,
Expand Down Expand Up @@ -222,9 +222,18 @@ public ShuffleTaskManager(
topNShuffleDataSizeOfAppCalcTask.start();
}

private Lock getAppLock(String appId) {
public ReentrantReadWriteLock.WriteLock getAppWriteLock(String appId) {
try {
return appLocks.get(appId, ReentrantLock::new);
return appLocks.get(appId, ReentrantReadWriteLock::new).writeLock();
} catch (ExecutionException e) {
LOG.error("Failed to get App lock.", e);
throw new RssException(e);
}
}

public ReentrantReadWriteLock.ReadLock getAppReadLock(String appId) {
try {
return appLocks.get(appId, ReentrantReadWriteLock::new).readLock();
} catch (ExecutionException e) {
LOG.error("Failed to get App lock.", e);
throw new RssException(e);
Expand Down Expand Up @@ -257,7 +266,7 @@ public StatusCode registerShuffle(
String user,
ShuffleDataDistributionType dataDistType,
int maxConcurrencyPerPartitionToWrite) {
Lock lock = getAppLock(appId);
ReentrantReadWriteLock.WriteLock lock = getAppWriteLock(appId);
try {
lock.lock();
refreshAppId(appId);
Expand Down Expand Up @@ -692,35 +701,42 @@ private boolean isAppExpired(String appId) {
* @param shuffleIds
*/
public void removeResourcesByShuffleIds(String appId, List<Integer> shuffleIds) {
if (CollectionUtils.isEmpty(shuffleIds)) {
return;
}
Lock writeLock = getAppWriteLock(appId);
writeLock.lock();
try {
if (CollectionUtils.isEmpty(shuffleIds)) {
return;
}

LOG.info("Start remove resource for appId[{}], shuffleIds[{}]", appId, shuffleIds);
final long start = System.currentTimeMillis();
final ShuffleTaskInfo taskInfo = shuffleTaskInfos.get(appId);
if (taskInfo != null) {
for (Integer shuffleId : shuffleIds) {
taskInfo.getCachedBlockIds().remove(shuffleId);
taskInfo.getCommitCounts().remove(shuffleId);
taskInfo.getCommitLocks().remove(shuffleId);
LOG.info("Start remove resource for appId[{}], shuffleIds[{}]", appId, shuffleIds);
final long start = System.currentTimeMillis();
final ShuffleTaskInfo taskInfo = shuffleTaskInfos.get(appId);
if (taskInfo != null) {
for (Integer shuffleId : shuffleIds) {
taskInfo.getCachedBlockIds().remove(shuffleId);
taskInfo.getCommitCounts().remove(shuffleId);
taskInfo.getCommitLocks().remove(shuffleId);
}
}
Optional.ofNullable(partitionsToBlockIds.get(appId))
.ifPresent(
x -> {
for (Integer shuffleId : shuffleIds) {
x.remove(shuffleId);
}
});
shuffleBufferManager.removeBufferByShuffleId(appId, shuffleIds);
shuffleFlushManager.removeResourcesOfShuffleId(appId, shuffleIds);
storageManager.removeResources(
new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds));
LOG.info(
"Finish remove resource for appId[{}], shuffleIds[{}], cost[{}]",
appId,
shuffleIds,
System.currentTimeMillis() - start);
} finally {
writeLock.unlock();
}
Optional.ofNullable(partitionsToBlockIds.get(appId))
.ifPresent(
x -> {
for (Integer shuffleId : shuffleIds) {
x.remove(shuffleId);
}
});
shuffleBufferManager.removeBufferByShuffleId(appId, shuffleIds);
shuffleFlushManager.removeResourcesOfShuffleId(appId, shuffleIds);
storageManager.removeResources(new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds));
LOG.info(
"Finish remove resource for appId[{}], shuffleIds[{}], cost[{}]",
appId,
shuffleIds,
System.currentTimeMillis() - start);
}

public void checkLeakShuffleData() {
Expand All @@ -736,7 +752,7 @@ public void checkLeakShuffleData() {

@VisibleForTesting
public void removeResources(String appId, boolean checkAppExpired) {
Lock lock = getAppLock(appId);
Lock lock = getAppWriteLock(appId);
try {
lock.lock();
LOG.info("Start remove resource for appId[" + appId + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
package org.apache.uniffle.server.buffer;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -291,23 +293,37 @@ protected void flushBuffer(
int startPartition,
int endPartition,
boolean isHugePartition) {
ShuffleDataFlushEvent event =
buffer.toFlushEvent(
appId,
shuffleId,
startPartition,
endPartition,
() -> bufferPool.containsKey(appId),
shuffleFlushManager.getDataDistributionType(appId));
if (event != null) {
event.addCleanupCallback(() -> releaseMemory(event.getSize(), true, false));
updateShuffleSize(appId, shuffleId, -event.getSize());
inFlushSize.addAndGet(event.getSize());
if (isHugePartition) {
event.markOwnedByHugePartition();

ReentrantReadWriteLock.ReadLock readLock = shuffleTaskManager.getAppReadLock(appId);
readLock.lock();
if (!bufferPool.getOrDefault(appId, new HashMap<>()).containsKey(shuffleId)) {
LOG.info(
"Shuffle[{}] for app[{}] has already been removed, no need to flush the buffer",
shuffleId,
appId);
return;
}
try {
ShuffleDataFlushEvent event =
buffer.toFlushEvent(
appId,
shuffleId,
startPartition,
endPartition,
() -> bufferPool.getOrDefault(appId, new HashMap<>()).containsKey(shuffleId),
shuffleFlushManager.getDataDistributionType(appId));
if (event != null) {
event.addCleanupCallback(() -> releaseMemory(event.getSize(), true, false));
updateShuffleSize(appId, shuffleId, -event.getSize());
inFlushSize.addAndGet(event.getSize());
if (isHugePartition) {
event.markOwnedByHugePartition();
}
ShuffleServerMetrics.gaugeInFlushBufferSize.set(inFlushSize.get());
shuffleFlushManager.addToFlushQueue(event);
}
ShuffleServerMetrics.gaugeInFlushBufferSize.set(inFlushSize.get());
shuffleFlushManager.addToFlushQueue(event);
} finally {
readLock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ public static ShuffleDataFlushEvent createShuffleDataFlushEvent(
endPartition,
size,
spbs,
isValid,
null,
null);
}

Expand All @@ -613,7 +613,7 @@ public static ShuffleDataFlushEvent createShuffleDataFlushEvent(
endPartition,
(long) blockNum * blockSize,
createBlock(blockNum, blockSize),
isValid,
null,
null);
}

Expand Down

0 comments on commit 0904f87

Please sign in to comment.