Skip to content

Commit

Permalink
[FLINK-18625][runtime] Maintain redundant taskmanagers to speed up fa…
Browse files Browse the repository at this point in the history
…ilover
  • Loading branch information
Myracle committed Jul 29, 2020
1 parent 2658510 commit 800ed5c
Show file tree
Hide file tree
Showing 8 changed files with 245 additions and 24 deletions.
6 changes: 6 additions & 0 deletions docs/_includes/generated/resource_manager_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,11 @@
<td>Integer</td>
<td>Defines the maximum number of slots that the Flink cluster allocates. This configuration option is meant for limiting the resource consumption for batch workloads. It is not recommended to configure this option for streaming workloads, which may fail if there are not enough slots. Note that this configuration option does not take effect for standalone clusters, where how many slots are allocated is not controlled by Flink.</td>
</tr>
<tr>
<td><h5>slotmanager.redundant-taskmanager-num</h5></td>
<td style="word-wrap: break-word;">0</td>
<td>Integer</td>
<td>The number of redundant task managers. Redundant task managers are extra task managers started by Flink, in order to speed up job recovery in case of failures due to task manager lost. Note that this feature is available only to the active deployments (native K8s, Yarn and Mesos).</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,19 @@ public class ResourceManagerOptions {
"for streaming workloads, which may fail if there are not enough slots. Note that this configuration option does not take " +
"effect for standalone clusters, where how many slots are allocated is not controlled by Flink.");

/**
* The number of redundant task managers. Redundant task managers are extra task managers started by Flink,
* in order to speed up job recovery in case of failures due to task manager lost.
* Note that this feature is available only to the active deployments (native K8s, Yarn and Mesos).
*/
public static final ConfigOption<Integer> REDUNDANT_TASK_MANAGER_NUM = ConfigOptions
.key("slotmanager.redundant-taskmanager-num")
.intType()
.defaultValue(0)
.withDescription("The number of redundant task managers. Redundant task managers are extra task managers " +
"started by Flink, in order to speed up job recovery in case of failures due to task manager lost. " +
"Note that this feature is available only to the active deployments (native K8s, Yarn and Mesos).");

/**
* The timeout for a slot request to be discarded, in milliseconds.
* @deprecated Use {@link JobManagerOptions#SLOT_REQUEST_TIMEOUT}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class SlotManagerConfiguration {
private final WorkerResourceSpec defaultWorkerResourceSpec;
private final int numSlotsPerWorker;
private final int maxSlotNum;
private final int redundantTaskManagerNum;

public SlotManagerConfiguration(
Time taskManagerRequestTimeout,
Expand All @@ -57,7 +58,8 @@ public SlotManagerConfiguration(
SlotMatchingStrategy slotMatchingStrategy,
WorkerResourceSpec defaultWorkerResourceSpec,
int numSlotsPerWorker,
int maxSlotNum) {
int maxSlotNum,
int redundantTaskManagerNum) {

this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
Expand All @@ -69,6 +71,8 @@ public SlotManagerConfiguration(
Preconditions.checkState(maxSlotNum > 0);
this.numSlotsPerWorker = numSlotsPerWorker;
this.maxSlotNum = maxSlotNum;
Preconditions.checkState(redundantTaskManagerNum >= 0);
this.redundantTaskManagerNum = redundantTaskManagerNum;
}

public Time getTaskManagerRequestTimeout() {
Expand Down Expand Up @@ -103,6 +107,10 @@ public int getMaxSlotNum() {
return maxSlotNum;
}

public int getRedundantTaskManagerNum() {
return redundantTaskManagerNum;
}

public static SlotManagerConfiguration fromConfiguration(
Configuration configuration,
WorkerResourceSpec defaultWorkerResourceSpec) throws ConfigurationException {
Expand Down Expand Up @@ -130,6 +138,8 @@ public static SlotManagerConfiguration fromConfiguration(

int maxSlotNum = configuration.getInteger(ResourceManagerOptions.MAX_SLOT_NUM);

int redundantTaskManagerNum = configuration.getInteger(ResourceManagerOptions.REDUNDANT_TASK_MANAGER_NUM);

return new SlotManagerConfiguration(
rpcTimeout,
slotRequestTimeout,
Expand All @@ -138,7 +148,8 @@ public static SlotManagerConfiguration fromConfiguration(
slotMatchingStrategy,
defaultWorkerResourceSpec,
numSlotsPerWorker,
maxSlotNum);
maxSlotNum,
redundantTaskManagerNum);
}

private static Time getSlotRequestTimeout(final Configuration configuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public class SlotManagerImpl implements SlotManager {
/** Callbacks for resource (de-)allocations. */
private ResourceActions resourceActions;

private ScheduledFuture<?> taskManagerTimeoutCheck;
private ScheduledFuture<?> taskManagerTimeoutsAndRedundancyCheck;

private ScheduledFuture<?> slotRequestTimeoutCheck;

Expand All @@ -127,6 +127,9 @@ public class SlotManagerImpl implements SlotManager {
/** Defines the max limitation of the total number of slots. */
private final int maxSlotNum;

/** Defines the number of redundant taskmanagers. */
private final int redundantTaskManagerNum;

/**
* If true, fail unfulfillable slot requests immediately. Otherwise, allow unfulfillable request to pend.
* A slot request is considered unfulfillable if it cannot be fulfilled by neither a slot that is already registered
Expand Down Expand Up @@ -163,6 +166,7 @@ public SlotManagerImpl(
this.defaultSlotResourceProfile = generateDefaultSlotResourceProfile(defaultWorkerResourceSpec, numSlotsPerWorker);
this.slotManagerMetricGroup = Preconditions.checkNotNull(slotManagerMetricGroup);
this.maxSlotNum = slotManagerConfiguration.getMaxSlotNum();
this.redundantTaskManagerNum = slotManagerConfiguration.getRedundantTaskManagerNum();

slots = new HashMap<>(16);
freeSlots = new LinkedHashMap<>(16);
Expand All @@ -174,7 +178,7 @@ public SlotManagerImpl(
resourceManagerId = null;
resourceActions = null;
mainThreadExecutor = null;
taskManagerTimeoutCheck = null;
taskManagerTimeoutsAndRedundancyCheck = null;
slotRequestTimeoutCheck = null;

started = false;
Expand Down Expand Up @@ -284,9 +288,9 @@ public void start(ResourceManagerId newResourceManagerId, Executor newMainThread

started = true;

taskManagerTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
taskManagerTimeoutsAndRedundancyCheck = scheduledExecutor.scheduleWithFixedDelay(
() -> mainThreadExecutor.execute(
() -> checkTaskManagerTimeouts()),
() -> checkTaskManagerTimeoutsAndRedundancy()),
0L,
taskManagerTimeout.toMilliseconds(),
TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -318,9 +322,9 @@ public void suspend() {
LOG.info("Suspending the SlotManager.");

// stop the timeout checks for the TaskManagers and the SlotRequests
if (taskManagerTimeoutCheck != null) {
taskManagerTimeoutCheck.cancel(false);
taskManagerTimeoutCheck = null;
if (taskManagerTimeoutsAndRedundancyCheck != null) {
taskManagerTimeoutsAndRedundancyCheck.cancel(false);
taskManagerTimeoutsAndRedundancyCheck = null;
}

if (slotRequestTimeoutCheck != null) {
Expand Down Expand Up @@ -915,6 +919,30 @@ private boolean isMaxSlotNumExceededAfterAdding(int numNewSlot) {
return getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + numNewSlot > maxSlotNum;
}

private void allocateRedundantTaskManagers(int number) {
int allocatedNumber = allocateResources(number);
if (number != allocatedNumber) {
LOG.warn("Expect to allocate {} taskManagers. Actually allocate {} taskManagers.", number, allocatedNumber);
}
}

/**
* Allocate a number of workers based on the input param.
* @param workerNum the number of workers to allocate.
* @return the number of allocated workers successfully.
*/
private int allocateResources(int workerNum) {
int allocatedWorkerNum = 0;
for (int i = 0; i < workerNum; ++i) {
if (allocateResource(defaultSlotResourceProfile).isPresent()) {
++allocatedWorkerNum;
} else {
break;
}
}
return allocatedWorkerNum;
}

private Optional<PendingTaskManagerSlot> allocateResource(ResourceProfile requestedSlotResourceProfile) {
final int numRegisteredSlots = getNumberRegisteredSlots();
final int numPendingSlots = getNumberPendingTaskManagerSlots();
Expand Down Expand Up @@ -1208,11 +1236,11 @@ public static ResourceProfile generateDefaultSlotResourceProfile(WorkerResourceS
}

// ---------------------------------------------------------------------------------------------
// Internal timeout methods
// Internal periodic check methods
// ---------------------------------------------------------------------------------------------

@VisibleForTesting
void checkTaskManagerTimeouts() {
void checkTaskManagerTimeoutsAndRedundancy() {
if (!taskManagerRegistrations.isEmpty()) {
long currentTime = System.currentTimeMillis();

Expand All @@ -1227,13 +1255,25 @@ void checkTaskManagerTimeouts() {
}
}

// second we trigger the release resource callback which can decide upon the resource release
for (TaskManagerRegistration taskManagerRegistration : timedOutTaskManagers) {
if (waitResultConsumedBeforeRelease) {
releaseTaskExecutorIfPossible(taskManagerRegistration);
} else {
releaseTaskExecutor(taskManagerRegistration.getInstanceId());
}
int slotsDiff = redundantTaskManagerNum * numSlotsPerWorker - freeSlots.size();
if (slotsDiff > 0) {
// Keep enough redundant taskManagers from time to time.
int requiredTaskManagers = MathUtils.divideRoundUp(slotsDiff, numSlotsPerWorker);
allocateRedundantTaskManagers(requiredTaskManagers);
} else {
// second we trigger the release resource callback which can decide upon the resource release
int maxReleaseNum = (-slotsDiff) / numSlotsPerWorker;
releaseTaskExecutors(timedOutTaskManagers, Math.min(maxReleaseNum, timedOutTaskManagers.size()));
}
}
}

private void releaseTaskExecutors(ArrayList<TaskManagerRegistration> timedOutTaskManagers, int releaseNum) {
for (int index = 0; index < releaseNum; ++index) {
if (waitResultConsumedBeforeRelease) {
releaseTaskExecutorIfPossible(timedOutTaskManagers.get(index));
} else {
releaseTaskExecutor(timedOutTaskManagers.get(index).getInstanceId());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public void confirmLeadership(UUID leaderId, String leaderAddress) {
AnyMatchingSlotMatchingStrategy.INSTANCE,
WorkerResourceSpec.ZERO,
1,
ResourceManagerOptions.MAX_SLOT_NUM.defaultValue()));
ResourceManagerOptions.MAX_SLOT_NUM.defaultValue(),
ResourceManagerOptions.REDUNDANT_TASK_MANAGER_NUM.defaultValue()));
ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
resourceManagerRuntimeServicesConfiguration,
highAvailabilityServices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class SlotManagerBuilder {
private int numSlotsPerWorker;
private SlotManagerMetricGroup slotManagerMetricGroup;
private int maxSlotNum;
private int redundantTaskManagerNum;

private SlotManagerBuilder() {
this.slotMatchingStrategy = AnyMatchingSlotMatchingStrategy.INSTANCE;
Expand All @@ -52,6 +53,7 @@ private SlotManagerBuilder() {
this.numSlotsPerWorker = 1;
this.slotManagerMetricGroup = UnregisteredMetricGroups.createUnregisteredSlotManagerMetricGroup();
this.maxSlotNum = ResourceManagerOptions.MAX_SLOT_NUM.defaultValue();
this.redundantTaskManagerNum = ResourceManagerOptions.REDUNDANT_TASK_MANAGER_NUM.defaultValue();
}

public static SlotManagerBuilder newBuilder() {
Expand Down Expand Up @@ -108,6 +110,11 @@ public SlotManagerBuilder setMaxSlotNum(int maxSlotNum) {
return this;
}

public SlotManagerBuilder setRedundantTaskManagerNum(int redundantTaskManagerNum) {
this.redundantTaskManagerNum = redundantTaskManagerNum;
return this;
}

public SlotManagerImpl build() {
final SlotManagerConfiguration slotManagerConfiguration = new SlotManagerConfiguration(
taskManagerRequestTimeout,
Expand All @@ -117,7 +124,8 @@ public SlotManagerImpl build() {
slotMatchingStrategy,
defaultWorkerResourceSpec,
numSlotsPerWorker,
maxSlotNum);
maxSlotNum,
redundantTaskManagerNum);

return new SlotManagerImpl(
scheduledExecutor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,7 @@ public void testTimeoutForUnusedTaskManager() throws Exception {

try (final SlotManagerImpl slotManager = createSlotManagerBuilder()
.setTaskManagerTimeout(Time.of(taskManagerTimeout, TimeUnit.MILLISECONDS))
.setRedundantTaskManagerNum(0)
.build()) {

slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);
Expand Down Expand Up @@ -976,6 +977,7 @@ public void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception {

try (final SlotManager slotManager = createSlotManagerBuilder()
.setTaskManagerTimeout(taskManagerTimeout)
.setRedundantTaskManagerNum(0)
.buildAndStartWithDirectExec(resourceManagerId, resourceActions)) {

slotManager.registerTaskManager(taskExecutorConnection, initialSlotReport);
Expand Down Expand Up @@ -1299,6 +1301,7 @@ private SlotManagerImpl createSlotManager(ResourceManagerId resourceManagerId, R
private SlotManagerImpl createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions, int numSlotsPerWorker) {
SlotManagerImpl slotManager = createSlotManagerBuilder()
.setNumSlotsPerWorker(numSlotsPerWorker)
.setRedundantTaskManagerNum(0)
.buildAndStartWithDirectExec(resourceManagerId, resourceManagerActions);
return slotManager;
}
Expand Down Expand Up @@ -1595,6 +1598,7 @@ public void testMaxSlotLimitRegisterResource() throws Exception {
try (SlotManagerImpl slotManager = createSlotManagerBuilder()
.setNumSlotsPerWorker(numberSlots)
.setMaxSlotNum(maxSlotNum)
.setRedundantTaskManagerNum(0)
.buildAndStartWithDirectExec(resourceManagerId, resourceManagerActions)) {
slotManager.registerTaskManager(taskManagerConnection1, slotReport1);
slotManager.registerTaskManager(taskManagerConnection2, slotReport2);
Expand Down
Loading

0 comments on commit 800ed5c

Please sign in to comment.