Skip to content

Commit

Permalink
[FLINK-35550][runtime] Adds new component RescaleManager that handles…
Browse files Browse the repository at this point in the history
… the rescaling logic to improve code testability and extensibility

Rescaling is a state-specific functionality. Moving all the logic into Executing state allows
us to align the resource controlling in Executing state and WaitingForResources state in a future effort.
  • Loading branch information
XComp committed Jun 27, 2024
1 parent 7fc3aac commit 7f13995
Show file tree
Hide file tree
Showing 10 changed files with 971 additions and 351 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import org.apache.flink.runtime.executiongraph.DefaultVertexAttemptNumberStore;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.MutableVertexAttemptNumberStore;
Expand Down Expand Up @@ -110,9 +109,6 @@
import org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
import org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.EnforceMinimalIncreaseRescalingController;
import org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.EnforceParallelismChangeRescalingController;
import org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.RescalingController;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.metrics.DeploymentStateTimeMetrics;
Expand Down Expand Up @@ -147,8 +143,8 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.flink.configuration.JobManagerOptions.MIN_PARALLELISM_INCREASE;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphUtils.isAnyOutputBlocking;

/**
Expand Down Expand Up @@ -228,7 +224,8 @@ public static Settings of(Configuration configuration) {
.orElse(stabilizationTimeoutDefault),
configuration.get(JobManagerOptions.SLOT_IDLE_TIMEOUT),
scalingIntervalMin,
scalingIntervalMax);
scalingIntervalMax,
configuration.get(MIN_PARALLELISM_INCREASE));
}

private final SchedulerExecutionMode executionMode;
Expand All @@ -237,20 +234,23 @@ public static Settings of(Configuration configuration) {
private final Duration slotIdleTimeout;
private final Duration scalingIntervalMin;
private final Duration scalingIntervalMax;
private final int minParallelismChangeForDesiredRescale;

private Settings(
SchedulerExecutionMode executionMode,
Duration initialResourceAllocationTimeout,
Duration resourceStabilizationTimeout,
Duration slotIdleTimeout,
Duration scalingIntervalMin,
Duration scalingIntervalMax) {
Duration scalingIntervalMax,
int minParallelismChangeForDesiredRescale) {
this.executionMode = executionMode;
this.initialResourceAllocationTimeout = initialResourceAllocationTimeout;
this.resourceStabilizationTimeout = resourceStabilizationTimeout;
this.slotIdleTimeout = slotIdleTimeout;
this.scalingIntervalMin = scalingIntervalMin;
this.scalingIntervalMax = scalingIntervalMax;
this.minParallelismChangeForDesiredRescale = minParallelismChangeForDesiredRescale;
}

public SchedulerExecutionMode getExecutionMode() {
Expand All @@ -276,9 +276,15 @@ public Duration getScalingIntervalMin() {
public Duration getScalingIntervalMax() {
return scalingIntervalMax;
}

public int getMinParallelismChangeForDesiredRescale() {
return minParallelismChangeForDesiredRescale;
}
}

private final Settings settings;
private final RescaleManager.Factory rescaleManagerFactory;

private final JobGraph jobGraph;

private final JobInfo jobInfo;
Expand Down Expand Up @@ -308,10 +314,6 @@ public Duration getScalingIntervalMax() {

private final SlotAllocator slotAllocator;

private final RescalingController rescalingController;

private final RescalingController forceRescalingController;

private final ExecutionGraphFactory executionGraphFactory;

private State state = new Created(this, LOG);
Expand Down Expand Up @@ -360,6 +362,8 @@ public AdaptiveScheduler(
assertPreconditions(jobGraph);

this.settings = settings;
this.rescaleManagerFactory = DefaultRescaleManager.Factory.fromSettings(settings);

this.jobGraph = jobGraph;
this.jobInfo = new JobInfoImpl(jobGraph.getJobID(), jobGraph.getName());

Expand Down Expand Up @@ -395,10 +399,6 @@ public AdaptiveScheduler(

this.componentMainThreadExecutor = mainThreadExecutor;

this.rescalingController = new EnforceMinimalIncreaseRescalingController(configuration);

this.forceRescalingController = new EnforceParallelismChangeRescalingController();

this.executionGraphFactory = executionGraphFactory;

final JobStatusStore jobStatusStore = new JobStatusStore(initializationTimestamp);
Expand Down Expand Up @@ -1048,8 +1048,8 @@ public void goToExecuting(
this,
userCodeClassLoader,
failureCollection,
settings.getScalingIntervalMin(),
settings.getScalingIntervalMax()));
rescaleManagerFactory,
settings.getMinParallelismChangeForDesiredRescale()));
}

@Override
Expand Down Expand Up @@ -1275,34 +1275,10 @@ private ExecutionGraph createExecutionGraphAndRestoreState(
LOG);
}

/**
* In regular mode, rescale the job if added resource meets {@link
* JobManagerOptions#MIN_PARALLELISM_INCREASE}. In force mode rescale if the parallelism has
* changed.
*/
@Override
public boolean shouldRescale(ExecutionGraph executionGraph, boolean forceRescale) {
final Optional<VertexParallelism> maybeNewParallelism =
slotAllocator.determineParallelism(
jobInformation, declarativeSlotPool.getAllSlotsInformation());
return maybeNewParallelism
.filter(
vertexParallelism -> {
RescalingController rescalingControllerToUse =
forceRescale ? forceRescalingController : rescalingController;
return rescalingControllerToUse.shouldRescale(
getCurrentParallelism(executionGraph), vertexParallelism);
})
.isPresent();
}

private static VertexParallelism getCurrentParallelism(ExecutionGraph executionGraph) {
return new VertexParallelism(
executionGraph.getAllVertices().values().stream()
.collect(
Collectors.toMap(
ExecutionJobVertex::getJobVertexId,
ExecutionJobVertex::getParallelism)));
public Optional<VertexParallelism> getAvailableVertexParallelism() {
return slotAllocator.determineParallelism(
jobInformation, declarativeSlotPool.getAllSlotsInformation());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.scheduler.adaptive;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.Temporal;
import java.util.function.Supplier;

/**
* {@code DefaultRescaleManager} manages triggering the next rescaling based on when the previous
* rescale operation happened and the available resources. It handles the event based on the
* following phases (in that order):
*
* <ol>
* <li>Cooldown phase: No rescaling takes place (its upper threshold is defined by {@code
* scalingIntervalMin}.
* <li>Soft-rescaling phase: Rescaling is triggered if the desired amount of resources is
* available.
* <li>Hard-rescaling phase: Rescaling is triggered if a sufficient amount of resources is
* available (its lower threshold is defined by (@code scalingIntervalMax}).
* </ol>
*
* @see Executing
*/
public class DefaultRescaleManager implements RescaleManager {

private static final Logger LOG = LoggerFactory.getLogger(DefaultRescaleManager.class);

private final Temporal initializationTime;
private final Supplier<Temporal> clock;

@VisibleForTesting final Duration scalingIntervalMin;
@VisibleForTesting @Nullable final Duration scalingIntervalMax;

private final RescaleManager.Context rescaleContext;

private boolean rescaleScheduled = false;

DefaultRescaleManager(
Temporal initializationTime,
RescaleManager.Context rescaleContext,
Duration scalingIntervalMin,
@Nullable Duration scalingIntervalMax) {
this(
initializationTime,
Instant::now,
rescaleContext,
scalingIntervalMin,
scalingIntervalMax);
}

@VisibleForTesting
DefaultRescaleManager(
Temporal initializationTime,
Supplier<Temporal> clock,
RescaleManager.Context rescaleContext,
Duration scalingIntervalMin,
@Nullable Duration scalingIntervalMax) {
this.initializationTime = initializationTime;
this.clock = clock;

Preconditions.checkArgument(
scalingIntervalMax == null || scalingIntervalMin.compareTo(scalingIntervalMax) <= 0,
"scalingIntervalMax should at least match or be longer than scalingIntervalMin.");
this.scalingIntervalMin = scalingIntervalMin;
this.scalingIntervalMax = scalingIntervalMax;

this.rescaleContext = rescaleContext;
}

@Override
public void onChange() {
if (timeSinceLastRescale().compareTo(scalingIntervalMin) > 0) {
maybeRescale();
} else if (!rescaleScheduled) {
rescaleScheduled = true;
rescaleContext.scheduleOperation(this::maybeRescale, scalingIntervalMin);
}
}

private Duration timeSinceLastRescale() {
return Duration.between(this.initializationTime, clock.get());
}

private void maybeRescale() {
rescaleScheduled = false;
if (rescaleContext.hasDesiredResources()) {
LOG.info("Desired parallelism for job was reached: Rescaling will be triggered.");
rescaleContext.rescale();
} else if (scalingIntervalMax != null) {
LOG.info(
"The longer the pipeline runs, the more the (small) resource gain is worth the restarting time. "
+ "Last resource added does not meet the configured minimal parallelism change. Forced rescaling will be triggered after {} if the resource is still there.",
scalingIntervalMax);

// reasoning for inconsistent scheduling:
// https://lists.apache.org/thread/m2w2xzfjpxlw63j0k7tfxfgs0rshhwwr
if (timeSinceLastRescale().compareTo(scalingIntervalMax) > 0) {
rescaleWithSufficientResources();
} else {
rescaleContext.scheduleOperation(
this::rescaleWithSufficientResources, scalingIntervalMax);
}
}
}

private void rescaleWithSufficientResources() {
if (rescaleContext.hasSufficientResources()) {
LOG.info(
"Resources for desired job parallelism couldn't be collected after {}: Rescaling will be enforced.",
scalingIntervalMax);
rescaleContext.rescale();
}
}

public static class Factory implements RescaleManager.Factory {

private final Duration scalingIntervalMin;
@Nullable private final Duration scalingIntervalMax;

/**
* Creates a {@code Factory} instance based on the {@link AdaptiveScheduler}'s {@code
* Settings} for rescaling.
*/
public static Factory fromSettings(AdaptiveScheduler.Settings settings) {
// it's not ideal that we use a AdaptiveScheduler internal class here. We might want to
// change that as part of a more general alignment of the rescaling configuration.
return new Factory(settings.getScalingIntervalMin(), settings.getScalingIntervalMax());
}

private Factory(Duration scalingIntervalMin, @Nullable Duration scalingIntervalMax) {
this.scalingIntervalMin = scalingIntervalMin;
this.scalingIntervalMax = scalingIntervalMax;
}

@Override
public DefaultRescaleManager create(Context rescaleContext, Instant lastRescale) {
return new DefaultRescaleManager(
lastRescale, rescaleContext, scalingIntervalMin, scalingIntervalMax);
}
}
}
Loading

0 comments on commit 7f13995

Please sign in to comment.