Skip to content

Commit

Permalink
Fix WorkflowRepairService to evaluate the possible task states for Si…
Browse files Browse the repository at this point in the history
…mple and system tasks. (Netflix#1941)
  • Loading branch information
kishorebanala authored and TwoUnderscorez committed Jul 23, 2021
1 parent d612e92 commit 4aa8408
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package com.netflix.conductor.core.execution;

import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Singleton;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.config.Configuration;
Expand All @@ -35,6 +36,7 @@
* This service expects that the underlying Queueing layer implements QueueDAO.containsMessage method. This can be controlled
* with {@link com.netflix.conductor.core.config.Configuration#isWorkflowRepairServiceEnabled()} property.
*/
@Singleton
public class WorkflowRepairService {

private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowRepairService.class);
Expand All @@ -43,7 +45,17 @@ public class WorkflowRepairService {
private final QueueDAO queueDAO;
private final Configuration configuration;

private final Predicate<Task> isSystemTask = task -> WorkflowSystemTask.is(task.getTaskType());
// For system task -> Verify the task isAsync(), not isAsyncComplete() and in SCHEDULED or IN_PROGRESS state
// For simple task -> Verify the task is in SCHEDULED state
private final Predicate<Task> isTaskRepairable = task -> {
if (WorkflowSystemTask.is(task.getTaskType())) { // If system task
WorkflowSystemTask workflowSystemTask = WorkflowSystemTask.get(task.getTaskType());
return workflowSystemTask.isAsync() && !workflowSystemTask.isAsyncComplete(task) &&
(task.getStatus() == Task.Status.IN_PROGRESS || task.getStatus() == Task.Status.SCHEDULED);
} else { // Else if simple task
return task.getStatus() == Task.Status.SCHEDULED;
}
};

@Inject
public WorkflowRepairService(
Expand Down Expand Up @@ -108,11 +120,7 @@ private boolean verifyAndRepairDeciderQueue(Workflow workflow) {
*/
@VisibleForTesting
protected boolean verifyAndRepairTask(Task task) {
WorkflowSystemTask workflowSystemTask = WorkflowSystemTask.get(task.getTaskType());
if (task.getStatus() == Task.Status.SCHEDULED) {
if (isSystemTask.test(task) && !workflowSystemTask.isAsync()) {
return false;
}
if (isTaskRepairable.test(task)) {
// Ensure QueueDAO contains this taskId
String taskQueueName = QueueUtils.getQueueName(task);
if (!queueDAO.containsMessage(taskQueueName, task.getTaskId())) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package com.netflix.conductor.core.execution;

import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.execution.tasks.Decision;
import com.netflix.conductor.core.execution.tasks.SubWorkflow;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.dao.QueueDAO;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -34,7 +38,7 @@ public void setUp() {
}

@Test
public void verifyAndRepairTask() {
public void verifyAndRepairSimpleTaskInScheduledState() {
Task task = new Task();
task.setTaskType("SIMPLE");
task.setStatus(Task.Status.SCHEDULED);
Expand All @@ -48,6 +52,62 @@ public void verifyAndRepairTask() {
verify(queueDAO, times(1)).push(anyString(), anyString(), anyLong());
}

@Test
public void verifySimpleTaskInProgressState() {
Task task = new Task();
task.setTaskType("SIMPLE");
task.setStatus(Task.Status.IN_PROGRESS);
task.setTaskId("abcd");
task.setCallbackAfterSeconds(60);

when(queueDAO.containsMessage(anyString(), anyString())).thenReturn(false);

assertFalse(workflowRepairService.verifyAndRepairTask(task));
// Verify that queue message is never pushed for simple task in IN_PROGRESS state
verify(queueDAO, never()).containsMessage(anyString(), anyString());
verify(queueDAO, never()).push(anyString(), anyString(), anyLong());
}

@Test
public void verifyAndRepairSystemTask() {
Task task = new Task();
task.setTaskType("TEST_SYS_TASK");
task.setStatus(Task.Status.SCHEDULED);
task.setTaskId("abcd");
task.setCallbackAfterSeconds(60);

// Create a Custom system task to init WorkflowSystemTask registry.
WorkflowSystemTask workflowSystemTask = new WorkflowSystemTask("TEST_SYS_TASK") {
@Override
public boolean isAsync() {
return true;
}

@Override
public boolean isAsyncComplete(Task task) {
return false;
}

@Override
public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
super.start(workflow, task, executor);
}
};

when(queueDAO.containsMessage(anyString(), anyString())).thenReturn(false);

assertTrue(workflowRepairService.verifyAndRepairTask(task));
// Verify that a new queue message is pushed for tasks that fails queue contains check.
verify(queueDAO, times(1)).push(anyString(), anyString(), anyLong());

// Verify a system task in IN_PROGRESS state can be recovered.
Mockito.reset(queueDAO);
task.setStatus(Task.Status.IN_PROGRESS);
assertTrue(workflowRepairService.verifyAndRepairTask(task));
// Verify that a new queue message is pushed for async System task in IN_PROGRESS state that fails queue contains check.
verify(queueDAO, times(1)).push(anyString(), anyString(), anyLong());
}

@Test
public void assertSyncSystemTasksAreNotCheckedAgainstQueue() {
// Create a Decision object to init WorkflowSystemTask registry.
Expand All @@ -63,4 +123,22 @@ public void assertSyncSystemTasksAreNotCheckedAgainstQueue() {
// Verify that queue message is never pushed for sync system tasks
verify(queueDAO, never()).push(anyString(), anyString(), anyLong());
}

@Test
public void assertAsyncCompleteSystemTasksAreNotCheckedAgainstQueue() {
Task task = new Task();
task.setTaskType("SUB_WORKFLOW");
task.setStatus(Task.Status.IN_PROGRESS);
task.setTaskId("abcd");
task.setCallbackAfterSeconds(60);

WorkflowSystemTask workflowSystemTask = new SubWorkflow();

assertTrue(workflowSystemTask.isAsyncComplete(task));

assertFalse(workflowRepairService.verifyAndRepairTask(task));
// Verify that queue message is never pushed for async complete system tasks
verify(queueDAO, never()).containsMessage(anyString(), anyString());
verify(queueDAO, never()).push(anyString(), anyString(), anyLong());
}
}

0 comments on commit 4aa8408

Please sign in to comment.