Skip to content

Commit a518b9d

Browse files
committed
[FLINK-38441] Run ExecutionGraph operations from main thread
1 parent aac79a3 commit a518b9d

File tree

1 file changed

+28
-13
lines changed

1 file changed

+28
-13
lines changed

flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.flink.runtime.executiongraph;
2020

2121
import org.apache.flink.api.common.JobStatus;
22-
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
2322
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
2423
import org.apache.flink.runtime.execution.ExecutionState;
2524
import org.apache.flink.runtime.executiongraph.failover.FixedDelayRestartBackoffTimeStrategy;
@@ -35,7 +34,6 @@
3534
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
3635
import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
3736
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
38-
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
3937
import org.apache.flink.testutils.TestingUtils;
4038
import org.apache.flink.testutils.executor.TestExecutorExtension;
4139
import org.apache.flink.util.FlinkException;
@@ -58,6 +56,13 @@ class ExecutionGraphCoLocationRestartTest {
5856
static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
5957
TestingUtils.defaultExecutorExtension();
6058

59+
@RegisterExtension
60+
static final TestingComponentMainThreadExecutor.Extension MAIN_EXECUTOR_RESOURCE =
61+
new TestingComponentMainThreadExecutor.Extension();
62+
63+
private final TestingComponentMainThreadExecutor mainThreadExecutor =
64+
MAIN_EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
65+
6166
private static final int NUM_TASKS = 31;
6267

6368
@Test
@@ -83,11 +88,10 @@ void testConstraintsAfterRestart() throws Exception {
8388

8489
final ManuallyTriggeredScheduledExecutorService delayExecutor =
8590
new ManuallyTriggeredScheduledExecutorService();
86-
final DirectScheduledExecutorService futureExecutor = new DirectScheduledExecutorService();
8791
final SchedulerBase scheduler =
8892
new DefaultSchedulerBuilder(
8993
jobGraph,
90-
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
94+
mainThreadExecutor.getMainThreadExecutor(),
9195
EXECUTOR_RESOURCE.getExecutor())
9296
.setExecutionSlotAllocatorFactory(
9397
SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
@@ -97,7 +101,6 @@ void testConstraintsAfterRestart() throws Exception {
97101
TestingPhysicalSlot.builder()
98102
.build()))))
99103
.setDelayExecutor(delayExecutor)
100-
.setFutureExecutor(futureExecutor)
101104
.setRestartBackoffTimeStrategy(
102105
new FixedDelayRestartBackoffTimeStrategy
103106
.FixedDelayRestartBackoffTimeStrategyFactory(1, 0)
@@ -109,7 +112,7 @@ void testConstraintsAfterRestart() throws Exception {
109112
// enable the queued scheduling for the slot pool
110113
assertThat(eg.getState()).isEqualTo(JobStatus.CREATED);
111114

112-
scheduler.startScheduling();
115+
mainThreadExecutor.execute(scheduler::startScheduling);
113116

114117
Predicate<AccessExecution> isDeploying =
115118
ExecutionGraphTestUtils.isInExecutionState(ExecutionState.DEPLOYING);
@@ -120,19 +123,28 @@ void testConstraintsAfterRestart() throws Exception {
120123
// sanity checks
121124
validateConstraints(eg);
122125

123-
eg.getAllExecutionVertices().iterator().next().fail(new FlinkException("Test exception"));
126+
mainThreadExecutor.execute(
127+
() -> {
128+
eg.getAllExecutionVertices()
129+
.iterator()
130+
.next()
131+
.fail(new FlinkException("Test exception"));
132+
});
124133

125134
assertThat(eg.getState()).isEqualTo(JobStatus.RESTARTING);
126135

127136
// trigger registration of restartTasks(...) callback to cancelFuture before completing the
128137
// cancellation. This ensures the restarting actions to be performed in main thread.
129138
delayExecutor.triggerNonPeriodicScheduledTask();
130139

131-
for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
132-
if (vertex.getExecutionState() == ExecutionState.CANCELING) {
133-
vertex.getCurrentExecutionAttempt().completeCancelling();
134-
}
135-
}
140+
mainThreadExecutor.execute(
141+
() -> {
142+
for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
143+
if (vertex.getExecutionState() == ExecutionState.CANCELING) {
144+
vertex.getCurrentExecutionAttempt().completeCancelling();
145+
}
146+
}
147+
});
136148

137149
// wait until we have restarted
138150
ExecutionGraphTestUtils.waitUntilJobStatus(eg, JobStatus.RUNNING, timeout);
@@ -142,7 +154,10 @@ void testConstraintsAfterRestart() throws Exception {
142154
// checking execution vertex properties
143155
validateConstraints(eg);
144156

145-
ExecutionGraphTestUtils.finishAllVertices(eg);
157+
mainThreadExecutor.execute(
158+
() -> {
159+
ExecutionGraphTestUtils.finishAllVertices(eg);
160+
});
146161

147162
assertThat(eg.getState()).isEqualTo(FINISHED);
148163
}

0 commit comments

Comments
 (0)