From 642d28ec83c915ddc29037737d89f6698f99161e Mon Sep 17 00:00:00 2001 From: ifndef-SleePy Date: Sun, 29 Jan 2023 20:05:00 +0800 Subject: [PATCH] [FLINK-30798][runtime] Refactor FinalizeOnMaster#finalizeGlobal to expose finished task attempts --- .../flink/api/common/io/FinalizeOnMaster.java | 36 ++++++++++++++++++- .../executiongraph/DefaultExecutionGraph.java | 34 ++++++++++++++++-- .../jobgraph/InputOutputFormatVertex.java | 17 +++++++-- .../flink/runtime/jobgraph/JobVertex.java | 25 ++++++++++++- .../executiongraph/FinalizeOnMasterTest.java | 7 ++-- .../runtime/jobgraph/JobTaskVertexTest.java | 19 ++++++++-- .../minicluster/MiniClusterITCase.java | 4 +-- pom.xml | 2 ++ 8 files changed, 131 insertions(+), 13 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FinalizeOnMaster.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FinalizeOnMaster.java index d7287183629c4..fb89d0d400783 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/FinalizeOnMaster.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FinalizeOnMaster.java @@ -35,6 +35,40 @@ public interface FinalizeOnMaster { * * @param parallelism The parallelism with which the format or functions was run. * @throws IOException The finalization may throw exceptions, which may cause the job to abort. + * @deprecated Use {@link #finalizeGlobal(FinalizationContext)} instead. */ - void finalizeGlobal(int parallelism) throws IOException; + @Deprecated + default void finalizeGlobal(int parallelism) throws IOException {} + + /** + * The method is invoked on the master (JobManager) after all (parallel) instances of an + * OutputFormat finished. + * + * @param context The context to get finalization infos. + * @throws IOException The finalization may throw exceptions, which may cause the job to abort. + */ + default void finalizeGlobal(FinalizationContext context) throws IOException { + finalizeGlobal(context.getParallelism()); + } + + /** A context that provides parallelism and finished attempts infos. */ + @Public + interface FinalizationContext { + + /** + * Get the parallelism with which the format or functions was run. + * + * @return the parallelism. + */ + int getParallelism(); + + /** + * Get the finished attempt number of subtask. + * + * @param subtaskIndex the subtask index. + * @return the finished attempt. + * @throws IllegalArgumentException Thrown, if subtaskIndex is invalid. + */ + int getFinishedAttempt(int subtaskIndex); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java index 494711d35da04..dd3a109235330 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java @@ -57,6 +57,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertex.FinalizeOnMasterContext; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; @@ -1227,10 +1228,39 @@ private void jobFinished() { try { for (ExecutionJobVertex ejv : verticesInCreationOrder) { + final Map subtaskToFinishedAttempt = + Arrays.stream(ejv.getTaskVertices()) + .map(ExecutionVertex::getCurrentExecutionAttempt) + .collect( + Collectors.toMap( + Execution::getParallelSubtaskIndex, + Execution::getAttemptNumber)); ejv.getJobVertex() .finalizeOnMaster( - new SimpleInitializeOnMasterContext( - getUserClassLoader(), ejv.getParallelism())); + new FinalizeOnMasterContext() { + @Override + public ClassLoader getClassLoader() { + return getUserClassLoader(); + } + + @Override + public int getExecutionParallelism() { + return ejv.getParallelism(); + } + + @Override + public int getFinishedAttempt(int subtaskIndex) { + final Integer attemptNumber = + subtaskToFinishedAttempt.get(subtaskIndex); + if (attemptNumber == null) { + throw new IllegalArgumentException( + "Invalid subtaskIndex " + + subtaskIndex + + " provided"); + } + return attemptNumber; + } + }); } } catch (Throwable t) { ExceptionUtils.rethrowIfFatalError(t); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputOutputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputOutputFormatVertex.java index 9eb3e36cef70e..4e098e1c72da7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputOutputFormatVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputOutputFormatVertex.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobgraph; import org.apache.flink.api.common.io.FinalizeOnMaster; +import org.apache.flink.api.common.io.FinalizeOnMaster.FinalizationContext; import org.apache.flink.api.common.io.InitializeOnMaster; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.io.OutputFormat; @@ -119,7 +120,7 @@ public void initializeOnMaster(InitializeOnMasterContext context) throws Excepti } @Override - public void finalizeOnMaster(InitializeOnMasterContext context) throws Exception { + public void finalizeOnMaster(FinalizeOnMasterContext context) throws Exception { final ClassLoader loader = context.getClassLoader(); final InputOutputFormatContainer formatContainer = initInputOutputformatContainer(loader); @@ -149,7 +150,19 @@ public void finalizeOnMaster(InitializeOnMasterContext context) throws Exception if (outputFormat instanceof FinalizeOnMaster) { int executionParallelism = context.getExecutionParallelism(); - ((FinalizeOnMaster) outputFormat).finalizeGlobal(executionParallelism); + ((FinalizeOnMaster) outputFormat) + .finalizeGlobal( + new FinalizationContext() { + @Override + public int getParallelism() { + return executionParallelism; + } + + @Override + public int getFinishedAttempt(int subtaskIndex) { + return context.getFinishedAttempt(subtaskIndex); + } + }); } } } finally { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java index 442512bdd2d73..a90c1248e469a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java @@ -564,7 +564,7 @@ public void initializeOnMaster(InitializeOnMasterContext context) throws Excepti * @param context Provides contextual information for the initialization * @throws Exception The method may throw exceptions which cause the job to fail immediately. */ - public void finalizeOnMaster(InitializeOnMasterContext context) throws Exception {} + public void finalizeOnMaster(FinalizeOnMasterContext context) throws Exception {} public interface InitializeOnMasterContext { /** The class loader for user defined code. */ @@ -579,6 +579,29 @@ public interface InitializeOnMasterContext { int getExecutionParallelism(); } + /** The context exposes some runtime infos for finalization. */ + public interface FinalizeOnMasterContext { + /** The class loader for user defined code. */ + ClassLoader getClassLoader(); + + /** + * The actual parallelism this vertex will be run with. In contrast, the {@link + * #getParallelism()} is the original parallelism set when creating the {@link JobGraph} and + * might be updated e.g. by the {@link + * org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler}. + */ + int getExecutionParallelism(); + + /** + * Get the finished attempt number of subtask. + * + * @param subtaskIndex the subtask index. + * @return the finished attempt. + * @throws IllegalArgumentException Thrown, if subtaskIndex is invalid. + */ + int getFinishedAttempt(int subtaskIndex); + } + // -------------------------------------------------------------------------------------------- public String getOperatorName() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FinalizeOnMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FinalizeOnMasterTest.java index a302ce2e86265..b15dfe6e07e25 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FinalizeOnMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FinalizeOnMasterTest.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertex.FinalizeOnMasterContext; import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.testutils.TestingUtils; @@ -77,8 +78,8 @@ public void testFinalizeIsCalledUponSuccess() throws Exception { ExecutionGraphTestUtils.finishAllVertices(eg); assertEquals(JobStatus.FINISHED, eg.waitUntilTerminal()); - verify(vertex1, times(1)).finalizeOnMaster(any(JobVertex.InitializeOnMasterContext.class)); - verify(vertex2, times(1)).finalizeOnMaster(any(JobVertex.InitializeOnMasterContext.class)); + verify(vertex1, times(1)).finalizeOnMaster(any(FinalizeOnMasterContext.class)); + verify(vertex2, times(1)).finalizeOnMaster(any(FinalizeOnMasterContext.class)); assertEquals(0, eg.getRegisteredExecutions().size()); } @@ -109,7 +110,7 @@ public void testFinalizeIsNotCalledUponFailure() throws Exception { assertEquals(JobStatus.FAILED, eg.waitUntilTerminal()); - verify(vertex, times(0)).finalizeOnMaster(any(JobVertex.InitializeOnMasterContext.class)); + verify(vertex, times(0)).finalizeOnMaster(any(FinalizeOnMasterContext.class)); assertEquals(0, eg.getRegisteredExecutions().size()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java index d31fdb9d8bfb4..13223ceb9daf9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java @@ -28,6 +28,7 @@ import org.apache.flink.core.io.InputSplit; import org.apache.flink.runtime.executiongraph.SimpleInitializeOnMasterContext; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.JobVertex.FinalizeOnMasterContext; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.testutils.junit.SharedObjectsExtension; import org.apache.flink.testutils.junit.SharedReference; @@ -148,8 +149,22 @@ void testOutputFormat() throws Exception { assertThatThrownBy( () -> copy.finalizeOnMaster( - new SimpleInitializeOnMasterContext( - cl, copy.getParallelism()))) + new FinalizeOnMasterContext() { + @Override + public ClassLoader getClassLoader() { + return cl; + } + + @Override + public int getExecutionParallelism() { + return copy.getParallelism(); + } + + @Override + public int getFinishedAttempt(int subtaskIndex) { + return 0; + } + })) .isInstanceOf(TestException.class); assertThat(Thread.currentThread().getContextClassLoader()) .as("Previous classloader was not restored.") diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java index ccf600fea09c6..24463d25d8ff9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java @@ -714,7 +714,7 @@ private static class WaitOnFinalizeJobVertex extends JobVertex { } @Override - public void finalizeOnMaster(InitializeOnMasterContext context) throws Exception { + public void finalizeOnMaster(FinalizeOnMasterContext context) throws Exception { Thread.sleep(waitingTime); finalizedOnMaster.set(true); } @@ -731,7 +731,7 @@ private OutOfMemoryInFinalizationJobVertex() { } @Override - public void finalizeOnMaster(InitializeOnMasterContext context) { + public void finalizeOnMaster(FinalizeOnMasterContext context) { throw new OutOfMemoryError("Java heap space"); } } diff --git a/pom.xml b/pom.xml index 4cca0c3570b87..bbe056fed8312 100644 --- a/pom.xml +++ b/pom.xml @@ -2187,6 +2187,8 @@ under the License. org.apache.flink.api.connector.source.SourceReaderContext#currentParallelism() org.apache.flink.api.common.io.OutputFormat#open(int,int) org.apache.flink.api.common.io.OutputFormat#open(org.apache.flink.api.common.io.OutputFormat$InitializationContext) + org.apache.flink.api.common.io.FinalizeOnMaster#finalizeGlobal(int) + org.apache.flink.api.common.io.FinalizeOnMaster#finalizeGlobal(org.apache.flink.api.common.io.FinalizeOnMaster$FinalizationContext) public