Skip to content

Commit

Permalink
[FLINK-30798][runtime] Refactor FinalizeOnMaster#finalizeGlobal to expose finished task attempts
Browse files Browse the repository at this point in the history
  • Loading branch information
ifndef-SleePy authored and zhuzhurk committed Jan 31, 2023
1 parent 74ef6be commit 642d28e
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,40 @@ public interface FinalizeOnMaster {
*
* @param parallelism The parallelism with which the format or function was run.
* @throws IOException The finalization may throw exceptions, which may cause the job to abort.
* @deprecated Use {@link #finalizeGlobal(FinalizationContext)} instead.
*/
void finalizeGlobal(int parallelism) throws IOException;
@Deprecated
default void finalizeGlobal(int parallelism) throws IOException {}

/**
 * Global finalization hook, invoked on the master (JobManager) once all (parallel) instances of
 * an OutputFormat have finished.
 *
 * <p>This default implementation forwards to {@link #finalizeGlobal(int)}, passing along only the
 * parallelism reported by the given context.
 *
 * @param context The context giving access to finalization information.
 * @throws IOException The finalization may throw exceptions, which may cause the job to abort.
 */
default void finalizeGlobal(FinalizationContext context) throws IOException {
    final int parallelism = context.getParallelism();
    finalizeGlobal(parallelism);
}

/**
* A context passed to {@link #finalizeGlobal(FinalizationContext)} that provides the parallelism
* and the finished attempt number of each subtask.
*/
@Public
interface FinalizationContext {

/**
* Gets the parallelism with which the format or function was run.
*
* @return the parallelism.
*/
int getParallelism();

/**
* Gets the attempt number of the finished attempt of the given subtask.
*
* @param subtaskIndex the index of the subtask to look up.
* @return the attempt number of the finished attempt.
* @throws IllegalArgumentException Thrown, if subtaskIndex is invalid.
*/
int getFinishedAttempt(int subtaskIndex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertex.FinalizeOnMasterContext;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
Expand Down Expand Up @@ -1227,10 +1228,39 @@ private void jobFinished() {

try {
for (ExecutionJobVertex ejv : verticesInCreationOrder) {
final Map<Integer, Integer> subtaskToFinishedAttempt =
Arrays.stream(ejv.getTaskVertices())
.map(ExecutionVertex::getCurrentExecutionAttempt)
.collect(
Collectors.toMap(
Execution::getParallelSubtaskIndex,
Execution::getAttemptNumber));
ejv.getJobVertex()
.finalizeOnMaster(
new SimpleInitializeOnMasterContext(
getUserClassLoader(), ejv.getParallelism()));
new FinalizeOnMasterContext() {
@Override
public ClassLoader getClassLoader() {
return getUserClassLoader();
}

@Override
public int getExecutionParallelism() {
return ejv.getParallelism();
}

@Override
public int getFinishedAttempt(int subtaskIndex) {
final Integer attemptNumber =
subtaskToFinishedAttempt.get(subtaskIndex);
if (attemptNumber == null) {
throw new IllegalArgumentException(
"Invalid subtaskIndex "
+ subtaskIndex
+ " provided");
}
return attemptNumber;
}
});
}
} catch (Throwable t) {
ExceptionUtils.rethrowIfFatalError(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.jobgraph;

import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.FinalizeOnMaster.FinalizationContext;
import org.apache.flink.api.common.io.InitializeOnMaster;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
Expand Down Expand Up @@ -119,7 +120,7 @@ public void initializeOnMaster(InitializeOnMasterContext context) throws Excepti
}

@Override
public void finalizeOnMaster(InitializeOnMasterContext context) throws Exception {
public void finalizeOnMaster(FinalizeOnMasterContext context) throws Exception {
final ClassLoader loader = context.getClassLoader();
final InputOutputFormatContainer formatContainer = initInputOutputformatContainer(loader);

Expand Down Expand Up @@ -149,7 +150,19 @@ public void finalizeOnMaster(InitializeOnMasterContext context) throws Exception

if (outputFormat instanceof FinalizeOnMaster) {
int executionParallelism = context.getExecutionParallelism();
((FinalizeOnMaster) outputFormat).finalizeGlobal(executionParallelism);
((FinalizeOnMaster) outputFormat)
.finalizeGlobal(
new FinalizationContext() {
@Override
public int getParallelism() {
return executionParallelism;
}

@Override
public int getFinishedAttempt(int subtaskIndex) {
return context.getFinishedAttempt(subtaskIndex);
}
});
}
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ public void initializeOnMaster(InitializeOnMasterContext context) throws Excepti
* @param context Provides contextual information for the finalization
* @throws Exception The method may throw exceptions which cause the job to fail immediately.
*/
public void finalizeOnMaster(InitializeOnMasterContext context) throws Exception {}
public void finalizeOnMaster(FinalizeOnMasterContext context) throws Exception {}

public interface InitializeOnMasterContext {
/** The class loader for user defined code. */
Expand All @@ -579,6 +579,29 @@ public interface InitializeOnMasterContext {
int getExecutionParallelism();
}

/**
* The context that exposes runtime information to {@link
* #finalizeOnMaster(FinalizeOnMasterContext)}.
*/
public interface FinalizeOnMasterContext {
/** The class loader for user defined code. */
ClassLoader getClassLoader();

/**
* The actual parallelism this vertex was run with. In contrast, {@link #getParallelism()} is
* the original parallelism set when creating the {@link JobGraph} and might be updated e.g. by
* the {@link org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler}.
*/
int getExecutionParallelism();

/**
* Gets the attempt number of the finished attempt of the given subtask.
*
* @param subtaskIndex the index of the subtask to look up.
* @return the attempt number of the finished attempt.
* @throws IllegalArgumentException Thrown, if subtaskIndex is invalid.
*/
int getFinishedAttempt(int subtaskIndex);
}

// --------------------------------------------------------------------------------------------

public String getOperatorName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertex.FinalizeOnMasterContext;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
Expand Down Expand Up @@ -77,8 +78,8 @@ public void testFinalizeIsCalledUponSuccess() throws Exception {
ExecutionGraphTestUtils.finishAllVertices(eg);
assertEquals(JobStatus.FINISHED, eg.waitUntilTerminal());

verify(vertex1, times(1)).finalizeOnMaster(any(JobVertex.InitializeOnMasterContext.class));
verify(vertex2, times(1)).finalizeOnMaster(any(JobVertex.InitializeOnMasterContext.class));
verify(vertex1, times(1)).finalizeOnMaster(any(FinalizeOnMasterContext.class));
verify(vertex2, times(1)).finalizeOnMaster(any(FinalizeOnMasterContext.class));

assertEquals(0, eg.getRegisteredExecutions().size());
}
Expand Down Expand Up @@ -109,7 +110,7 @@ public void testFinalizeIsNotCalledUponFailure() throws Exception {

assertEquals(JobStatus.FAILED, eg.waitUntilTerminal());

verify(vertex, times(0)).finalizeOnMaster(any(JobVertex.InitializeOnMasterContext.class));
verify(vertex, times(0)).finalizeOnMaster(any(FinalizeOnMasterContext.class));

assertEquals(0, eg.getRegisteredExecutions().size());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.executiongraph.SimpleInitializeOnMasterContext;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.JobVertex.FinalizeOnMasterContext;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.testutils.junit.SharedObjectsExtension;
import org.apache.flink.testutils.junit.SharedReference;
Expand Down Expand Up @@ -148,8 +149,22 @@ void testOutputFormat() throws Exception {
assertThatThrownBy(
() ->
copy.finalizeOnMaster(
new SimpleInitializeOnMasterContext(
cl, copy.getParallelism())))
new FinalizeOnMasterContext() {
@Override
public ClassLoader getClassLoader() {
return cl;
}

@Override
public int getExecutionParallelism() {
return copy.getParallelism();
}

@Override
public int getFinishedAttempt(int subtaskIndex) {
return 0;
}
}))
.isInstanceOf(TestException.class);
assertThat(Thread.currentThread().getContextClassLoader())
.as("Previous classloader was not restored.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ private static class WaitOnFinalizeJobVertex extends JobVertex {
}

@Override
public void finalizeOnMaster(InitializeOnMasterContext context) throws Exception {
public void finalizeOnMaster(FinalizeOnMasterContext context) throws Exception {
Thread.sleep(waitingTime);
finalizedOnMaster.set(true);
}
Expand All @@ -731,7 +731,7 @@ private OutOfMemoryInFinalizationJobVertex() {
}

@Override
public void finalizeOnMaster(InitializeOnMasterContext context) {
public void finalizeOnMaster(FinalizeOnMasterContext context) {
throw new OutOfMemoryError("Java heap space");
}
}
Expand Down
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2187,6 +2187,8 @@ under the License.
<exclude>org.apache.flink.api.connector.source.SourceReaderContext#currentParallelism()</exclude>
<exclude>org.apache.flink.api.common.io.OutputFormat#open(int,int)</exclude>
<exclude>org.apache.flink.api.common.io.OutputFormat#open(org.apache.flink.api.common.io.OutputFormat$InitializationContext)</exclude>
<exclude>org.apache.flink.api.common.io.FinalizeOnMaster#finalizeGlobal(int)</exclude>
<exclude>org.apache.flink.api.common.io.FinalizeOnMaster#finalizeGlobal(org.apache.flink.api.common.io.FinalizeOnMaster$FinalizationContext)</exclude>
<!-- MARKER: end exclusions -->
</excludes>
<accessModifier>public</accessModifier>
Expand Down

0 comments on commit 642d28e

Please sign in to comment.