-
Notifications
You must be signed in to change notification settings - Fork 13.9k
[FLINK-38974][runtime] Handle jobs in running applications during HA recovery #27686
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -43,6 +43,7 @@ | |
| import java.net.InetSocketAddress; | ||
| import java.time.Duration; | ||
| import java.util.Collection; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.Optional; | ||
| import java.util.concurrent.CompletableFuture; | ||
|
|
@@ -71,6 +72,10 @@ public class EmbeddedExecutor implements PipelineExecutor { | |
|
|
||
| private final Collection<JobID> submittedJobIds; | ||
|
|
||
| private final Collection<JobID> recoveredJobIds; | ||
|
|
||
| private final Collection<JobID> terminalJobIds; | ||
|
|
||
| private final DispatcherGateway dispatcherGateway; | ||
|
|
||
| private final EmbeddedJobClientCreator jobClientCreator; | ||
|
|
@@ -93,7 +98,25 @@ public EmbeddedExecutor( | |
| final DispatcherGateway dispatcherGateway, | ||
| final Configuration configuration, | ||
| final EmbeddedJobClientCreator jobClientCreator) { | ||
| this( | ||
| submittedJobIds, | ||
| Collections.emptyList(), | ||
| Collections.emptyList(), | ||
| dispatcherGateway, | ||
| configuration, | ||
| jobClientCreator); | ||
| } | ||
|
|
||
| public EmbeddedExecutor( | ||
| final Collection<JobID> submittedJobIds, | ||
| final Collection<JobID> recoveredJobIds, | ||
| final Collection<JobID> terminalJobIds, | ||
| final DispatcherGateway dispatcherGateway, | ||
| final Configuration configuration, | ||
| final EmbeddedJobClientCreator jobClientCreator) { | ||
| this.submittedJobIds = checkNotNull(submittedJobIds); | ||
| this.recoveredJobIds = checkNotNull(recoveredJobIds); | ||
| this.terminalJobIds = checkNotNull(terminalJobIds); | ||
| this.dispatcherGateway = checkNotNull(dispatcherGateway); | ||
| this.jobClientCreator = checkNotNull(jobClientCreator); | ||
| this.jobStatusChangedListeners = | ||
|
|
@@ -122,18 +145,33 @@ public CompletableFuture<JobClient> execute( | |
|
|
||
| // Skip resubmission if the job is recovered via HA. | ||
| // When optJobId is present, the streamGraph's ID is deterministically derived from it. In | ||
| // this case, if the streamGraph's ID is in submittedJobIds, it means the job was submitted | ||
| // in a previous run and should not be resubmitted. | ||
| if (optJobId.isPresent() && submittedJobIds.contains(streamGraph.getJobID())) { | ||
| return getJobClientFuture(streamGraph.getJobID(), userCodeClassloader); | ||
| // this case, if the streamGraph's ID is in terminalJobIds or submittedJobIds, it means the | ||
| // job was submitted in a previous run and should not be resubmitted. | ||
| if (optJobId.isPresent()) { | ||
| final JobID actualJobId = streamGraph.getJobID(); | ||
| if (terminalJobIds.contains(actualJobId)) { | ||
| LOG.info("Job {} reached a terminal state in a previous execution.", actualJobId); | ||
| return getJobClientFuture(actualJobId, userCodeClassloader); | ||
| } | ||
|
|
||
| if (recoveredJobIds.contains(actualJobId)) { | ||
| final Duration timeout = configuration.get(ClientOptions.CLIENT_TIMEOUT); | ||
| return dispatcherGateway | ||
| .recoverJob(actualJobId, timeout) | ||
| .thenCompose( | ||
| ack -> { | ||
| LOG.info("Job {} is recovered successfully.", actualJobId); | ||
| return getJobClientFuture(actualJobId, userCodeClassloader); | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| return submitAndGetJobClientFuture(pipeline, configuration, userCodeClassloader); | ||
| } | ||
|
|
||
| private CompletableFuture<JobClient> getJobClientFuture( | ||
| final JobID jobId, final ClassLoader userCodeClassloader) { | ||
| LOG.info("Job {} was recovered successfully.", jobId); | ||
| submittedJobIds.add(jobId); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The methos should be renamed to reflect its new behavior. |
||
| return CompletableFuture.completedFuture( | ||
| jobClientCreator.getJobClient(jobId, userCodeClassloader)); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,6 +40,10 @@ public class EmbeddedExecutorServiceLoader implements PipelineExecutorServiceLoa | |
|
|
||
| private final Collection<JobID> submittedJobIds; | ||
|
|
||
| private final Collection<JobID> recoveredJobIds; | ||
|
|
||
| private final Collection<JobID> terminalJobIds; | ||
|
|
||
| private final DispatcherGateway dispatcherGateway; | ||
|
|
||
| private final ScheduledExecutor retryExecutor; | ||
|
|
@@ -55,16 +59,21 @@ public class EmbeddedExecutorServiceLoader implements PipelineExecutorServiceLoa | |
| */ | ||
| public EmbeddedExecutorServiceLoader( | ||
| final Collection<JobID> submittedJobIds, | ||
| final Collection<JobID> recoveredJobIds, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The comments need to be updated. IIUC, unlike |
||
| final Collection<JobID> terminalJobIds, | ||
| final DispatcherGateway dispatcherGateway, | ||
| final ScheduledExecutor retryExecutor) { | ||
| this.submittedJobIds = checkNotNull(submittedJobIds); | ||
| this.recoveredJobIds = checkNotNull(recoveredJobIds); | ||
| this.terminalJobIds = checkNotNull(terminalJobIds); | ||
| this.dispatcherGateway = checkNotNull(dispatcherGateway); | ||
| this.retryExecutor = checkNotNull(retryExecutor); | ||
| } | ||
|
|
||
| @Override | ||
| public PipelineExecutorFactory getExecutorFactory(final Configuration configuration) { | ||
| return new EmbeddedExecutorFactory(submittedJobIds, dispatcherGateway, retryExecutor); | ||
| return new EmbeddedExecutorFactory( | ||
| submittedJobIds, recoveredJobIds, terminalJobIds, dispatcherGateway, retryExecutor); | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,8 +31,6 @@ | |
| import org.apache.flink.configuration.DeploymentOptions; | ||
| import org.apache.flink.configuration.HighAvailabilityOptions; | ||
| import org.apache.flink.configuration.PipelineOptionsInternal; | ||
| import org.apache.flink.core.testutils.FlinkAssertions; | ||
| import org.apache.flink.runtime.client.DuplicateJobSubmissionException; | ||
| import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory; | ||
| import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory; | ||
| import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; | ||
|
|
@@ -68,6 +66,7 @@ | |
| import java.util.function.Supplier; | ||
|
|
||
| import static org.assertj.core.api.Assertions.assertThat; | ||
| import static org.junit.jupiter.api.Assertions.assertNull; | ||
|
|
||
| /** Integration tests related to {@link PackagedProgramApplication}. */ | ||
| class PackagedProgramApplicationITCase { | ||
|
|
@@ -195,10 +194,8 @@ public JobResultStore getJobResultStore() { | |
| awaitClusterStopped(cluster); | ||
| } | ||
|
|
||
| FlinkAssertions.assertThatChainOfCauses(ErrorHandlingSubmissionJob.getSubmissionException()) | ||
| .as( | ||
| "The job's main method shouldn't have been succeeded due to a DuplicateJobSubmissionException.") | ||
| .hasAtLeastOneElementOfType(DuplicateJobSubmissionException.class); | ||
| // submission should succeed | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you add more comments to explain why it will succeed? |
||
| assertNull(ErrorHandlingSubmissionJob.getSubmissionException()); | ||
|
|
||
| assertThat(jobResultStore.hasDirtyJobResultEntryAsync(jobId).get()).isFalse(); | ||
| assertThat(jobResultStore.hasCleanJobResultEntryAsync(jobId).get()).isTrue(); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, all jobs, including the newly submitted ones, recovered ones and terminal ones will all be added into this list? If so, the old name
applicationJobIdsmay be more accurate.And it's better to add some comments for it, like what the list would be like after calling the
executeProgram(...).