Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -552,8 +553,14 @@ private void runApplicationEntryPoint(
.key())));
return;
}
final List<JobID> applicationJobIds =

final List<JobID> submittedJobIds = new ArrayList<>();
final List<JobID> recoveredJobIds =
recoveredJobInfos.stream().map(JobInfo::getJobId).collect(Collectors.toList());
final List<JobID> terminalJobIds =
recoveredTerminalJobInfos.stream()
.map(JobInfo::getJobId)
.collect(Collectors.toList());
try {
if (program == null) {
LOG.info("Reconstructing program from descriptor {}", programDescriptor);
Expand All @@ -562,7 +569,11 @@ private void runApplicationEntryPoint(

final PipelineExecutorServiceLoader executorServiceLoader =
new EmbeddedExecutorServiceLoader(
applicationJobIds, dispatcherGateway, scheduledExecutor);
submittedJobIds,
recoveredJobIds,
terminalJobIds,
dispatcherGateway,
scheduledExecutor);

ClientUtils.executeProgram(
executorServiceLoader,
Expand All @@ -573,12 +584,12 @@ private void runApplicationEntryPoint(
getApplicationId(),
getAllRecoveredJobInfos());

if (applicationJobIds.isEmpty()) {
if (submittedJobIds.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, all jobs, including the newly submitted ones, recovered ones and terminal ones will all be added into this list? If so, the old name applicationJobIds may be more accurate.

It would also be better to add a comment describing what the list contains after executeProgram(...) returns.

jobIdsFuture.completeExceptionally(
new ApplicationExecutionException(
"The application contains no execute() calls."));
} else {
jobIdsFuture.complete(applicationJobIds);
jobIdsFuture.complete(submittedJobIds);
}
} catch (Throwable t) {
// If we're running in a single job execution mode, it's safe to consider re-submission
Expand All @@ -591,7 +602,7 @@ private void runApplicationEntryPoint(
final JobID jobId = maybeDuplicate.get().getJobID();
tolerateMissingResult.add(jobId);
jobIdsFuture.complete(Collections.singletonList(jobId));
} else if (submitFailedJobOnApplicationError && applicationJobIds.isEmpty()) {
} else if (submitFailedJobOnApplicationError && submittedJobIds.isEmpty()) {
final JobID failedJobId =
JobID.fromHexString(
configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -71,6 +72,10 @@ public class EmbeddedExecutor implements PipelineExecutor {

private final Collection<JobID> submittedJobIds;

private final Collection<JobID> recoveredJobIds;

private final Collection<JobID> terminalJobIds;

private final DispatcherGateway dispatcherGateway;

private final EmbeddedJobClientCreator jobClientCreator;
Expand All @@ -93,7 +98,25 @@ public EmbeddedExecutor(
final DispatcherGateway dispatcherGateway,
final Configuration configuration,
final EmbeddedJobClientCreator jobClientCreator) {
this(
submittedJobIds,
Collections.emptyList(),
Collections.emptyList(),
dispatcherGateway,
configuration,
jobClientCreator);
}

public EmbeddedExecutor(
final Collection<JobID> submittedJobIds,
final Collection<JobID> recoveredJobIds,
final Collection<JobID> terminalJobIds,
final DispatcherGateway dispatcherGateway,
final Configuration configuration,
final EmbeddedJobClientCreator jobClientCreator) {
this.submittedJobIds = checkNotNull(submittedJobIds);
this.recoveredJobIds = checkNotNull(recoveredJobIds);
this.terminalJobIds = checkNotNull(terminalJobIds);
this.dispatcherGateway = checkNotNull(dispatcherGateway);
this.jobClientCreator = checkNotNull(jobClientCreator);
this.jobStatusChangedListeners =
Expand Down Expand Up @@ -122,18 +145,33 @@ public CompletableFuture<JobClient> execute(

// Skip resubmission if the job is recovered via HA.
// When optJobId is present, the streamGraph's ID is deterministically derived from it. In
// this case, if the streamGraph's ID is in submittedJobIds, it means the job was submitted
// in a previous run and should not be resubmitted.
if (optJobId.isPresent() && submittedJobIds.contains(streamGraph.getJobID())) {
return getJobClientFuture(streamGraph.getJobID(), userCodeClassloader);
// this case, if the streamGraph's ID is in terminalJobIds or recoveredJobIds, it means the
// job was submitted in a previous run and should not be resubmitted.
if (optJobId.isPresent()) {
final JobID actualJobId = streamGraph.getJobID();
if (terminalJobIds.contains(actualJobId)) {
LOG.info("Job {} reached a terminal state in a previous execution.", actualJobId);
return getJobClientFuture(actualJobId, userCodeClassloader);
}

if (recoveredJobIds.contains(actualJobId)) {
final Duration timeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
return dispatcherGateway
.recoverJob(actualJobId, timeout)
.thenCompose(
ack -> {
LOG.info("Job {} is recovered successfully.", actualJobId);
return getJobClientFuture(actualJobId, userCodeClassloader);
});
}
}

return submitAndGetJobClientFuture(pipeline, configuration, userCodeClassloader);
}

private CompletableFuture<JobClient> getJobClientFuture(
final JobID jobId, final ClassLoader userCodeClassloader) {
LOG.info("Job {} was recovered successfully.", jobId);
submittedJobIds.add(jobId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method should be renamed to reflect its new behavior.

return CompletableFuture.completedFuture(
jobClientCreator.getJobClient(jobId, userCodeClassloader));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public class EmbeddedExecutorFactory implements PipelineExecutorFactory {

private final Collection<JobID> submittedJobIds;

private final Collection<JobID> recoveredJobIds;

private final Collection<JobID> terminalJobIds;

private final DispatcherGateway dispatcherGateway;

private final ScheduledExecutor retryExecutor;
Expand All @@ -54,9 +58,13 @@ public class EmbeddedExecutorFactory implements PipelineExecutorFactory {
*/
public EmbeddedExecutorFactory(
final Collection<JobID> submittedJobIds,
final Collection<JobID> recoveredJobIds,
final Collection<JobID> terminalJobIds,
final DispatcherGateway dispatcherGateway,
final ScheduledExecutor retryExecutor) {
// All arguments are mandatory; each is null-checked in declaration order.
// submittedJobIds is an output collection: it is handed to the EmbeddedExecutor, which
// appends the id of every job it obtains a client for.
this.submittedJobIds = checkNotNull(submittedJobIds);
// recoveredJobIds and terminalJobIds are only consulted with contains(...) by the
// EmbeddedExecutor to decide whether a job must be recovered or skipped.
this.recoveredJobIds = checkNotNull(recoveredJobIds);
this.terminalJobIds = checkNotNull(terminalJobIds);
this.dispatcherGateway = checkNotNull(dispatcherGateway);
this.retryExecutor = checkNotNull(retryExecutor);
}
Expand All @@ -78,6 +86,8 @@ public PipelineExecutor getExecutor(final Configuration configuration) {
checkNotNull(configuration);
return new EmbeddedExecutor(
submittedJobIds,
recoveredJobIds,
terminalJobIds,
dispatcherGateway,
configuration,
(jobId, userCodeClassloader) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public class EmbeddedExecutorServiceLoader implements PipelineExecutorServiceLoa

private final Collection<JobID> submittedJobIds;

private final Collection<JobID> recoveredJobIds;

private final Collection<JobID> terminalJobIds;

private final DispatcherGateway dispatcherGateway;

private final ScheduledExecutor retryExecutor;
Expand All @@ -55,16 +59,21 @@ public class EmbeddedExecutorServiceLoader implements PipelineExecutorServiceLoa
*/
public EmbeddedExecutorServiceLoader(
final Collection<JobID> submittedJobIds,
final Collection<JobID> recoveredJobIds,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comments need to be updated. IIUC, unlike submittedJobIds, recoveredJobIds and terminalJobIds are not expected to be modified by the executor.

final Collection<JobID> terminalJobIds,
final DispatcherGateway dispatcherGateway,
final ScheduledExecutor retryExecutor) {
this.submittedJobIds = checkNotNull(submittedJobIds);
this.recoveredJobIds = checkNotNull(recoveredJobIds);
this.terminalJobIds = checkNotNull(terminalJobIds);
this.dispatcherGateway = checkNotNull(dispatcherGateway);
this.retryExecutor = checkNotNull(retryExecutor);
}

@Override
public PipelineExecutorFactory getExecutorFactory(final Configuration configuration) {
return new EmbeddedExecutorFactory(submittedJobIds, dispatcherGateway, retryExecutor);
return new EmbeddedExecutorFactory(
submittedJobIds, recoveredJobIds, terminalJobIds, dispatcherGateway, retryExecutor);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
Expand Down Expand Up @@ -68,6 +66,7 @@
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertNull;

/** Integration tests related to {@link PackagedProgramApplication}. */
class PackagedProgramApplicationITCase {
Expand Down Expand Up @@ -195,10 +194,8 @@ public JobResultStore getJobResultStore() {
awaitClusterStopped(cluster);
}

FlinkAssertions.assertThatChainOfCauses(ErrorHandlingSubmissionJob.getSubmissionException())
.as(
"The job's main method shouldn't have been succeeded due to a DuplicateJobSubmissionException.")
.hasAtLeastOneElementOfType(DuplicateJobSubmissionException.class);
// submission should succeed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add more comments to explain why it will succeed?

assertNull(ErrorHandlingSubmissionJob.getSubmissionException());

assertThat(jobResultStore.hasDirtyJobResultEntryAsync(jobId).get()).isFalse();
assertThat(jobResultStore.hasCleanJobResultEntryAsync(jobId).get()).isTrue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.ApplicationState;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobInfo;
import org.apache.flink.api.common.JobInfoImpl;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
import org.apache.flink.client.program.PackagedProgram;
Expand Down Expand Up @@ -57,6 +59,9 @@

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
Expand All @@ -71,6 +76,7 @@
import java.util.function.Supplier;

import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
import static org.apache.flink.streaming.api.graph.StreamGraphGenerator.DEFAULT_STREAMING_JOB_NAME;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand Down Expand Up @@ -1122,6 +1128,107 @@ void testSubmitFailedJobOnApplicationErrorWhenNotEnforceSingleJobExecution() thr
assertApplicationFailed(application);
}

@Test
void testRecoveredTerminalJobsAreNotResubmitted() throws Exception {
    // Track every job id that reaches the dispatcher via submit or recover.
    final List<JobID> submitCalls = new ArrayList<>();
    final List<JobID> recoverCalls = new ArrayList<>();

    final TestingDispatcherGateway.Builder gatewayBuilder =
            finishedJobGatewayBuilder()
                    .setSubmitFunction(
                            jobGraph -> {
                                submitCalls.add(jobGraph.getJobID());
                                return CompletableFuture.completedFuture(Acknowledge.get());
                            })
                    .setRecoverJobFunction(
                            jobId -> {
                                recoverCalls.add(jobId);
                                return CompletableFuture.completedFuture(Acknowledge.get());
                            });

    // Configure the fixed job id with the id of the job that is reported as terminal.
    final JobID terminalJobId = new JobID();
    final Configuration config = getConfiguration();
    config.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, terminalJobId.toHexString());

    // The application knows no recovered running jobs and exactly one recovered terminal job.
    final JobInfo terminalJobInfo = new JobInfoImpl(terminalJobId, DEFAULT_STREAMING_JOB_NAME);
    final List<JobInfo> noRunningJobs = Collections.emptyList();
    final List<JobInfo> terminalJobs = Collections.singletonList(terminalJobInfo);

    final PackagedProgramApplication application =
            new PackagedProgramApplication(
                    new ApplicationID(),
                    getProgram(1),
                    noRunningJobs,
                    terminalJobs,
                    config,
                    true,
                    false,
                    false,
                    true);

    executeApplication(
            application, gatewayBuilder.build(), scheduledExecutor, exception -> {});

    application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
    assertApplicationFinished(application);

    // The terminal job must neither be submitted again nor recovered.
    assertThat(submitCalls).isEmpty();
    assertThat(recoverCalls).isEmpty();
}

@Test
void testRecoveredRunningJobsAreRecovered() throws Exception {
    // Configure the fixed job id with the id of the job that is reported as recovered.
    final JobID runningJobId = new JobID();
    final Configuration config = getConfiguration();
    config.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, runningJobId.toHexString());

    // Record which dispatcher entry points are hit, and with which job ids.
    final List<JobID> submitCalls = new ArrayList<>();
    final List<JobID> recoverCalls = new ArrayList<>();

    final TestingDispatcherGateway.Builder gatewayBuilder =
            finishedJobGatewayBuilder()
                    .setSubmitFunction(
                            jobGraph -> {
                                submitCalls.add(jobGraph.getJobID());
                                return CompletableFuture.completedFuture(Acknowledge.get());
                            })
                    .setRecoverJobFunction(
                            jobId -> {
                                recoverCalls.add(jobId);
                                return CompletableFuture.completedFuture(Acknowledge.get());
                            });

    // The application knows exactly one recovered running job and no terminal jobs.
    final JobInfo runningJobInfo = new JobInfoImpl(runningJobId, DEFAULT_STREAMING_JOB_NAME);
    final List<JobInfo> runningJobs = Collections.singletonList(runningJobInfo);
    final List<JobInfo> noTerminalJobs = Collections.emptyList();

    final PackagedProgramApplication application =
            new PackagedProgramApplication(
                    new ApplicationID(),
                    getProgram(1),
                    runningJobs,
                    noTerminalJobs,
                    config,
                    true,
                    false,
                    false,
                    true);

    executeApplication(
            application, gatewayBuilder.build(), scheduledExecutor, exception -> {});

    application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
    assertApplicationFinished(application);

    // The running job must go through recovery exactly once and never through submission.
    assertThat(submitCalls).isEmpty();
    assertThat(recoverCalls).containsExactly(runningJobId);
}

/**
 * Returns the builder produced by {@code dispatcherGatewayBuilder} for {@link
 * JobStatus#FINISHED}.
 */
private TestingDispatcherGateway.Builder finishedJobGatewayBuilder() {
    final JobStatus finishedStatus = JobStatus.FINISHED;
    return dispatcherGatewayBuilder(finishedStatus);
}
Expand Down
Loading