Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
repair inconsistent subworkflow task (#3183)
Browse files Browse the repository at this point in the history
  • Loading branch information
apanicker-nflx authored Aug 16, 2022
1 parent 478ea92 commit 9cb3bbc
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.springframework.stereotype.Service;

import com.netflix.conductor.annotations.VisibleForTesting;
import com.netflix.conductor.common.metadata.tasks.TaskType;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
Expand Down Expand Up @@ -138,6 +139,13 @@ boolean verifyAndRepairTask(TaskModel task) {
Monitors.recordQueueMessageRepushFromRepairService(task.getTaskDefName());
return true;
}
} else if (task.getTaskType().equals(TaskType.TASK_TYPE_SUB_WORKFLOW)
&& task.getStatus() == TaskModel.Status.IN_PROGRESS) {
WorkflowModel subWorkflow = executionDAO.getWorkflow(task.getSubWorkflowId(), false);
if (subWorkflow.getStatus().isTerminal()) {
repairSubWorkflowTask(task, subWorkflow.getStatus());
return true;
}
}
return false;
}
Expand All @@ -156,4 +164,22 @@ private boolean verifyAndRepairWorkflow(String workflowId) {
}
return false;
}

private void repairSubWorkflowTask(TaskModel task, WorkflowModel.Status subWorkflowStatus) {
switch (subWorkflowStatus) {
case COMPLETED:
task.setStatus(TaskModel.Status.COMPLETED);
break;
case FAILED:
task.setStatus(TaskModel.Status.FAILED);
break;
case TERMINATED:
task.setStatus(TaskModel.Status.CANCELED);
break;
case TIMED_OUT:
task.setStatus(TaskModel.Status.TIMED_OUT);
break;
}
executionDAO.updateTask(task);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@
package com.netflix.conductor.core.reconciliation;

import java.time.Duration;
import java.util.Map;

import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;

import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.events.EventQueues;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.execution.tasks.*;
import com.netflix.conductor.core.utils.ParametersUtils;
import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.model.TaskModel;
Expand All @@ -29,8 +33,7 @@

import static com.netflix.conductor.common.metadata.tasks.TaskType.*;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -169,13 +172,18 @@ public void assertSyncSystemTasksAreNotCheckedAgainstQueue() {
@Test
public void assertAsyncCompleteInProgressSystemTasksAreNotCheckedAgainstQueue() {
TaskModel task = new TaskModel();
task.setTaskType(TASK_TYPE_SUB_WORKFLOW);
task.setTaskType(TASK_TYPE_EVENT);
task.setStatus(TaskModel.Status.IN_PROGRESS);
task.setTaskId("abcd");
task.setCallbackAfterSeconds(60);
task.setInputData(Map.of("asyncComplete", true));

WorkflowSystemTask workflowSystemTask = new SubWorkflow(new ObjectMapper());
when(systemTaskRegistry.get(TASK_TYPE_SUB_WORKFLOW)).thenReturn(workflowSystemTask);
WorkflowSystemTask workflowSystemTask =
new Event(
mock(EventQueues.class),
mock(ParametersUtils.class),
mock(ObjectMapper.class));
when(systemTaskRegistry.get(TASK_TYPE_EVENT)).thenReturn(workflowSystemTask);

assertTrue(workflowSystemTask.isAsyncComplete(task));

Expand Down Expand Up @@ -219,4 +227,34 @@ public void verifyAndRepairParentWorkflow() {
verify(queueDAO, times(1)).containsMessage(anyString(), anyString());
verify(queueDAO, times(1)).push(anyString(), anyString(), anyLong());
}

@Test
public void assertInProgressSubWorkflowSystemTasksAreCheckedAndRepaired() {
String subWorkflowId = "subWorkflowId";
String taskId = "taskId";

TaskModel task = new TaskModel();
task.setTaskType(TASK_TYPE_SUB_WORKFLOW);
task.setStatus(TaskModel.Status.IN_PROGRESS);
task.setTaskId(taskId);
task.setCallbackAfterSeconds(60);
task.setSubWorkflowId(subWorkflowId);

WorkflowModel subWorkflow = new WorkflowModel();
subWorkflow.setWorkflowId(subWorkflowId);
subWorkflow.setStatus(WorkflowModel.Status.TERMINATED);

when(executionDAO.getWorkflow(subWorkflowId, false)).thenReturn(subWorkflow);

assertTrue(workflowRepairService.verifyAndRepairTask(task));
// Verify that queue message is never pushed for async complete system tasks
verify(queueDAO, never()).containsMessage(anyString(), anyString());
verify(queueDAO, never()).push(anyString(), anyString(), anyLong());
// Verify
ArgumentCaptor<TaskModel> argumentCaptor = ArgumentCaptor.forClass(TaskModel.class);
verify(executionDAO, times(1)).updateTask(argumentCaptor.capture());
assertEquals(taskId, argumentCaptor.getValue().getTaskId());
assertEquals(subWorkflowId, argumentCaptor.getValue().getSubWorkflowId());
assertEquals(TaskModel.Status.CANCELED, argumentCaptor.getValue().getStatus());
}
}

0 comments on commit 9cb3bbc

Please sign in to comment.