Skip to content

Commit

Permalink
[FLINK-18646] Verify memory manager empty in a separate thread with l…
Browse files Browse the repository at this point in the history
…arger timeout

UnsafeMemoryBudget#verifyEmpty, called when a slot is freed, has to wait for the GC of all allocated/released managed memory.
If there are many segments to GC, the check can take a long time to finish. If the slot is freed in the RPC thread,
waiting for the GC can block that thread, and the TM risks missing its heartbeat.

Another problem is that after UnsafeMemoryBudget#RETRIGGER_GC_AFTER_SLEEPS sleeps, System.gc() is called on every attempt to run a cleaner,
even if cleaners that are ready to run have already been detected. This triggers many unnecessary GCs in the background.

The PR offloads the verification into a separate thread and calls System.gc() only if memory cannot be reserved and
there are still no cleaners to run after a long wait. The timeout for normal memory reservation is increased to 2 seconds.
The full reservation, used for verification, gets a 2-minute timeout.

This closes apache#12980.
  • Loading branch information
azagrebin committed Jul 28, 2020
1 parent 5dccc99 commit 3d056c8
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,8 @@
* continues to process any ready cleaners making {@link #MAX_SLEEPS} attempts before throwing {@link OutOfMemoryError}.
*/
class UnsafeMemoryBudget {
// max. number of sleeps during try-reserving with exponentially
// increasing delay before throwing OutOfMemoryError:
// 1, 2, 4, 8, 16, 32, 64, 128, 256, 512 (total 1023 ms ~ 1 s)
// which means that MemoryReservationException will be thrown after 1 s of trying
private static final int MAX_SLEEPS = 10;
private static final int MAX_SLEEPS = 11; // 2^11 - 1 = (2 x 1024) - 1 ms ~ 2 s total sleep duration
private static final int MAX_SLEEPS_VERIFY_EMPTY = 17; // 2^17 - 1 = (128 x 1024) - 1 ms ~ 2 min total sleep duration
private static final int RETRIGGER_GC_AFTER_SLEEPS = 9; // ~ 0.5 sec

private final long totalMemorySize;
Expand All @@ -61,7 +58,9 @@ long getAvailableMemorySize() {

boolean verifyEmpty() {
try {
reserveMemory(totalMemorySize);
// we wait longer than during the normal reserveMemory as we have to GC all memory,
// allocated by task, to perform the verification
reserveMemory(totalMemorySize, MAX_SLEEPS_VERIFY_EMPTY);
} catch (MemoryReservationException e) {
return false;
}
Expand All @@ -74,8 +73,26 @@ boolean verifyEmpty() {
*
* <p>Adjusted version of {@link java.nio.Bits#reserveMemory(long, int)} taken from Java 11.
*/
@SuppressWarnings({"OverlyComplexMethod", "JavadocReference", "NestedTryStatement"})
void reserveMemory(long size) throws MemoryReservationException {
// Delegate to the overload that takes an explicit sleep limit, using MAX_SLEEPS
// as the limit for a regular (non-verification) reservation.
reserveMemory(size, MAX_SLEEPS);
}

/**
* Reserve memory of certain size if it is available.
*
* <p>If the method cannot reserve immediately, it tries to process the phantom GC cleaners queue by
* calling {@link JavaGcCleanerWrapper#tryRunPendingCleaners()}. If it does not help,
* the method calls {@link System#gc} and tries again to reserve. If it still cannot reserve,
* it tries to process the phantom GC cleaners queue. If there are no cleaners to process,
* the method sleeps up to {@code maxSleeps} times, starting at 1 ms and doubling the sleeping duration
* each time: 1 (0), 2 (1), 4 (2), 8 (3), 16 (4), 32 (5), 64 (6), 128 (7), 256 (8), 512 (9), ...
* After {@code RETRIGGER_GC_AFTER_SLEEPS} sleeps, the method also calls {@link System#gc} before each sleep.
* If memory still cannot be reserved after {@code maxSleeps} sleeps, a {@link MemoryReservationException} is thrown.
*
* <p>Adjusted version of {@link java.nio.Bits#reserveMemory(long, int)} taken from Java 11.
*/
@SuppressWarnings({"OverlyComplexMethod", "JavadocReference", "NestedTryStatement"})
void reserveMemory(long size, int maxSleeps) throws MemoryReservationException {
long availableOrReserved = tryReserveMemory(size);
// optimist!
if (availableOrReserved >= size) {
Expand Down Expand Up @@ -122,15 +139,15 @@ void reserveMemory(long size) throws MemoryReservationException {
if (availableOrReserved >= size) {
return;
}
if (sleeps >= MAX_SLEEPS) {
if (sleeps >= maxSleeps) {
break;
}
if (sleeps >= RETRIGGER_GC_AFTER_SLEEPS) {
// trigger again VM's Reference processing if we have to wait longer
System.gc();
}
try {
if (!JavaGcCleanerWrapper.tryRunPendingCleaners()) {
if (sleeps >= RETRIGGER_GC_AFTER_SLEEPS) {
// trigger again VM's Reference processing if we have to wait longer
System.gc();
}
Thread.sleep(sleepTime);
sleepTime <<= 1;
sleeps++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,8 @@ public static TaskManagerServices fromConfiguration(
taskManagerServicesConfiguration.getNumberOfSlots(),
taskManagerServicesConfiguration.getTaskExecutorResourceSpec(),
taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),
taskManagerServicesConfiguration.getPageSize());
taskManagerServicesConfiguration.getPageSize(),
ioExecutor);

final JobTable jobTable = DefaultJobTable.create();

Expand Down Expand Up @@ -340,7 +341,8 @@ private static TaskSlotTable<Task> createTaskSlotTable(
final int numberOfSlots,
final TaskExecutorResourceSpec taskExecutorResourceSpec,
final long timerServiceShutdownTimeout,
final int pageSize) {
final int pageSize,
final Executor memoryVerificationExecutor) {
final TimerService<AllocationID> timerService = new TimerService<>(
new ScheduledThreadPoolExecutor(1),
timerServiceShutdownTimeout);
Expand All @@ -349,7 +351,8 @@ private static TaskSlotTable<Task> createTaskSlotTable(
TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(taskExecutorResourceSpec),
TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(taskExecutorResourceSpec, numberOfSlots),
pageSize,
timerService);
timerService,
memoryVerificationExecutor);
}

private static ShuffleEnvironment<?, ?> createShuffleEnvironment(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -84,15 +85,22 @@ public class TaskSlot<T extends TaskSlotPayload> implements AutoCloseableAsync {
/** The closing future is completed when the slot is freed and closed. */
private final CompletableFuture<Void> closingFuture;

/**
* {@link Executor} for background actions, e.g. verify all managed memory released.
*/
private final Executor asyncExecutor;

public TaskSlot(
final int index,
final ResourceProfile resourceProfile,
final int memoryPageSize,
final JobID jobId,
final AllocationID allocationId) {
final AllocationID allocationId,
final Executor asyncExecutor) {

this.index = index;
this.resourceProfile = Preconditions.checkNotNull(resourceProfile);
this.asyncExecutor = Preconditions.checkNotNull(asyncExecutor);

this.tasks = new HashMap<>(4);
this.state = TaskSlotState.ALLOCATED;
Expand Down Expand Up @@ -295,22 +303,24 @@ CompletableFuture<Void> closeAsync(Throwable cause) {
// and set the slot state to releasing so that it gets eventually freed
tasks.values().forEach(task -> task.failExternally(cause));
}
final CompletableFuture<Void> cleanupFuture = FutureUtils
.waitForAll(tasks.values().stream().map(TaskSlotPayload::getTerminationFuture).collect(Collectors.toList()))
.thenRun(() -> {
verifyMemoryFreed();
this.memoryManager.shutdown();
});

FutureUtils.forward(cleanupFuture, closingFuture);
final CompletableFuture<Void> shutdownFuture = FutureUtils
.waitForAll(tasks.values().stream().map(TaskSlotPayload::getTerminationFuture).collect(Collectors.toList()))
.thenRun(memoryManager::shutdown);
verifyAllManagedMemoryIsReleasedAfter(shutdownFuture);
FutureUtils.forward(shutdownFuture, closingFuture);
}
return closingFuture;
}

private void verifyMemoryFreed() {
if (!memoryManager.verifyEmpty()) {
LOG.warn("Not all slot memory is freed, potential memory leak at {}", this);
}
/**
 * Schedules the managed memory leak check to run on {@code asyncExecutor} as a
 * dependent action of the given future.
 *
 * <p>The check calls {@code memoryManager.verifyEmpty()} and only logs a warning if it
 * reports {@code false}; it never fails the given future. Because the action is registered
 * with {@link CompletableFuture#thenRunAsync(Runnable, java.util.concurrent.Executor)},
 * it is executed only if {@code after} completes normally.
 *
 * @param after future after whose completion the verification is triggered
 */
private void verifyAllManagedMemoryIsReleasedAfter(CompletableFuture<Void> after) {
	final Runnable leakCheck = () -> {
		final boolean allMemoryReleased = memoryManager.verifyEmpty();
		if (allMemoryReleased) {
			return;
		}
		LOG.warn("Not all slot memory is freed, potential memory leak at {}", this);
	};
	// run the check on the background executor instead of the thread completing the future
	after.thenRunAsync(leakCheck, asyncExecutor);
}

private static MemoryManager createMemoryManager(ResourceProfile resourceProfile, int pageSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -107,12 +108,18 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> implements TaskSlotTab
"TaskSlotTableImpl is not initialized with proper main thread executor, " +
"call to TaskSlotTableImpl#start is required");

/**
* {@link Executor} for background actions, e.g. verify all managed memory released.
*/
private final Executor memoryVerificationExecutor;

public TaskSlotTableImpl(
final int numberSlots,
final ResourceProfile totalAvailableResourceProfile,
final ResourceProfile defaultSlotResourceProfile,
final int memoryPageSize,
final TimerService<AllocationID> timerService) {
final TimerService<AllocationID> timerService,
final Executor memoryVerificationExecutor) {
Preconditions.checkArgument(0 < numberSlots, "The number of task slots must be greater than 0.");

this.numberSlots = numberSlots;
Expand All @@ -134,6 +141,8 @@ public TaskSlotTableImpl(
slotActions = null;
state = State.CREATED;
closingFuture = new CompletableFuture<>();

this.memoryVerificationExecutor = memoryVerificationExecutor;
}

@Override
Expand Down Expand Up @@ -289,7 +298,7 @@ public boolean allocateSlot(
return false;
}

taskSlot = new TaskSlot<>(index, resourceProfile, memoryPageSize, jobId, allocationId);
taskSlot = new TaskSlot<>(index, resourceProfile, memoryPageSize, jobId, allocationId, memoryVerificationExecutor);
if (index >= 0) {
taskSlots.put(index, taskSlot);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,16 @@ public void testComputeMemorySizeFailForNegativeFraction() {
memoryManager.computeMemorySize(-0.1);
}

/**
 * Exercises {@code verifyEmpty()} on a memory manager that has already been shut down.
 *
 * <p>The result of {@code verifyEmpty()} is not inspected; the test passes as long as
 * none of the calls throws.
 */
@Test
public void testVerifyEmptyCanBeDoneAfterShutdown() throws MemoryAllocationException, MemoryReservationException {
	// allocate a single page for a throw-away owner and hand it straight back
	memoryManager.release(memoryManager.allocatePages(new Object(), 1));

	// reserve and release one page worth of memory for a second owner
	final Object reservationOwner = new Object();
	memoryManager.reserveMemory(reservationOwner, MemoryManager.DEFAULT_PAGE_SIZE);
	memoryManager.releaseMemory(reservationOwner, MemoryManager.DEFAULT_PAGE_SIZE);

	// verification is invoked only after the manager has been shut down
	memoryManager.shutdown();
	memoryManager.verifyEmpty();
}

private void testCannotAllocateAnymore(Object owner, int numPages) {
try {
memoryManager.allocatePages(owner, numPages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2054,7 +2054,13 @@ private static final class AllocateSlotNotifyingTaskSlotTable extends TaskSlotTa
private final OneShotLatch allocateSlotLatch;

private AllocateSlotNotifyingTaskSlotTable(OneShotLatch allocateSlotLatch) {
super(1, createTotalResourceProfile(1), DEFAULT_RESOURCE_PROFILE, MemoryManager.MIN_PAGE_SIZE, createDefaultTimerService(timeout.toMilliseconds()));
super(
1,
createTotalResourceProfile(1),
DEFAULT_RESOURCE_PROFILE,
MemoryManager.MIN_PAGE_SIZE,
createDefaultTimerService(timeout.toMilliseconds()),
Executors.newDirectExecutorService());
this.allocateSlotLatch = allocateSlotLatch;
}

Expand All @@ -2080,7 +2086,13 @@ private static final class ActivateSlotNotifyingTaskSlotTable extends TaskSlotTa
private final CountDownLatch slotsToActivate;

private ActivateSlotNotifyingTaskSlotTable(int numberOfDefaultSlots, CountDownLatch slotsToActivate) {
super(numberOfDefaultSlots, createTotalResourceProfile(numberOfDefaultSlots), DEFAULT_RESOURCE_PROFILE, MemoryManager.MIN_PAGE_SIZE, createDefaultTimerService(timeout.toMilliseconds()));
super(
numberOfDefaultSlots,
createTotalResourceProfile(numberOfDefaultSlots),
DEFAULT_RESOURCE_PROFILE,
MemoryManager.MIN_PAGE_SIZE,
createDefaultTimerService(timeout.toMilliseconds()),
Executors.newDirectExecutorService());
this.slotsToActivate = slotsToActivate;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.util.TestLogger;
Expand Down Expand Up @@ -55,6 +56,6 @@ public void testTaskSlotClosedOnlyWhenAddedTasksTerminated() throws Exception {
}

private static <T extends TaskSlotPayload> TaskSlot<T> createTaskSlot() {
return new TaskSlot<>(0, ResourceProfile.ZERO, MemoryManager.MIN_PAGE_SIZE, JOB_ID, ALLOCATION_ID);
return new TaskSlot<>(0, ResourceProfile.ZERO, MemoryManager.MIN_PAGE_SIZE, JOB_ID, ALLOCATION_ID, Executors.newDirectExecutorService());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.testingUtils.TestingUtils;

Expand Down Expand Up @@ -59,7 +60,8 @@ private static <T extends TaskSlotPayload> TaskSlotTableImpl<T> createTaskSlotTa
createTotalResourceProfile(numberOfSlots),
DEFAULT_RESOURCE_PROFILE,
MemoryManager.MIN_PAGE_SIZE,
timerService);
timerService,
Executors.newDirectExecutorService());
}

public static ResourceProfile createTotalResourceProfile(int numberOfSlots) {
Expand Down

0 comments on commit 3d056c8

Please sign in to comment.