Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Fix WorkflowRepairService to evaluate the possible task states for Si… #1941

Merged
merged 1 commit into from
Oct 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package com.netflix.conductor.core.execution;

import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Singleton;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.config.Configuration;
Expand All @@ -35,6 +36,7 @@
* This service expects that the underlying Queueing layer implements QueueDAO.containsMessage method. This can be controlled
* with {@link com.netflix.conductor.core.config.Configuration#isWorkflowRepairServiceEnabled()} property.
*/
@Singleton
public class WorkflowRepairService {

private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowRepairService.class);
Expand All @@ -43,7 +45,17 @@ public class WorkflowRepairService {
private final QueueDAO queueDAO;
private final Configuration configuration;

private final Predicate<Task> isSystemTask = task -> WorkflowSystemTask.is(task.getTaskType());
// For system task -> Verify the task isAsync(), not isAsyncComplete() and in SCHEDULED or IN_PROGRESS state
// For simple task -> Verify the task is in SCHEDULED state
private final Predicate<Task> isTaskRepairable = task -> {
if (WorkflowSystemTask.is(task.getTaskType())) { // If system task
WorkflowSystemTask workflowSystemTask = WorkflowSystemTask.get(task.getTaskType());
return workflowSystemTask.isAsync() && !workflowSystemTask.isAsyncComplete(task) &&
(task.getStatus() == Task.Status.IN_PROGRESS || task.getStatus() == Task.Status.SCHEDULED);
} else { // Else if simple task
return task.getStatus() == Task.Status.SCHEDULED;
}
};

@Inject
public WorkflowRepairService(
Expand Down Expand Up @@ -108,11 +120,7 @@ private boolean verifyAndRepairDeciderQueue(Workflow workflow) {
*/
@VisibleForTesting
protected boolean verifyAndRepairTask(Task task) {
WorkflowSystemTask workflowSystemTask = WorkflowSystemTask.get(task.getTaskType());
if (task.getStatus() == Task.Status.SCHEDULED) {
if (isSystemTask.test(task) && !workflowSystemTask.isAsync()) {
return false;
}
if (isTaskRepairable.test(task)) {
// Ensure QueueDAO contains this taskId
String taskQueueName = QueueUtils.getQueueName(task);
if (!queueDAO.containsMessage(taskQueueName, task.getTaskId())) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package com.netflix.conductor.core.execution;

import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.execution.tasks.Decision;
import com.netflix.conductor.core.execution.tasks.SubWorkflow;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.dao.QueueDAO;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -34,7 +38,7 @@ public void setUp() {
}

@Test
public void verifyAndRepairTask() {
public void verifyAndRepairSimpleTaskInScheduledState() {
Task task = new Task();
task.setTaskType("SIMPLE");
task.setStatus(Task.Status.SCHEDULED);
Expand All @@ -48,6 +52,62 @@ public void verifyAndRepairTask() {
verify(queueDAO, times(1)).push(anyString(), anyString(), anyLong());
}

@Test
public void verifySimpleTaskInProgressState() {
Task task = new Task();
task.setTaskType("SIMPLE");
task.setStatus(Task.Status.IN_PROGRESS);
task.setTaskId("abcd");
task.setCallbackAfterSeconds(60);

when(queueDAO.containsMessage(anyString(), anyString())).thenReturn(false);

assertFalse(workflowRepairService.verifyAndRepairTask(task));
// Verify that queue message is never pushed for simple task in IN_PROGRESS state
verify(queueDAO, never()).containsMessage(anyString(), anyString());
verify(queueDAO, never()).push(anyString(), anyString(), anyLong());
}

@Test
public void verifyAndRepairSystemTask() {
Task task = new Task();
task.setTaskType("TEST_SYS_TASK");
task.setStatus(Task.Status.SCHEDULED);
task.setTaskId("abcd");
task.setCallbackAfterSeconds(60);

// Create a Custom system task to init WorkflowSystemTask registry.
WorkflowSystemTask workflowSystemTask = new WorkflowSystemTask("TEST_SYS_TASK") {
@Override
public boolean isAsync() {
return true;
}

@Override
public boolean isAsyncComplete(Task task) {
return false;
}

@Override
public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
super.start(workflow, task, executor);
}
};

when(queueDAO.containsMessage(anyString(), anyString())).thenReturn(false);

assertTrue(workflowRepairService.verifyAndRepairTask(task));
// Verify that a new queue message is pushed for tasks that fails queue contains check.
verify(queueDAO, times(1)).push(anyString(), anyString(), anyLong());

// Verify a system task in IN_PROGRESS state can be recovered.
Mockito.reset(queueDAO);
task.setStatus(Task.Status.IN_PROGRESS);
assertTrue(workflowRepairService.verifyAndRepairTask(task));
// Verify that a new queue message is pushed for async System task in IN_PROGRESS state that fails queue contains check.
verify(queueDAO, times(1)).push(anyString(), anyString(), anyLong());
}

@Test
public void assertSyncSystemTasksAreNotCheckedAgainstQueue() {
// Create a Decision object to init WorkflowSystemTask registry.
Expand All @@ -63,4 +123,22 @@ public void assertSyncSystemTasksAreNotCheckedAgainstQueue() {
// Verify that queue message is never pushed for sync system tasks
verify(queueDAO, never()).push(anyString(), anyString(), anyLong());
}

@Test
public void assertAsyncCompleteSystemTasksAreNotCheckedAgainstQueue() {
Task task = new Task();
task.setTaskType("SUB_WORKFLOW");
task.setStatus(Task.Status.IN_PROGRESS);
task.setTaskId("abcd");
task.setCallbackAfterSeconds(60);

WorkflowSystemTask workflowSystemTask = new SubWorkflow();

assertTrue(workflowSystemTask.isAsyncComplete(task));

assertFalse(workflowRepairService.verifyAndRepairTask(task));
// Verify that queue message is never pushed for async complete system tasks
verify(queueDAO, never()).containsMessage(anyString(), anyString());
verify(queueDAO, never()).push(anyString(), anyString(), anyLong());
}
}