
Commit 67bfc90

Merge pull request GoogleCloudPlatform#563 from dhalperi/cancel-double
DataflowPipelineJob: gracefully handle cancellation concurrent with termination
2 parents: 15cb364 + 1cb04a6

2 files changed: 62 additions, 3 deletions

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java

Lines changed: 29 additions & 3 deletions
@@ -292,9 +292,35 @@ public void cancel() throws IOException {
     content.setProjectId(projectId);
     content.setId(jobId);
     content.setRequestedState("JOB_STATE_CANCELLED");
-    dataflowClient.projects().jobs()
-        .update(projectId, jobId, content)
-        .execute();
+    try {
+      dataflowClient.projects().jobs()
+          .update(projectId, jobId, content)
+          .execute();
+    } catch (IOException e) {
+      State state = getState();
+      if (state.isTerminal()) {
+        LOG.warn("Cancel failed because job {} is already terminated in state {}.", jobId, state);
+      } else if (e.getMessage().contains("has terminated")) {
+        // This handles the case where the getState() call above returns RUNNING but the cancel
+        // was rejected because the job is in fact done. Hopefully, someday we can delete this
+        // code if there is better consistency between the State and whether Cancel succeeds.
+        //
+        // Example message:
+        // Workflow modification failed. Causes: (7603adc9e9bff51e): Cannot perform
+        // operation 'cancel' on Job: 2017-04-01_22_50_59-9269855660514862348. Job has
+        // terminated in state SUCCESS: Workflow job: 2017-04-01_22_50_59-9269855660514862348
+        // succeeded.
+        LOG.warn("Cancel failed because job {} is already terminated.", jobId, e);
+      } else {
+        String errorMsg = String.format(
+            "Failed to cancel job in state %s, "
+            + "please go to the Developers Console to cancel it manually: %s",
+            state,
+            MonitoringUtil.getJobMonitoringPageURL(getProjectId(), getJobId()));
+        LOG.warn(errorMsg);
+        throw new IOException(errorMsg, e);
+      }
+    }
   }
 
   @Override
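
For context on the caller-visible contract after this change: cancel() now only logs a warning when the job has already reached a terminal state (or the service reports it "has terminated"), and still throws IOException when the cancel request fails for a job that is genuinely running. A minimal sketch of a hypothetical caller that relies on this behavior; the helper and its wrapping exception are illustrative, not part of the SDK:

import java.io.IOException;

import com.google.cloud.dataflow.sdk.runners.DataflowPipelineJob;

public class CancelExample {
  // Hypothetical helper: after this commit, a cancel racing with normal
  // termination is treated as success (warning only). An IOException here
  // means the cancel genuinely failed and its message points at the
  // Developers Console monitoring page.
  static void cancelOrExplain(DataflowPipelineJob job) {
    try {
      job.cancel();
    } catch (IOException e) {
      throw new RuntimeException(
          "Could not cancel the running Dataflow job; cancel it manually.", e);
    }
  }
}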

sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJobTest.java

Lines changed: 33 additions & 0 deletions
@@ -26,6 +26,7 @@
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -42,6 +43,7 @@
 import com.google.api.services.dataflow.model.MetricUpdate;
 import com.google.cloud.dataflow.sdk.PipelineResult.State;
 import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowAggregatorTransforms;
+import com.google.cloud.dataflow.sdk.testing.ExpectedLogs;
 import com.google.cloud.dataflow.sdk.testing.FastNanoClockAndSleeper;
 import com.google.cloud.dataflow.sdk.transforms.Aggregator;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
@@ -91,6 +93,9 @@ public class DataflowPipelineJobTest {
   @Rule
   public ExpectedException thrown = ExpectedException.none();
 
+  @Rule
+  public ExpectedLogs expectedLogs = ExpectedLogs.none(DataflowPipelineJob.class);
+
   @Before
   public void setup() {
     MockitoAnnotations.initMocks(this);
@@ -193,6 +198,34 @@ public void testWaitToFinishCancelled() throws Exception {
     assertEquals(State.CANCELLED, mockWaitToFinishInState(State.CANCELLED));
   }
 
+  /**
+   * Test that {@link DataflowPipelineJob#cancel} doesn't throw if the Dataflow service returns
+   * non-terminal state even though the cancel API call failed, which can happen in practice.
+   *
+   * <p>TODO: delete this code if the API calls become consistent.
+   */
+  @Test
+  public void testCancelTerminatedJobWithStaleState() throws IOException {
+    Dataflow.Projects.Jobs.Get statusRequest =
+        mock(Dataflow.Projects.Jobs.Get.class);
+
+    Job statusResponse = new Job();
+    statusResponse.setCurrentState("JOB_STATE_RUNNING");
+    when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(statusRequest);
+    when(statusRequest.execute()).thenReturn(statusResponse);
+
+    Dataflow.Projects.Jobs.Update update = mock(
+        Dataflow.Projects.Jobs.Update.class);
+    when(mockJobs.update(eq(PROJECT_ID), eq(JOB_ID), any(Job.class)))
+        .thenReturn(update);
+    when(update.execute()).thenThrow(new IOException("Job has terminated in state SUCCESS"));
+
+    DataflowPipelineJob job = new DataflowPipelineJob(
+        PROJECT_ID, JOB_ID, mockWorkflowClient, null);
+    job.cancel();
+    expectedLogs.verifyWarn("Cancel failed because job " + JOB_ID + " is already terminated.");
+  }
+
   /**
    * Tests that the {@link DataflowPipelineJob} understands that the {@link State#FAILED FAILED}
    * state is terminal.
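
The new test only exercises the branch where a stale RUNNING state is contradicted by a "has terminated" error from the service. A complementary test for the rethrow branch is not part of this commit; the following is a sketch of what one might look like inside DataflowPipelineJobTest, reusing the same fixtures (mockJobs, mockWorkflowClient, PROJECT_ID, JOB_ID) and the thrown rule already present in the class:

// Hypothetical companion test (not in this commit): a cancel failure on a job
// that is genuinely still RUNNING should surface as an IOException pointing at
// the Developers Console.
@Test
public void testCancelRunningJobRethrowsFailure() throws IOException {
  Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
  Job statusResponse = new Job();
  statusResponse.setCurrentState("JOB_STATE_RUNNING");
  when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(statusRequest);
  when(statusRequest.execute()).thenReturn(statusResponse);

  Dataflow.Projects.Jobs.Update update = mock(Dataflow.Projects.Jobs.Update.class);
  when(mockJobs.update(eq(PROJECT_ID), eq(JOB_ID), any(Job.class))).thenReturn(update);
  // An arbitrary failure whose message does not contain "has terminated".
  when(update.execute()).thenThrow(new IOException("Some unrelated service error"));

  DataflowPipelineJob job = new DataflowPipelineJob(
      PROJECT_ID, JOB_ID, mockWorkflowClient, null);

  thrown.expect(IOException.class);
  thrown.expectMessage("please go to the Developers Console to cancel it manually");
  job.cancel();
}

Because getState() reports a non-terminal state and the error message does not match the "has terminated" heuristic, cancel() falls through to the branch that wraps the failure with the monitoring-page URL and rethrows it.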
