Skip to content

Commit

Permalink
[FLINK-28402][runtime] Create FailureHandlingResultSnapshot with the …
Browse files Browse the repository at this point in the history
…truly failed execution

This closes apache#20221.
  • Loading branch information
zhuzhurk committed Jul 11, 2022
1 parent 777d780 commit edd10a1
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.runtime.executiongraph.failover.flip1;

import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
Expand Down Expand Up @@ -72,18 +73,18 @@ public ExecutionFailureHandler(
* Return result of failure handling. Can be a set of task vertices to restart and a delay of
* the restarting. Or that the failure is not recoverable and the reason for it.
*
* @param failedTask is the ID of the failed task vertex
* @param failedExecution is the failed execution
* @param cause of the task failure
* @param timestamp of the task failure
* @return result of the failure handling
*/
public FailureHandlingResult getFailureHandlingResult(
ExecutionVertexID failedTask, Throwable cause, long timestamp) {
Execution failedExecution, Throwable cause, long timestamp) {
return handleFailure(
failedTask,
failedExecution,
cause,
timestamp,
failoverStrategy.getTasksNeedingRestart(failedTask, cause),
failoverStrategy.getTasksNeedingRestart(failedExecution.getVertex().getID(), cause),
false);
}

Expand All @@ -109,15 +110,15 @@ public FailureHandlingResult getGlobalFailureHandlingResult(
}

private FailureHandlingResult handleFailure(
@Nullable final ExecutionVertexID failingExecutionVertexId,
@Nullable final Execution failedExecution,
final Throwable cause,
long timestamp,
final Set<ExecutionVertexID> verticesToRestart,
final boolean globalFailure) {

if (isUnrecoverableError(cause)) {
return FailureHandlingResult.unrecoverable(
failingExecutionVertexId,
failedExecution,
new JobException("The failure is not recoverable", cause),
timestamp,
globalFailure);
Expand All @@ -128,15 +129,15 @@ private FailureHandlingResult handleFailure(
numberOfRestarts++;

return FailureHandlingResult.restartable(
failingExecutionVertexId,
failedExecution,
cause,
timestamp,
verticesToRestart,
restartBackoffTimeStrategy.getBackoffTime(),
globalFailure);
} else {
return FailureHandlingResult.unrecoverable(
failingExecutionVertexId,
failedExecution,
new JobException(
"Recovery is suppressed by " + restartBackoffTimeStrategy, cause),
timestamp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.flink.runtime.executiongraph.failover.flip1;

import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;

import javax.annotation.Nonnull;
Expand Down Expand Up @@ -47,10 +47,10 @@ public class FailureHandlingResult {
private final long restartDelayMS;

/**
* The {@link ExecutionVertexID} refering to the {@link ExecutionVertex} the failure is
* originating from or {@code null} if it's a global failure.
* The {@link Execution} that the failure is originating from or {@code null} if it's a global
* failure.
*/
@Nullable private final ExecutionVertexID failingExecutionVertexId;
@Nullable private final Execution failedExecution;

/** Failure reason. {@code @Nullable} because of FLINK-21376. */
@Nullable private final Throwable error;
Expand All @@ -64,17 +64,16 @@ public class FailureHandlingResult {
/**
* Creates a result of a set of tasks to restart to recover from the failure.
*
* @param failingExecutionVertexId the {@link ExecutionVertexID} referring to the {@link
* ExecutionVertex} the failure is originating from. Passing {@code null} as a value
* indicates that the failure was issued by Flink itself.
* @param failedExecution the {@link Execution} that the failure is originating from. Passing
* {@code null} as a value indicates that the failure was issued by Flink itself.
* @param cause the exception that caused this failure.
* @param timestamp the time the failure was handled.
* @param verticesToRestart containing task vertices to restart to recover from the failure.
* {@code null} indicates that the failure is not restartable.
* @param restartDelayMS indicate a delay before conducting the restart
*/
private FailureHandlingResult(
@Nullable ExecutionVertexID failingExecutionVertexId,
@Nullable Execution failedExecution,
@Nullable Throwable cause,
long timestamp,
@Nullable Set<ExecutionVertexID> verticesToRestart,
Expand All @@ -84,7 +83,7 @@ private FailureHandlingResult(

this.verticesToRestart = Collections.unmodifiableSet(checkNotNull(verticesToRestart));
this.restartDelayMS = restartDelayMS;
this.failingExecutionVertexId = failingExecutionVertexId;
this.failedExecution = failedExecution;
this.error = cause;
this.timestamp = timestamp;
this.globalFailure = globalFailure;
Expand All @@ -93,20 +92,19 @@ private FailureHandlingResult(
/**
* Creates a result that the failure is not recoverable and no restarting should be conducted.
*
* @param failingExecutionVertexId the {@link ExecutionVertexID} referring to the {@link
* ExecutionVertex} the failure is originating from. Passing {@code null} as a value
* indicates that the failure was issued by Flink itself.
* @param failedExecution the {@link Execution} that the failure is originating from. Passing
* {@code null} as a value indicates that the failure was issued by Flink itself.
* @param error reason why the failure is not recoverable
* @param timestamp the time the failure was handled.
*/
private FailureHandlingResult(
@Nullable ExecutionVertexID failingExecutionVertexId,
@Nullable Execution failedExecution,
@Nonnull Throwable error,
long timestamp,
boolean globalFailure) {
this.verticesToRestart = null;
this.restartDelayMS = -1;
this.failingExecutionVertexId = failingExecutionVertexId;
this.failedExecution = failedExecution;
this.error = checkNotNull(error);
this.timestamp = timestamp;
this.globalFailure = globalFailure;
Expand Down Expand Up @@ -141,14 +139,14 @@ public long getRestartDelayMS() {
}

/**
* Returns an {@code Optional} with the {@link ExecutionVertexID} of the task causing this
* failure or an empty {@code Optional} if it's a global failure.
* Returns an {@code Optional} with the {@link Execution} causing this failure or an empty
* {@code Optional} if it's a global failure.
*
* @return The {@code ExecutionVertexID} of the causing task or an empty {@code Optional} if
* it's a global failure.
* @return The {@code Optional} with the failed {@code Execution} or an empty {@code Optional}
* if it's a global failure.
*/
public Optional<ExecutionVertexID> getExecutionVertexIdOfFailedTask() {
return Optional.ofNullable(failingExecutionVertexId);
public Optional<Execution> getFailedExecution() {
return Optional.ofNullable(failedExecution);
}

/**
Expand Down Expand Up @@ -193,9 +191,8 @@ public boolean isGlobalFailure() {
* <p>The result can be flagged to be from a global failure triggered by the scheduler, rather
* than from the failure of an individual task.
*
* @param failingExecutionVertexId the {@link ExecutionVertexID} refering to the {@link
* ExecutionVertex} the failure is originating from. Passing {@code null} as a value
* indicates that the failure was issued by Flink itself.
* @param failedExecution the {@link Execution} that the failure is originating from. Passing
* {@code null} as a value indicates that the failure was issued by Flink itself.
* @param cause The reason of the failure.
* @param timestamp The time of the failure.
* @param verticesToRestart containing task vertices to restart to recover from the failure.
Expand All @@ -204,14 +201,14 @@ public boolean isGlobalFailure() {
* @return result of a set of tasks to restart to recover from the failure
*/
public static FailureHandlingResult restartable(
@Nullable ExecutionVertexID failingExecutionVertexId,
@Nullable Execution failedExecution,
@Nullable Throwable cause,
long timestamp,
@Nullable Set<ExecutionVertexID> verticesToRestart,
long restartDelayMS,
boolean globalFailure) {
return new FailureHandlingResult(
failingExecutionVertexId,
failedExecution,
cause,
timestamp,
verticesToRestart,
Expand All @@ -225,18 +222,17 @@ public static FailureHandlingResult restartable(
* <p>The result can be flagged to be from a global failure triggered by the scheduler, rather
* than from the failure of an individual task.
*
* @param failingExecutionVertexId the {@link ExecutionVertexID} refering to the {@link
* ExecutionVertex} the failure is originating from. Passing {@code null} as a value
* indicates that the failure was issued by Flink itself.
* @param failedExecution the {@link Execution} that the failure is originating from. Passing
* {@code null} as a value indicates that the failure was issued by Flink itself.
* @param error reason why the failure is not recoverable
* @param timestamp The time of the failure.
* @return result indicating the failure is not recoverable
*/
public static FailureHandlingResult unrecoverable(
@Nullable ExecutionVertexID failingExecutionVertexId,
@Nullable Execution failedExecution,
@Nonnull Throwable error,
long timestamp,
boolean globalFailure) {
return new FailureHandlingResult(failingExecutionVertexId, error, timestamp, globalFailure);
return new FailureHandlingResult(failedExecution, error, timestamp, globalFailure);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,29 +236,30 @@ protected void updateTaskExecutionStateInternal(

schedulingStrategy.onExecutionStateChange(
executionVertexId, taskExecutionState.getExecutionState());
maybeHandleTaskFailure(taskExecutionState, executionVertexId);
maybeHandleTaskFailure(taskExecutionState, getCurrentExecutionOfVertex(executionVertexId));
}

private void maybeHandleTaskFailure(
final TaskExecutionStateTransition taskExecutionState,
final ExecutionVertexID executionVertexId) {
final TaskExecutionStateTransition taskExecutionState, final Execution execution) {

if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) {
final Throwable error = taskExecutionState.getError(userCodeLoader);
handleTaskFailure(executionVertexId, error);
handleTaskFailure(execution, error);
}
}

private void handleTaskFailure(
final ExecutionVertexID executionVertexId, @Nullable final Throwable error) {
final Execution failedExecution, @Nullable final Throwable error) {
Throwable revisedError =
maybeTranslateToCachedIntermediateDataSetException(error, executionVertexId);
maybeTranslateToCachedIntermediateDataSetException(
error, failedExecution.getVertex().getID());
final long timestamp = System.currentTimeMillis();
setGlobalFailureCause(revisedError, timestamp);
notifyCoordinatorsAboutTaskFailure(executionVertexId, revisedError);
notifyCoordinatorsAboutTaskFailure(failedExecution.getVertex().getID(), revisedError);

final FailureHandlingResult failureHandlingResult =
executionFailureHandler.getFailureHandlingResult(
executionVertexId, revisedError, timestamp);
failedExecution, revisedError, timestamp);
maybeRestartTasks(failureHandlingResult);
}

Expand Down Expand Up @@ -329,8 +330,7 @@ private void restartTasksWithDelay(final FailureHandlingResult failureHandlingRe

final FailureHandlingResultSnapshot failureHandlingResultSnapshot =
FailureHandlingResultSnapshot.create(
failureHandlingResult,
id -> this.getExecutionVertex(id).getCurrentExecutionAttempt());
failureHandlingResult, id -> getExecutionVertex(id).getCurrentExecutions());
delayExecutor.schedule(
() ->
FutureUtils.assertNoException(
Expand Down Expand Up @@ -401,7 +401,11 @@ private CompletableFuture<?> cancelExecutionVertex(final ExecutionVertexID execu
}

private ExecutionAttemptID getCurrentExecutionIdOfVertex(ExecutionVertexID executionVertexId) {
return getExecutionVertex(executionVertexId).getCurrentExecutionAttempt().getAttemptId();
return getCurrentExecutionOfVertex(executionVertexId).getAttemptId();
}

private Execution getCurrentExecutionOfVertex(ExecutionVertexID executionVertexId) {
return getExecutionVertex(executionVertexId).getCurrentExecutionAttempt();
}

// ------------------------------------------------------------------------
Expand All @@ -415,8 +419,7 @@ public void allocateSlotsAndDeploy(final List<ExecutionVertexID> verticesToDeplo

final List<Execution> executionsToDeploy =
verticesToDeploy.stream()
.map(this::getExecutionVertex)
.map(ExecutionVertex::getCurrentExecutionAttempt)
.map(this::getCurrentExecutionOfVertex)
.collect(Collectors.toList());

executionDeployer.allocateSlotsAndDeploy(executionsToDeploy, requiredVersionByVertex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@

import javax.annotation.Nullable;

import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
Expand All @@ -52,38 +52,27 @@ public class FailureHandlingResultSnapshot {
*
* @param failureHandlingResult The {@code FailureHandlingResult} that is used for extracting
* the failure information.
* @param latestExecutionLookup The look-up function for retrieving the latest {@link Execution}
* instance for a given {@link ExecutionVertexID}.
* @param currentExecutionsLookup The look-up function for retrieving all the current {@link
* Execution} instances for a given {@link ExecutionVertexID}.
* @return The {@code FailureHandlingResultSnapshot}.
*/
public static FailureHandlingResultSnapshot create(
FailureHandlingResult failureHandlingResult,
Function<ExecutionVertexID, Execution> latestExecutionLookup) {
Function<ExecutionVertexID, Collection<Execution>> currentExecutionsLookup) {
final Execution rootCauseExecution =
failureHandlingResult
.getExecutionVertexIdOfFailedTask()
.map(latestExecutionLookup)
.orElse(null);
Preconditions.checkArgument(
rootCauseExecution == null || rootCauseExecution.getFailureInfo().isPresent(),
String.format(
"The execution %s didn't provide a failure info even though the corresponding ExecutionVertex %s is marked as having handled the root cause of this failure.",
// the "(null)" values should never be used due to the condition - it's just
// added to make the compiler happy
rootCauseExecution != null ? rootCauseExecution.getAttemptId() : "(null)",
failureHandlingResult
.getExecutionVertexIdOfFailedTask()
.map(Objects::toString)
.orElse("(null)")));

final ExecutionVertexID rootCauseExecutionVertexId =
failureHandlingResult.getExecutionVertexIdOfFailedTask().orElse(null);
failureHandlingResult.getFailedExecution().orElse(null);

if (rootCauseExecution != null && !rootCauseExecution.getFailureInfo().isPresent()) {
throw new IllegalArgumentException(
String.format(
"The failed execution %s didn't provide a failure info.",
rootCauseExecution.getAttemptId()));
}

final Set<Execution> concurrentlyFailedExecutions =
failureHandlingResult.getVerticesToRestart().stream()
.filter(
executionVertexId ->
!executionVertexId.equals(rootCauseExecutionVertexId))
.map(latestExecutionLookup)
.flatMap(id -> currentExecutionsLookup.apply(id).stream())
.filter(execution -> execution != rootCauseExecution)
.filter(execution -> execution.getFailureInfo().isPresent())
.collect(Collectors.toSet());

Expand Down
Loading

0 comments on commit edd10a1

Please sign in to comment.