diff --git a/docs/_includes/generated/resource_manager_configuration.html b/docs/_includes/generated/resource_manager_configuration.html
index 03090e90c0331d..7cfcec69e1b136 100644
--- a/docs/_includes/generated/resource_manager_configuration.html
+++ b/docs/_includes/generated/resource_manager_configuration.html
@@ -38,5 +38,11 @@
Integer |
Defines the maximum number of slots that the Flink cluster allocates. This configuration option is meant for limiting the resource consumption for batch workloads. It is not recommended to configure this option for streaming workloads, which may fail if there are not enough slots. Note that this configuration option does not take effect for standalone clusters, where how many slots are allocated is not controlled by Flink. |
+
+ slotmanager.redundant-taskmanager-num |
+ 0 |
+ Integer |
+ The number of redundant task managers. Redundant task managers are extra task managers started by Flink, in order to speed up job recovery in case of failures due to task manager lost. Note that this feature is available only to the active deployments (native K8s, Yarn and Mesos). |
+
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
index 1ba26992e8ea5e..423fc3af9400ee 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
@@ -67,6 +67,19 @@ public class ResourceManagerOptions {
"for streaming workloads, which may fail if there are not enough slots. Note that this configuration option does not take " +
"effect for standalone clusters, where how many slots are allocated is not controlled by Flink.");
+ /**
+ * The number of redundant task managers, i.e. extra task managers that Flink starts so that a job
+ * can recover faster after a task manager has been lost. The default of 0 requests no extra task managers.
+ * Note that this feature is only available in active deployments (native K8s, Yarn and Mesos).
+ */
+ public static final ConfigOption REDUNDANT_TASK_MANAGER_NUM = ConfigOptions
+ .key("slotmanager.redundant-taskmanager-num")
+ .intType()
+ .defaultValue(0)
+ .withDescription("The number of redundant task managers. Redundant task managers are extra task managers " +
+ "started by Flink, in order to speed up job recovery in case of failures due to task manager lost. " +
+ "Note that this feature is available only to the active deployments (native K8s, Yarn and Mesos).");
+
/**
* The timeout for a slot request to be discarded, in milliseconds.
* @deprecated Use {@link JobManagerOptions#SLOT_REQUEST_TIMEOUT}.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
index 5647129007b3ac..0b01b70cfdee97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
@@ -48,6 +48,7 @@ public class SlotManagerConfiguration {
private final WorkerResourceSpec defaultWorkerResourceSpec;
private final int numSlotsPerWorker;
private final int maxSlotNum;
+ private final int redundantTaskManagerNum;
public SlotManagerConfiguration(
Time taskManagerRequestTimeout,
@@ -57,7 +58,8 @@ public SlotManagerConfiguration(
SlotMatchingStrategy slotMatchingStrategy,
WorkerResourceSpec defaultWorkerResourceSpec,
int numSlotsPerWorker,
- int maxSlotNum) {
+ int maxSlotNum,
+ int redundantTaskManagerNum) {
this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
@@ -69,6 +71,8 @@ public SlotManagerConfiguration(
Preconditions.checkState(maxSlotNum > 0);
this.numSlotsPerWorker = numSlotsPerWorker;
this.maxSlotNum = maxSlotNum;
+ Preconditions.checkState(redundantTaskManagerNum >= 0);
+ this.redundantTaskManagerNum = redundantTaskManagerNum;
}
public Time getTaskManagerRequestTimeout() {
@@ -103,6 +107,10 @@ public int getMaxSlotNum() {
return maxSlotNum;
}
+ public int getRedundantTaskManagerNum() {
+ return redundantTaskManagerNum;
+ }
+
public static SlotManagerConfiguration fromConfiguration(
Configuration configuration,
WorkerResourceSpec defaultWorkerResourceSpec) throws ConfigurationException {
@@ -130,6 +138,8 @@ public static SlotManagerConfiguration fromConfiguration(
int maxSlotNum = configuration.getInteger(ResourceManagerOptions.MAX_SLOT_NUM);
+ int redundantTaskManagerNum = configuration.getInteger(ResourceManagerOptions.REDUNDANT_TASK_MANAGER_NUM);
+
return new SlotManagerConfiguration(
rpcTimeout,
slotRequestTimeout,
@@ -138,7 +148,8 @@ public static SlotManagerConfiguration fromConfiguration(
slotMatchingStrategy,
defaultWorkerResourceSpec,
numSlotsPerWorker,
- maxSlotNum);
+ maxSlotNum,
+ redundantTaskManagerNum);
}
private static Time getSlotRequestTimeout(final Configuration configuration) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
index f48fd1e26e8a1c..b4937c21ab4938 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
@@ -114,7 +114,7 @@ public class SlotManagerImpl implements SlotManager {
/** Callbacks for resource (de-)allocations. */
private ResourceActions resourceActions;
- private ScheduledFuture> taskManagerTimeoutCheck;
+ private ScheduledFuture> taskManagerTimeoutsAndRedundancyCheck;
private ScheduledFuture> slotRequestTimeoutCheck;
@@ -127,6 +127,9 @@ public class SlotManagerImpl implements SlotManager {
/** Defines the max limitation of the total number of slots. */
private final int maxSlotNum;
+ /** Defines the number of redundant taskmanagers. */
+ private final int redundantTaskManagerNum;
+
/**
* If true, fail unfulfillable slot requests immediately. Otherwise, allow unfulfillable request to pend.
* A slot request is considered unfulfillable if it cannot be fulfilled by neither a slot that is already registered
@@ -163,6 +166,7 @@ public SlotManagerImpl(
this.defaultSlotResourceProfile = generateDefaultSlotResourceProfile(defaultWorkerResourceSpec, numSlotsPerWorker);
this.slotManagerMetricGroup = Preconditions.checkNotNull(slotManagerMetricGroup);
this.maxSlotNum = slotManagerConfiguration.getMaxSlotNum();
+ this.redundantTaskManagerNum = slotManagerConfiguration.getRedundantTaskManagerNum();
slots = new HashMap<>(16);
freeSlots = new LinkedHashMap<>(16);
@@ -174,7 +178,7 @@ public SlotManagerImpl(
resourceManagerId = null;
resourceActions = null;
mainThreadExecutor = null;
- taskManagerTimeoutCheck = null;
+ taskManagerTimeoutsAndRedundancyCheck = null;
slotRequestTimeoutCheck = null;
started = false;
@@ -284,9 +288,9 @@ public void start(ResourceManagerId newResourceManagerId, Executor newMainThread
started = true;
- taskManagerTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
+ taskManagerTimeoutsAndRedundancyCheck = scheduledExecutor.scheduleWithFixedDelay(
() -> mainThreadExecutor.execute(
- () -> checkTaskManagerTimeouts()),
+ () -> checkTaskManagerTimeoutsAndRedundancy()),
0L,
taskManagerTimeout.toMilliseconds(),
TimeUnit.MILLISECONDS);
@@ -318,9 +322,9 @@ public void suspend() {
LOG.info("Suspending the SlotManager.");
// stop the timeout checks for the TaskManagers and the SlotRequests
- if (taskManagerTimeoutCheck != null) {
- taskManagerTimeoutCheck.cancel(false);
- taskManagerTimeoutCheck = null;
+ if (taskManagerTimeoutsAndRedundancyCheck != null) {
+ taskManagerTimeoutsAndRedundancyCheck.cancel(false);
+ taskManagerTimeoutsAndRedundancyCheck = null;
}
if (slotRequestTimeoutCheck != null) {
@@ -915,6 +919,30 @@ private boolean isMaxSlotNumExceededAfterAdding(int numNewSlot) {
return getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + numNewSlot > maxSlotNum;
}
+ private void allocateRedundantTaskManagers(int number) {
+ final int started = allocateResources(number);
+ if (started != number) {
+ LOG.warn("Expect to allocate {} taskManagers. Actually allocate {} taskManagers.", number, started);
+ }
+ }
+
+ /**
+ * Requests up to {@code workerNum} workers for the default slot resource profile,
+ * giving up at the first request that yields an empty result.
+ *
+ * @param workerNum the number of workers to request.
+ * @return the number of requests that yielded a non-empty result.
+ */
+ private int allocateResources(int workerNum) {
+ int granted = 0;
+ // The first unsuccessful request ends the loop, so no further attempts are made after it.
+ while (granted < workerNum
+ && allocateResource(defaultSlotResourceProfile).isPresent()) {
+ ++granted;
+ }
+ return granted;
+ }
+
private Optional allocateResource(ResourceProfile requestedSlotResourceProfile) {
final int numRegisteredSlots = getNumberRegisteredSlots();
final int numPendingSlots = getNumberPendingTaskManagerSlots();
@@ -1208,11 +1236,11 @@ public static ResourceProfile generateDefaultSlotResourceProfile(WorkerResourceS
}
// ---------------------------------------------------------------------------------------------
- // Internal timeout methods
+ // Internal periodic check methods
// ---------------------------------------------------------------------------------------------
@VisibleForTesting
- void checkTaskManagerTimeouts() {
+ void checkTaskManagerTimeoutsAndRedundancy() {
if (!taskManagerRegistrations.isEmpty()) {
long currentTime = System.currentTimeMillis();
@@ -1227,13 +1255,39 @@ void checkTaskManagerTimeouts() {
}
}
- // second we trigger the release resource callback which can decide upon the resource release
- for (TaskManagerRegistration taskManagerRegistration : timedOutTaskManagers) {
- if (waitResultConsumedBeforeRelease) {
- releaseTaskExecutorIfPossible(taskManagerRegistration);
- } else {
- releaseTaskExecutor(taskManagerRegistration.getInstanceId());
- }
+ final int numFreeSlots = freeSlots.size();
+ int slotsDiff = redundantTaskManagerNum * numSlotsPerWorker - numFreeSlots;
+ if (redundantTaskManagerNum == 0 || numFreeSlots == getNumberRegisteredSlots()) {
+ // Redundancy is disabled, or every registered slot is free: nothing needs spare capacity,
+ // so release all timed out taskManagers, exactly as before redundancy was introduced.
+ releaseTaskExecutors(timedOutTaskManagers, timedOutTaskManagers.size());
+ } else if (slotsDiff > 0) {
+ // Keep enough redundant taskManagers from time to time.
+ // NOTE(review): taskManagers requested earlier but not yet registered are not counted in
+ // freeSlots - confirm that consecutive checks cannot request the same redundancy twice.
+ int requiredTaskManagers = MathUtils.divideRoundUp(slotsDiff, numSlotsPerWorker);
+ allocateRedundantTaskManagers(requiredTaskManagers);
+ } else {
+ // second we trigger the release resource callback which can decide upon the resource release
+ int maxReleaseNum = (-slotsDiff) / numSlotsPerWorker;
+ releaseTaskExecutors(timedOutTaskManagers, Math.min(maxReleaseNum, timedOutTaskManagers.size()));
+ }
+ }
+ }
+
+ /**
+ * Releases the first {@code releaseNum} entries of {@code timedOutTaskManagers}.
+ * If {@code waitResultConsumedBeforeRelease} is set, each entry is handed to
+ * {@code releaseTaskExecutorIfPossible}; otherwise it is released directly by instance id.
+ * @param timedOutTaskManagers the timed out taskManagers, must hold at least {@code releaseNum} entries.
+ * @param releaseNum the number of taskManagers to release.
+ */
+ private void releaseTaskExecutors(ArrayList timedOutTaskManagers, int releaseNum) {
+ for (int index = 0; index < releaseNum; ++index) {
+ if (waitResultConsumedBeforeRelease) {
+ releaseTaskExecutorIfPossible(timedOutTaskManagers.get(index));
+ } else {
+ releaseTaskExecutor(timedOutTaskManagers.get(index).getInstanceId());
}
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 15cd0f511acaef..19d4bb8860190a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -76,7 +76,8 @@ public void confirmLeadership(UUID leaderId, String leaderAddress) {
AnyMatchingSlotMatchingStrategy.INSTANCE,
WorkerResourceSpec.ZERO,
1,
- ResourceManagerOptions.MAX_SLOT_NUM.defaultValue()));
+ ResourceManagerOptions.MAX_SLOT_NUM.defaultValue(),
+ ResourceManagerOptions.REDUNDANT_TASK_MANAGER_NUM.defaultValue()));
ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
resourceManagerRuntimeServicesConfiguration,
highAvailabilityServices,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerBuilder.java
index 1ea72a0f5fde20..2209f11817c7fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerBuilder.java
@@ -40,6 +40,7 @@ public class SlotManagerBuilder {
private int numSlotsPerWorker;
private SlotManagerMetricGroup slotManagerMetricGroup;
private int maxSlotNum;
+ private int redundantTaskManagerNum;
private SlotManagerBuilder() {
this.slotMatchingStrategy = AnyMatchingSlotMatchingStrategy.INSTANCE;
@@ -52,6 +53,7 @@ private SlotManagerBuilder() {
this.numSlotsPerWorker = 1;
this.slotManagerMetricGroup = UnregisteredMetricGroups.createUnregisteredSlotManagerMetricGroup();
this.maxSlotNum = ResourceManagerOptions.MAX_SLOT_NUM.defaultValue();
+ this.redundantTaskManagerNum = ResourceManagerOptions.REDUNDANT_TASK_MANAGER_NUM.defaultValue();
}
public static SlotManagerBuilder newBuilder() {
@@ -108,6 +110,11 @@ public SlotManagerBuilder setMaxSlotNum(int maxSlotNum) {
return this;
}
+ public SlotManagerBuilder setRedundantTaskManagerNum(int redundantTaskManagerNum) {
+ this.redundantTaskManagerNum = redundantTaskManagerNum;
+ return this;
+ }
+
public SlotManagerImpl build() {
final SlotManagerConfiguration slotManagerConfiguration = new SlotManagerConfiguration(
taskManagerRequestTimeout,
@@ -117,7 +124,8 @@ public SlotManagerImpl build() {
slotMatchingStrategy,
defaultWorkerResourceSpec,
numSlotsPerWorker,
- maxSlotNum);
+ maxSlotNum,
+ redundantTaskManagerNum);
return new SlotManagerImpl(
scheduledExecutor,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java
index 3b2be57bdb47c1..a04d8b79697abc 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java
@@ -908,6 +908,7 @@ public void testTimeoutForUnusedTaskManager() throws Exception {
try (final SlotManagerImpl slotManager = createSlotManagerBuilder()
.setTaskManagerTimeout(Time.of(taskManagerTimeout, TimeUnit.MILLISECONDS))
+ .setRedundantTaskManagerNum(0)
.build()) {
slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);
@@ -976,6 +977,7 @@ public void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception {
try (final SlotManager slotManager = createSlotManagerBuilder()
.setTaskManagerTimeout(taskManagerTimeout)
+ .setRedundantTaskManagerNum(0)
.buildAndStartWithDirectExec(resourceManagerId, resourceActions)) {
slotManager.registerTaskManager(taskExecutorConnection, initialSlotReport);
@@ -1299,6 +1301,7 @@ private SlotManagerImpl createSlotManager(ResourceManagerId resourceManagerId, R
private SlotManagerImpl createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions, int numSlotsPerWorker) {
SlotManagerImpl slotManager = createSlotManagerBuilder()
.setNumSlotsPerWorker(numSlotsPerWorker)
+ .setRedundantTaskManagerNum(0)
.buildAndStartWithDirectExec(resourceManagerId, resourceManagerActions);
return slotManager;
}
@@ -1595,6 +1598,7 @@ public void testMaxSlotLimitRegisterResource() throws Exception {
try (SlotManagerImpl slotManager = createSlotManagerBuilder()
.setNumSlotsPerWorker(numberSlots)
.setMaxSlotNum(maxSlotNum)
+ .setRedundantTaskManagerNum(0)
.buildAndStartWithDirectExec(resourceManagerId, resourceManagerActions)) {
slotManager.registerTaskManager(taskManagerConnection1, slotReport1);
slotManager.registerTaskManager(taskManagerConnection2, slotReport2);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerReleaseInSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerCheckInSlotManagerTest.java
similarity index 56%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerReleaseInSlotManagerTest.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerCheckInSlotManagerTest.java
index b1a459cf068bfd..bd05848e3e6f34 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerReleaseInSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerCheckInSlotManagerTest.java
@@ -40,8 +40,10 @@
import org.junit.Before;
import org.junit.Test;
+import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.equalTo;
@@ -51,7 +53,7 @@
/**
* Test suite for idle task managers release in slot manager.
*/
-public class TaskManagerReleaseInSlotManagerTest extends TestLogger {
+public class TaskManagerCheckInSlotManagerTest extends TestLogger {
private static final ResourceID resourceID = ResourceID.generate();
private static final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
private static final SlotID slotId = new SlotID(resourceID, 0);
@@ -69,13 +71,21 @@ public class TaskManagerReleaseInSlotManagerTest extends TestLogger {
private CompletableFuture releaseFuture;
private ResourceActions resourceManagerActions;
private ManuallyTriggeredScheduledExecutor mainThreadExecutor;
+ private final AtomicInteger allocateResourceCalls = new AtomicInteger(0);
+ private final AtomicInteger releaseResourceCalls = new AtomicInteger(0);
@Before
public void setup() {
canBeReleasedFuture.set(new CompletableFuture<>());
releaseFuture = new CompletableFuture<>();
+ allocateResourceCalls.getAndSet(0);
+ releaseResourceCalls.getAndSet(0);
resourceManagerActions = new TestingResourceActionsBuilder()
- .setReleaseResourceConsumer((instanceID, e) -> releaseFuture.complete(instanceID))
+ .setReleaseResourceConsumer((instanceID, e) -> {
+ releaseFuture.complete(instanceID);
+ releaseResourceCalls.incrementAndGet();
+ })
+ .setAllocateResourceConsumer(ignored -> allocateResourceCalls.incrementAndGet())
.build();
mainThreadExecutor = new ManuallyTriggeredScheduledExecutor();
}
@@ -100,6 +110,74 @@ public void testTaskManagerTimeout() throws Exception {
}
}
+ /**
+ * Registers four taskManagers with two slots each:
+ * taskManager0 has both slots free, taskManager1 has both slots allocated,
+ * taskManager2 and taskManager3 each have one free and one allocated slot.
+ * That leaves four free slots and one idle taskManager.
+ * With redundantTaskManagerNum = 0 no free slot has to be kept, so the idle
+ * taskManager is released and nothing is allocated.
+ * @throws Exception propagated from {@code registerAndCheckMultiTaskManagers}
+ */
+ @Test
+ public void testTaskManagerTimeoutWithZeroRedundantTaskManager() throws Exception {
+ registerAndCheckMultiTaskManagers(0);
+
+ assertThat(allocateResourceCalls.get(), is(0));
+ assertThat(releaseResourceCalls.get(), is(1));
+ }
+
+ /**
+ * Registers four taskManagers with two slots each:
+ * taskManager0 has both slots free, taskManager1 has both slots allocated,
+ * taskManager2 and taskManager3 each have one free and one allocated slot.
+ * That leaves four free slots and one idle taskManager.
+ * With redundantTaskManagerNum = 1 only two free slots have to be kept, so the
+ * idle taskManager is still released and nothing is allocated.
+ * @throws Exception propagated from {@code registerAndCheckMultiTaskManagers}
+ */
+ @Test
+ public void testTaskManagerTimeoutWithOneRedundantTaskManager() throws Exception {
+ registerAndCheckMultiTaskManagers(1);
+
+ assertThat(allocateResourceCalls.get(), is(0));
+ assertThat(releaseResourceCalls.get(), is(1));
+ }
+
+ /**
+ * Registers four taskManagers with two slots each:
+ * taskManager0 has both slots free, taskManager1 has both slots allocated,
+ * taskManager2 and taskManager3 each have one free and one allocated slot.
+ * That leaves four free slots and one idle taskManager.
+ * With redundantTaskManagerNum = 2 exactly these four free slots have to be kept,
+ * so nothing is released and nothing is allocated.
+ * @throws Exception propagated from {@code registerAndCheckMultiTaskManagers}
+ */
+ @Test
+ public void testTaskManagerTimeoutWithTwoRedundantTaskManager() throws Exception {
+ registerAndCheckMultiTaskManagers(2);
+
+ assertThat(allocateResourceCalls.get(), is(0));
+ assertThat(releaseResourceCalls.get(), is(0));
+ }
+
+ /**
+ * Registers four taskManagers with two slots each:
+ * taskManager0 has both slots free, taskManager1 has both slots allocated,
+ * taskManager2 and taskManager3 each have one free and one allocated slot.
+ * That leaves four free slots and one idle taskManager.
+ * With redundantTaskManagerNum = 3 six free slots are required, two are missing,
+ * so one more taskManager is allocated and nothing is released.
+ * @throws Exception propagated from {@code registerAndCheckMultiTaskManagers}
+ */
+ @Test
+ public void testTaskManagerTimeoutWithThreeRedundantTaskManager() throws Exception {
+ registerAndCheckMultiTaskManagers(3);
+
+ assertThat(allocateResourceCalls.get(), is(1));
+ assertThat(releaseResourceCalls.get(), is(0));
+ }
+
/**
* Tests that idle but not releasable task managers will not be released even if timed out before it can be.
*/
@@ -135,14 +213,74 @@ public void testTaskManagerIsNotReleasedInCaseOfConcurrentAllocation() throws Ex
}
}
+ /**
+ * Starts a slot manager with two slots per worker, registers four taskManagers with two slots
+ * each that differ only in which of their slots are allocated, and then triggers the timeout
+ * and redundancy check with a positive canBeReleased response.
+ * @param redundantTaskManagerNum the number of redundant taskManagers the slot manager is configured with
+ * @throws Exception propagated from the triggered check
+ */
+ private void registerAndCheckMultiTaskManagers(int redundantTaskManagerNum) throws Exception {
+ SlotManagerImpl slotManager = createAndStartSlotManager(redundantTaskManagerNum, 2);
+
+ // Both slots are free.
+ registerTaskManagerWithTwoSlots(slotManager, true, true);
+
+ // Both slots are allocated.
+ registerTaskManagerWithTwoSlots(slotManager, false, false);
+
+ // One slot is allocated, the other is free.
+ registerTaskManagerWithTwoSlots(slotManager, false, true);
+
+ // One slot is free, the other is allocated.
+ registerTaskManagerWithTwoSlots(slotManager, true, false);
+
+ checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, true);
+ }
+
+ private void registerTaskManagerWithTwoSlots(SlotManagerImpl slotManager,
+ boolean slot0Free,
+ boolean slot1Free) {
+ // Install a fresh future; the gateway below looks up the current one each time it is asked.
+ canBeReleasedFuture.set(new CompletableFuture<>());
+
+ final ResourceID taskManagerId = ResourceID.generate();
+ final ResourceProfile slotProfile = ResourceProfile.fromResources(1.0, 1);
+ final JobID owningJob = new JobID();
+
+ // A free slot is reported without job and allocation id, an allocated one with both.
+ final SlotID firstSlot = new SlotID(taskManagerId, 0);
+ final SlotStatus firstStatus = slot0Free
+ ? new SlotStatus(firstSlot, slotProfile)
+ : new SlotStatus(firstSlot, slotProfile, owningJob, new AllocationID());
+ final SlotID secondSlot = new SlotID(taskManagerId, 1);
+ final SlotStatus secondStatus = slot1Free
+ ? new SlotStatus(secondSlot, slotProfile)
+ : new SlotStatus(secondSlot, slotProfile, owningJob, new AllocationID());
+
+ final TaskExecutorGateway gateway = new TestingTaskExecutorGatewayBuilder()
+ .setCanBeReleasedSupplier(canBeReleasedFuture::get)
+ .createTestingTaskExecutorGateway();
+ final TaskExecutorConnection connection = new TaskExecutorConnection(taskManagerId, gateway);
+ final SlotReport twoSlotReport = new SlotReport(Arrays.asList(firstStatus, secondStatus));
+ mainThreadExecutor.execute(() -> slotManager.registerTaskManager(connection, twoSlotReport));
+ }
+
private SlotManagerImpl createAndStartSlotManagerWithTM() {
+ SlotManagerImpl slotManager = createAndStartSlotManager(0, 1);
+ mainThreadExecutor.execute(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport));
+ return slotManager;
+ }
+
+ private SlotManagerImpl createAndStartSlotManager(int redundantTaskManagerNum, int numSlotsPerWorker) {
SlotManagerImpl slotManager = SlotManagerBuilder
.newBuilder()
.setScheduledExecutor(mainThreadExecutor)
.setTaskManagerTimeout(Time.milliseconds(0L))
+ .setRedundantTaskManagerNum(redundantTaskManagerNum)
+ .setNumSlotsPerWorker(numSlotsPerWorker)
.build();
slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);
- mainThreadExecutor.execute(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport));
return slotManager;
}
@@ -157,7 +295,7 @@ private void checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(
boolean canBeReleased,
RunnableWithException doAfterCheckTriggerBeforeCanBeReleasedResponse) throws Exception {
canBeReleasedFuture.set(new CompletableFuture<>());
- mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger TM.canBeReleased request
+ mainThreadExecutor.execute(slotManager::checkTaskManagerTimeoutsAndRedundancy); // trigger TM.canBeReleased request
mainThreadExecutor.triggerAll();
doAfterCheckTriggerBeforeCanBeReleasedResponse.run();
canBeReleasedFuture.get().complete(canBeReleased); // finish TM.canBeReleased request