Skip to content

Commit 1f34a07

Browse files
noorall authored and zhuzhurk committed
[FLINK-38272][runtime] Fix unstable BatchJobRecoveryTest
1 parent 3478ddf commit 1f34a07

File tree

4 files changed

+142
-57
lines changed

4 files changed

+142
-57
lines changed

flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java

Lines changed: 0 additions & 51 deletions
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,6 @@
2222
import org.apache.flink.core.testutils.AllCallbackWrapper;
2323
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
2424
import org.apache.flink.runtime.clusterframework.types.AllocationID;
25-
import org.apache.flink.runtime.clusterframework.types.ResourceID;
2625
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
2726
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
2827
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -55,7 +54,6 @@
5554
import java.time.Duration;
5655
import java.util.Collection;
5756
import java.util.Collections;
58-
import java.util.Map;
5957
import java.util.UUID;
6058
import java.util.concurrent.CompletableFuture;
6159
import java.util.concurrent.ExecutionException;
@@ -296,53 +294,4 @@ private void registerTaskExecutorAndOfferSlots(
296294
.offerSlots(taskManagerLocation.getResourceID(), slotOffers, testingTimeout)
297295
.get();
298296
}
299-
300-
// Removed by this commit (all lines carry a "-" gutter): test-private wrapper that forwards every
// ExecutionDeploymentTracker call to a wrapped tracker and exposes two futures for tests to wait on.
private static class TestingExecutionDeploymentTrackerWrapper
301-
implements ExecutionDeploymentTracker {
302-
private final ExecutionDeploymentTracker originalTracker; // delegate that receives every call
303-
private final CompletableFuture<ExecutionAttemptID> taskDeploymentFuture; // completed in completeDeploymentOf
304-
private final CompletableFuture<ExecutionAttemptID> stopFuture; // completed in stopTrackingDeploymentOf
305-
306-
private TestingExecutionDeploymentTrackerWrapper() {
307-
this(new DefaultExecutionDeploymentTracker()); // default delegate
308-
}
309-
310-
private TestingExecutionDeploymentTrackerWrapper(
311-
ExecutionDeploymentTracker originalTracker) {
312-
this.originalTracker = originalTracker;
313-
this.taskDeploymentFuture = new CompletableFuture<>();
314-
this.stopFuture = new CompletableFuture<>();
315-
}
316-
317-
@Override
318-
public void startTrackingPendingDeploymentOf(
319-
ExecutionAttemptID executionAttemptId, ResourceID host) {
320-
originalTracker.startTrackingPendingDeploymentOf(executionAttemptId, host); // pure delegation
321-
}
322-
323-
@Override
324-
public void completeDeploymentOf(ExecutionAttemptID executionAttemptId) {
325-
originalTracker.completeDeploymentOf(executionAttemptId);
326-
taskDeploymentFuture.complete(executionAttemptId); // only the first call sets the value; later calls are no-ops
327-
}
328-
329-
@Override
330-
public void stopTrackingDeploymentOf(ExecutionAttemptID executionAttemptId) {
331-
originalTracker.stopTrackingDeploymentOf(executionAttemptId);
332-
stopFuture.complete(executionAttemptId); // only the first call sets the value; later calls are no-ops
333-
}
334-
335-
@Override
336-
public Map<ExecutionAttemptID, ExecutionDeploymentState> getExecutionsOn(ResourceID host) {
337-
return originalTracker.getExecutionsOn(host); // pure delegation
338-
}
339-
340-
public CompletableFuture<ExecutionAttemptID> getTaskDeploymentFuture() {
341-
return taskDeploymentFuture;
342-
}
343-
344-
public CompletableFuture<ExecutionAttemptID> getStopFuture() {
345-
return stopFuture;
346-
}
347-
}
348297
}
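
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingExecutionDeploymentTrackerWrapper.java

Lines changed: 82 additions & 0 deletions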
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.jobmaster;
20+
21+
import org.apache.flink.runtime.clusterframework.types.ResourceID;
22+
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
23+
24+
import java.util.Collections;
25+
import java.util.HashSet;
26+
import java.util.Map;
27+
import java.util.Set;
28+
import java.util.concurrent.CompletableFuture;
29+
30+
/** Testing implementation of the {@link ExecutionDeploymentTracker}: forwards every call to a wrapped tracker, exposes the first completed and first stopped attempt as futures, and records every attempt whose deployment completed. */
31+
public class TestingExecutionDeploymentTrackerWrapper implements ExecutionDeploymentTracker {
32+
private final ExecutionDeploymentTracker originalTracker; // delegate that receives every call
33+
private final CompletableFuture<ExecutionAttemptID> taskDeploymentFuture; // completed in completeDeploymentOf
34+
private final CompletableFuture<ExecutionAttemptID> stopFuture; // completed in stopTrackingDeploymentOf
35+
private final Set<ExecutionAttemptID> deployedExecutions = new HashSet<>(); // attempts seen by completeDeploymentOf; plain HashSet, no synchronization in this class. NOTE(review): presumably written and read only on the scheduler main thread - confirm against callers
36+
37+
public TestingExecutionDeploymentTrackerWrapper() {
38+
this(new DefaultExecutionDeploymentTracker()); // default delegate
39+
}
40+
41+
public TestingExecutionDeploymentTrackerWrapper(ExecutionDeploymentTracker originalTracker) {
42+
this.originalTracker = originalTracker;
43+
this.taskDeploymentFuture = new CompletableFuture<>();
44+
this.stopFuture = new CompletableFuture<>();
45+
}
46+
47+
@Override
48+
public void startTrackingPendingDeploymentOf(
49+
ExecutionAttemptID executionAttemptId, ResourceID host) {
50+
originalTracker.startTrackingPendingDeploymentOf(executionAttemptId, host); // pure delegation
51+
}
52+
53+
@Override
54+
public void completeDeploymentOf(ExecutionAttemptID executionAttemptId) {
55+
originalTracker.completeDeploymentOf(executionAttemptId);
56+
taskDeploymentFuture.complete(executionAttemptId); // only the first call sets the value; later calls are no-ops
57+
deployedExecutions.add(executionAttemptId); // recorded after the future completes, so a callback attached to the future may not yet see this attempt in the set
58+
}
59+
60+
@Override
61+
public void stopTrackingDeploymentOf(ExecutionAttemptID executionAttemptId) {
62+
originalTracker.stopTrackingDeploymentOf(executionAttemptId); // note: the attempt is not removed from deployedExecutions
63+
stopFuture.complete(executionAttemptId); // only the first call sets the value; later calls are no-ops
64+
}
65+
66+
@Override
67+
public Map<ExecutionAttemptID, ExecutionDeploymentState> getExecutionsOn(ResourceID host) {
68+
return originalTracker.getExecutionsOn(host); // pure delegation
69+
}
70+
71+
public CompletableFuture<ExecutionAttemptID> getTaskDeploymentFuture() {
72+
return taskDeploymentFuture;
73+
}
74+
75+
public CompletableFuture<ExecutionAttemptID> getStopFuture() {
76+
return stopFuture;
77+
}
78+
79+
public Set<ExecutionAttemptID> getDeployedExecutions() {
80+
return Collections.unmodifiableSet(deployedExecutions); // read-only live view of the backing set, not a snapshot
81+
}
82+
}

flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.flink.runtime.jobgraph.JobGraph;
4444
import org.apache.flink.runtime.jobgraph.JobVertexID;
4545
import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
46+
import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
4647
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
4748
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
4849
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
@@ -122,6 +123,8 @@ public class DefaultSchedulerBuilder {
122123
private InputConsumableDecider.Factory inputConsumableDeciderFactory =
123124
AllFinishedInputConsumableDecider.Factory.INSTANCE;
124125
private BatchJobRecoveryHandler jobRecoveryHandler = new DummyBatchJobRecoveryHandler();
126+
private ExecutionDeploymentTracker executionDeploymentTracker =
127+
new DefaultExecutionDeploymentTracker();
125128

126129
public DefaultSchedulerBuilder(
127130
JobGraph jobGraph,
@@ -301,6 +304,12 @@ public DefaultSchedulerBuilder setJobRecoveryHandler(
301304
return this;
302305
}
303306

307+
// Replaces the tracker this builder hands out (field default: a new DefaultExecutionDeploymentTracker).
// In the hunks shown it is passed on in buildAdaptiveBatchJobScheduler and createExecutionGraphFactory.
// Returns this builder so calls can be chained.
public DefaultSchedulerBuilder setExecutionDeploymentTracker(
308+
ExecutionDeploymentTracker executionDeploymentTracker) {
309+
this.executionDeploymentTracker = executionDeploymentTracker;
310+
return this;
311+
}
312+
304313
public DefaultScheduler build() throws Exception {
305314
return new DefaultScheduler(
306315
log,
@@ -367,7 +376,7 @@ public AdaptiveBatchScheduler buildAdaptiveBatchJobScheduler(boolean enableSpecu
367376
jobManagerJobMetricGroup,
368377
shuffleMaster,
369378
partitionTracker,
370-
new DefaultExecutionDeploymentTracker(),
379+
executionDeploymentTracker,
371380
System.currentTimeMillis(),
372381
mainThreadExecutor,
373382
jobStatusListener,
@@ -390,7 +399,7 @@ private ExecutionGraphFactory createExecutionGraphFactory(
390399
return new DefaultExecutionGraphFactory(
391400
jobMasterConfiguration,
392401
userCodeLoader,
393-
new DefaultExecutionDeploymentTracker(),
402+
executionDeploymentTracker,
394403
futureExecutor,
395404
ioExecutor,
396405
rpcTimeout,

flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/BatchJobRecoveryTest.java

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.apache.flink.runtime.jobgraph.JobVertex;
5555
import org.apache.flink.runtime.jobgraph.JobVertexID;
5656
import org.apache.flink.runtime.jobgraph.OperatorID;
57+
import org.apache.flink.runtime.jobmaster.TestingExecutionDeploymentTrackerWrapper;
5758
import org.apache.flink.runtime.jobmaster.event.ExecutionVertexFinishedEvent;
5859
import org.apache.flink.runtime.jobmaster.event.FileSystemJobEventStore;
5960
import org.apache.flink.runtime.jobmaster.event.JobEvent;
@@ -159,6 +160,9 @@ public class BatchJobRecoveryTest {
159160
private ScheduledExecutor delayedExecutor =
160161
new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());
161162

163+
private TestingExecutionDeploymentTrackerWrapper executionDeploymentTracker =
164+
new TestingExecutionDeploymentTrackerWrapper();
165+
162166
private static final OperatorID OPERATOR_ID = new OperatorID(1234L, 5678L);
163167
private static final int NUM_SPLITS = 10;
164168
private static final int SOURCE_PARALLELISM = 5;
@@ -216,6 +220,7 @@ void setUp() throws IOException {
216220

217221
this.serializedJobGraph = serializeJobGraph(createDefaultJobGraph());
218222
allPartitionWithMetrics.clear();
223+
executionDeploymentTracker = new TestingExecutionDeploymentTrackerWrapper();
219224
}
220225

221226
@AfterEach
@@ -238,11 +243,14 @@ void testRecoverFromJMFailover() throws Exception {
238243

239244
runInMainThread(scheduler::startScheduling);
240245

246+
waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler);
241247
runInMainThread(
242248
() -> {
243249
// transition all sources to finished.
244250
transitionExecutionsState(scheduler, ExecutionState.FINISHED, SOURCE_ID);
245251
});
252+
253+
waitUntilAllExecutionsDeployed(MIDDLE_ID, scheduler);
246254
runInMainThread(
247255
() -> {
248256
// transition all middle tasks to RUNNING state
@@ -338,11 +346,14 @@ void testJobVertexUnFinishedAndOperatorCoordinatorNotSupportBatchSnapshot() thro
338346

339347
runInMainThread(scheduler::startScheduling);
340348

349+
waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler);
341350
runInMainThread(
342351
() -> {
343352
// transition all sources to finished.
344353
transitionExecutionsState(scheduler, ExecutionState.FINISHED, SOURCE_ID);
345354
});
355+
356+
waitUntilAllExecutionsDeployed(MIDDLE_ID, scheduler);
346357
runInMainThread(
347358
() -> {
348359
// transition first middle task to finished.
@@ -451,6 +462,7 @@ void testJobVertexFinishedAndOperatorCoordinatorNotSupportBatchSnapshotAndPartit
451462

452463
runInMainThread(scheduler::startScheduling);
453464

465+
waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler);
454466
runInMainThread(
455467
() -> {
456468
// transition all sources to finished.
@@ -495,14 +507,13 @@ void testJobVertexFinishedAndOperatorCoordinatorNotSupportBatchSnapshotAndPartit
495507
}
496508
}
497509

498-
for (ExecutionVertex taskVertex :
499-
getExecutionVertices(MIDDLE_ID, newScheduler.getExecutionGraph())) {
500-
waitUntilExecutionVertexState(taskVertex, ExecutionState.DEPLOYING, 15000L);
501-
}
510+
waitUntilAllExecutionsDeployed(MIDDLE_ID, newScheduler);
502511

512+
waitUntilAllExecutionsDeployed(MIDDLE_ID, scheduler);
503513
runInMainThread(
504514
() -> {
505515
// transition all middle tasks to running
516+
transitionExecutionsState(scheduler, ExecutionState.INITIALIZING, MIDDLE_ID);
506517
transitionExecutionsState(scheduler, ExecutionState.RUNNING, MIDDLE_ID);
507518
});
508519

@@ -539,6 +550,7 @@ void testRecoverFromJMFailoverAndPartitionsUnavailable() throws Exception {
539550

540551
runInMainThread(scheduler::startScheduling);
541552

553+
waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler);
542554
runInMainThread(
543555
() -> {
544556
// transition all sources to finished.
@@ -596,15 +608,20 @@ void testRecoverDecidedParallelismFromTheSameJobGraphInstance() throws Exception
596608

597609
runInMainThread(scheduler::startScheduling);
598610

611+
waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler);
599612
runInMainThread(
600613
() -> {
601614
// transition all sources to finished.
602615
transitionExecutionsState(scheduler, ExecutionState.FINISHED, SOURCE_ID);
603616
});
617+
618+
waitUntilAllExecutionsDeployed(MIDDLE_ID, scheduler);
604619
runInMainThread(
605620
() -> { // transition all middle tasks to finished.
606621
transitionExecutionsState(scheduler, ExecutionState.FINISHED, MIDDLE_ID);
607622
});
623+
624+
waitUntilAllExecutionsDeployed(SINK_ID, scheduler);
608625
runInMainThread(
609626
() -> {
610627
// transition all sinks to finished.
@@ -676,6 +693,7 @@ void testPartitionNotFoundTwiceAfterJMFailover() throws Exception {
676693
});
677694

678695
// transition all sources to finished.
696+
waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler);
679697
runInMainThread(
680698
() -> transitionExecutionsState(scheduler, ExecutionState.FINISHED, SOURCE_ID));
681699

@@ -1124,6 +1142,7 @@ private AdaptiveBatchScheduler createScheduler(
11241142
jobGraph,
11251143
mainThreadExecutor.getMainThreadExecutor(),
11261144
EXECUTOR_RESOURCE.getExecutor())
1145+
.setExecutionDeploymentTracker(executionDeploymentTracker)
11271146
.setRestartBackoffTimeStrategy(
11281147
new FixedDelayRestartBackoffTimeStrategy
11291148
.FixedDelayRestartBackoffTimeStrategyFactory(10, 0)
@@ -1212,4 +1231,30 @@ public Optional<ResourceID> storesLocalResourcesOn() {
12121231
};
12131232
}
12141233
}
1234+
1235+
// Polls until the current execution attempt of every task vertex of the given job vertex is
// contained in executionDeploymentTracker.getDeployedExecutions().
// NOTE(review): there is no timeout - if the executions are never deployed this loops until the
// test framework aborts the test.
private void waitUntilAllExecutionsDeployed(
1236+
JobVertexID vertexId, AdaptiveBatchScheduler scheduler) throws Exception {
1237+
AtomicBoolean isAllExecutionDeployed = new AtomicBoolean(false); // set from inside the lambda below
1238+
1239+
while (!isAllExecutionDeployed.get()) {
1240+
runInMainThread( // presumably executes the check on the scheduler main thread and waits for it - confirm against runInMainThread
1241+
() -> {
1242+
List<ExecutionAttemptID> attemptIds = // attempt ids are re-read on every poll
1243+
Arrays.stream(
1244+
scheduler
1245+
.getExecutionJobVertex(vertexId)
1246+
.getTaskVertices())
1247+
.map(ExecutionVertex::getCurrentExecutionAttempt)
1248+
.map(Execution::getAttemptId)
1249+
.collect(Collectors.toList());
1250+
if (!attemptIds.isEmpty() // an empty vertex list never counts as "all deployed"
1251+
&& executionDeploymentTracker
1252+
.getDeployedExecutions()
1253+
.containsAll(attemptIds)) {
1254+
isAllExecutionDeployed.set(true);
1255+
}
1256+
});
1257+
Thread.sleep(2); // 2 ms pause between polls; also runs once after the flag has been set
1258+
}
1259+
}
12151260
}

0 commit comments

Comments
 (0)