Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Fix WorkflowRepairService to evaluate the possible task states for Si…
Browse files Browse the repository at this point in the history
…mple and system tasks. (#1941)
  • Loading branch information
kishorebanala authored Oct 29, 2020
1 parent b51ce0a commit 6bce391
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package com.netflix.conductor.core.execution;

import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Singleton;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.config.Configuration;
Expand All @@ -35,6 +36,7 @@
* This service expects that the underlying Queueing layer implements QueueDAO.containsMessage method. This can be controlled
* with {@link com.netflix.conductor.core.config.Configuration#isWorkflowRepairServiceEnabled()} property.
*/
@Singleton
public class WorkflowRepairService {

private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowRepairService.class);
Expand All @@ -43,7 +45,17 @@ public class WorkflowRepairService {
private final QueueDAO queueDAO;
private final Configuration configuration;

private final Predicate<Task> isSystemTask = task -> WorkflowSystemTask.is(task.getTaskType());
// For system task -> Verify the task isAsync(), not isAsyncComplete() and in SCHEDULED or IN_PROGRESS state
// For simple task -> Verify the task is in SCHEDULED state
private final Predicate<Task> isTaskRepairable = task -> {
if (WorkflowSystemTask.is(task.getTaskType())) { // If system task
WorkflowSystemTask workflowSystemTask = WorkflowSystemTask.get(task.getTaskType());
return workflowSystemTask.isAsync() && !workflowSystemTask.isAsyncComplete(task) &&
(task.getStatus() == Task.Status.IN_PROGRESS || task.getStatus() == Task.Status.SCHEDULED);
} else { // Else if simple task
return task.getStatus() == Task.Status.SCHEDULED;
}
};

@Inject
public WorkflowRepairService(
Expand Down Expand Up @@ -108,11 +120,7 @@ private boolean verifyAndRepairDeciderQueue(Workflow workflow) {
*/
@VisibleForTesting
protected boolean verifyAndRepairTask(Task task) {
WorkflowSystemTask workflowSystemTask = WorkflowSystemTask.get(task.getTaskType());
if (task.getStatus() == Task.Status.SCHEDULED) {
if (isSystemTask.test(task) && !workflowSystemTask.isAsync()) {
return false;
}
if (isTaskRepairable.test(task)) {
// Ensure QueueDAO contains this taskId
String taskQueueName = QueueUtils.getQueueName(task);
if (!queueDAO.containsMessage(taskQueueName, task.getTaskId())) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package com.netflix.conductor.core.execution;

import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.execution.tasks.Decision;
import com.netflix.conductor.core.execution.tasks.SubWorkflow;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.dao.QueueDAO;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -34,7 +38,7 @@ public void setUp() {
}

@Test
public void verifyAndRepairTask() {
public void verifyAndRepairSimpleTaskInScheduledState() {
Task task = new Task();
task.setTaskType("SIMPLE");
task.setStatus(Task.Status.SCHEDULED);
Expand All @@ -48,6 +52,62 @@ public void verifyAndRepairTask() {
verify(queueDAO, times(1)).push(anyString(), anyString(), anyLong());
}

@Test
public void verifySimpleTaskInProgressState() {
Task task = new Task();
task.setTaskType("SIMPLE");
task.setStatus(Task.Status.IN_PROGRESS);
task.setTaskId("abcd");
task.setCallbackAfterSeconds(60);

when(queueDAO.containsMessage(anyString(), anyString())).thenReturn(false);

assertFalse(workflowRepairService.verifyAndRepairTask(task));
// Verify that queue message is never pushed for simple task in IN_PROGRESS state
verify(queueDAO, never()).containsMessage(anyString(), anyString());
verify(queueDAO, never()).push(anyString(), anyString(), anyLong());
}

@Test
public void verifyAndRepairSystemTask() {
Task task = new Task();
task.setTaskType("TEST_SYS_TASK");
task.setStatus(Task.Status.SCHEDULED);
task.setTaskId("abcd");
task.setCallbackAfterSeconds(60);

// Create a Custom system task to init WorkflowSystemTask registry.
WorkflowSystemTask workflowSystemTask = new WorkflowSystemTask("TEST_SYS_TASK") {
@Override
public boolean isAsync() {
return true;
}

@Override
public boolean isAsyncComplete(Task task) {
return false;
}

@Override
public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
super.start(workflow, task, executor);
}
};

when(queueDAO.containsMessage(anyString(), anyString())).thenReturn(false);

assertTrue(workflowRepairService.verifyAndRepairTask(task));
// Verify that a new queue message is pushed for tasks that fails queue contains check.
verify(queueDAO, times(1)).push(anyString(), anyString(), anyLong());

// Verify a system task in IN_PROGRESS state can be recovered.
Mockito.reset(queueDAO);
task.setStatus(Task.Status.IN_PROGRESS);
assertTrue(workflowRepairService.verifyAndRepairTask(task));
// Verify that a new queue message is pushed for async System task in IN_PROGRESS state that fails queue contains check.
verify(queueDAO, times(1)).push(anyString(), anyString(), anyLong());
}

@Test
public void assertSyncSystemTasksAreNotCheckedAgainstQueue() {
// Create a Decision object to init WorkflowSystemTask registry.
Expand All @@ -63,4 +123,22 @@ public void assertSyncSystemTasksAreNotCheckedAgainstQueue() {
// Verify that queue message is never pushed for sync system tasks
verify(queueDAO, never()).push(anyString(), anyString(), anyLong());
}

@Test
public void assertAsyncCompleteSystemTasksAreNotCheckedAgainstQueue() {
Task task = new Task();
task.setTaskType("SUB_WORKFLOW");
task.setStatus(Task.Status.IN_PROGRESS);
task.setTaskId("abcd");
task.setCallbackAfterSeconds(60);

WorkflowSystemTask workflowSystemTask = new SubWorkflow();

assertTrue(workflowSystemTask.isAsyncComplete(task));

assertFalse(workflowRepairService.verifyAndRepairTask(task));
// Verify that queue message is never pushed for async complete system tasks
verify(queueDAO, never()).containsMessage(anyString(), anyString());
verify(queueDAO, never()).push(anyString(), anyString(), anyLong());
}
}

0 comments on commit 6bce391

Please sign in to comment.