Skip to content

Commit c19e969

Browse files
committed
address comments
1 parent 3ac2278 commit c19e969

File tree

2 files changed

+44
-39
lines changed

2 files changed

+44
-39
lines changed

flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerSimpleITCase.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020

2121
import org.apache.flink.api.common.ExecutionConfig;
2222
import org.apache.flink.api.common.JobExecutionResult;
23+
import org.apache.flink.api.common.JobStatus;
2324
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
25+
import org.apache.flink.api.common.time.Deadline;
2426
import org.apache.flink.configuration.Configuration;
2527
import org.apache.flink.configuration.JobManagerOptions;
2628
import org.apache.flink.runtime.execution.Environment;
@@ -33,6 +35,7 @@
3335
import org.apache.flink.runtime.jobmaster.JobResult;
3436
import org.apache.flink.runtime.minicluster.MiniCluster;
3537
import org.apache.flink.runtime.testtasks.NoOpInvokable;
38+
import org.apache.flink.runtime.testutils.CommonTestUtils;
3639
import org.apache.flink.runtime.testutils.MiniClusterResource;
3740
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
3841
import org.apache.flink.util.FlinkRuntimeException;
@@ -43,6 +46,7 @@
4346

4447
import java.io.IOException;
4548
import java.time.Duration;
49+
import java.time.temporal.ChronoUnit;
4650

4751
import static org.junit.Assert.assertTrue;
4852

@@ -104,6 +108,34 @@ private JobGraph createJobGraph() {
104108
return JobGraphTestUtils.streamingJobGraph(source, sink);
105109
}
106110

111+
/**
 * Verifies that a job can be cancelled while it is in the {@code RESTARTING} state.
 *
 * <p>Submits a job graph with a single vertex running {@link AlwaysFailingInvokable} at
 * parallelism 1, using a fixed-delay restart strategy with {@code Integer.MAX_VALUE} attempts
 * and a delay of {@code timeInRestartingState}. It then polls the job status until
 * {@link JobStatus#RESTARTING} is reported and cancels the job, blocking on the returned
 * future. There is no explicit assertion: the test passes when none of these calls throws.
 *
 * @throws Exception if submitting, polling, or cancelling the job fails (presumably also when
 *     the deadline for observing {@code RESTARTING} expires; confirm against
 *     {@code CommonTestUtils.waitUntilCondition})
 */
@Test
112+
public void testJobCancellationWhileRestartingSucceeds() throws Exception {
113+
final long timeInRestartingState = 10000L;
114+
115+
final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
116+
final JobVertex alwaysFailingOperator = new JobVertex("Always failing operator");
117+
alwaysFailingOperator.setInvokableClass(AlwaysFailingInvokable.class);
118+
alwaysFailingOperator.setParallelism(1);
119+
120+
final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(alwaysFailingOperator);
121+
ExecutionConfig executionConfig = new ExecutionConfig();
122+
// Use a long delay between restart attempts so the job stays in RESTARTING for 10 seconds,
// which leaves enough time to observe that state and cancel the job within it.
123+
executionConfig.setRestartStrategy(
124+
RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, timeInRestartingState));
125+
jobGraph.setExecutionConfig(executionConfig);
126+
127+
miniCluster.submitJob(jobGraph).join();
128+
129+
// Poll the job status until RESTARTING is reported. The deadline equals the restart delay;
// the trailing 5 is presumably the polling interval in milliseconds (confirm against
// CommonTestUtils.waitUntilCondition).
130+
CommonTestUtils.waitUntilCondition(
131+
() -> miniCluster.getJobStatus(jobGraph.getJobID()).get() == JobStatus.RESTARTING,
132+
Deadline.fromNow(Duration.of(timeInRestartingState, ChronoUnit.MILLIS)),
133+
5);
134+
135+
// Cancel while the job should still be waiting out the restart delay, and block until the
// cancel request has completed.
136+
miniCluster.cancelJob(jobGraph.getJobID()).get();
137+
}
138+
107139
@Test
108140
public void testGlobalFailoverIfTaskFails() throws Throwable {
109141
final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
@@ -160,4 +192,15 @@ private static void reset() {
160192
hasFailed = false;
161193
}
162194
}
195+
196+
/**
 * Test invokable whose {@link #invoke()} unconditionally throws a {@link FlinkRuntimeException}
 * with the message "Test failure.". It is set as the invokable class of the only vertex in
 * {@code testJobCancellationWhileRestartingSucceeds}.
 */
public static final class AlwaysFailingInvokable extends AbstractInvokable {
197+
public AlwaysFailingInvokable(Environment environment) {
198+
super(environment);
199+
}
200+
201+
@Override
202+
public void invoke() throws Exception {
203+
// Fail immediately on every invocation; no work is performed before the throw.
throw new FlinkRuntimeException("Test failure.");
204+
}
205+
}
163206
}

flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java

Lines changed: 1 addition & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.apache.flink.configuration.Configuration;
2929
import org.apache.flink.configuration.JobManagerOptions;
3030
import org.apache.flink.core.execution.JobClient;
31-
import org.apache.flink.core.testutils.OneShotLatch;
3231
import org.apache.flink.runtime.state.FunctionInitializationContext;
3332
import org.apache.flink.runtime.state.FunctionSnapshotContext;
3433
import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -39,7 +38,6 @@
3938
import org.apache.flink.streaming.api.datastream.DataStreamSource;
4039
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
4140
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
42-
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
4341
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
4442
import org.apache.flink.test.util.MiniClusterWithClientResource;
4543
import org.apache.flink.util.FlinkException;
@@ -111,6 +109,7 @@ public void cancelRunningJobs() {
111109
/** Tests that the adaptive scheduler can recover stateful operators. */
112110
@Test
113111
public void testGlobalFailoverCanRecoverState() throws Exception {
112+
114113
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
115114
env.setParallelism(PARALLELISM);
116115

@@ -232,43 +231,6 @@ public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() throws Ex
232231
assertThat(savepoint, containsString(savepointDirectory.getAbsolutePath()));
233232
}
234233

235-
/**
 * Verifies that a job stuck in a restart loop can be cancelled while it is in the
 * {@code RESTARTING} state.
 *
 * <p>Builds a streaming job from a {@code NotifyOnRunningAndFailingSource} and a
 * {@code DiscardingSink}, executes it asynchronously, waits for the source's running latch,
 * polls the job status until {@code RESTARTING} is reported, and then cancels the job through
 * the {@code JobClient}. There is no explicit assertion: the test passes when none of these
 * calls throws.
 *
 * @throws Exception if executing, polling, or cancelling the job fails
 */
@Test
236-
public void testCancellationOfJobInRestartLoop() throws Exception {
237-
final long timeInRestartingState = 10000L;
238-
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
239-
240-
// Use a long delay between restart attempts so the job stays in RESTARTING for 10 seconds.
241-
env.setRestartStrategy(
242-
RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, timeInRestartingState));
243-
env.addSource(new NotifyOnRunningAndFailingSource()).addSink(new DiscardingSink<>());
244-
245-
JobClient client = env.executeAsync();
246-
247-
NotifyOnRunningAndFailingSource.runningLatch.await();
248-
249-
// Poll the job status until RESTARTING is reported. The deadline equals the restart delay;
// the trailing 5 is presumably the polling interval in milliseconds (confirm against
// CommonTestUtils.waitUntilCondition).
250-
CommonTestUtils.waitUntilCondition(
251-
() -> client.getJobStatus().get() == JobStatus.RESTARTING,
252-
Deadline.fromNow(Duration.of(timeInRestartingState, ChronoUnit.MILLIS)),
253-
5);
254-
255-
// Cancel while the job should still be waiting out the restart delay, and block until the
// cancel request has completed.
256-
client.cancel().get();
257-
}
258-
259-
/**
 * Source that triggers a static latch when {@link #run} is entered and then throws a
 * {@link RuntimeException}, so a test can wait until the source has started before the job
 * fails.
 */
private static class NotifyOnRunningAndFailingSource implements ParallelSourceFunction<String> {
260-
// Static, so it is shared by every instance of this source; it is triggered in run().
private static final OneShotLatch runningLatch = new OneShotLatch();
261-
262-
@Override
263-
public void run(SourceContext<String> ctx) throws Exception {
264-
runningLatch.trigger();
265-
throw new RuntimeException();
266-
}
267-
268-
@Override
269-
public void cancel() {}
270-
}
271-
272234
private boolean isDirectoryEmpty(File directory) {
273235
File[] files = directory.listFiles();
274236
if (files.length > 0) {

0 commit comments

Comments
 (0)