Skip to content

Commit

Permalink
Permissive task capability
Browse files Browse the repository at this point in the history
Why:
We need idempotent forked tasks, meaning all tasks get
executed, but any failures are still detected upon join.
Feature request Netflix#3861

What:
Introduced the concept of Permissive tasks.
A Permissive task is similar to a Simple task. The
difference is that it permits the other tasks to continue
when a Permissive task fails.
Result is:
1. Forked Permissive tasks will let each other
be evaluated until all the forked tasks have terminated.
Only then should the join task fail. In the case of Permissive
optional tasks, the join will not fail.
2. Permissive sequential tasks will let subsequent tasks
continue. At the end, the workflow will fail if
a Permissive task has failed. The workflow will not fail
if a Permissive optional task fails.

Testing done: PermissiveTaskMapperTest added,
TestDeciderOutcomes.testPermissive() added,
WorkflowAndTaskConfigurationSpec
"Test simple workflow which has a permissive task" and
"Test simple workflow which has a permissive optional task added"
that cover retry, ForkJoinSpec
"Test a simple workflow with fork join permissive failure flow"
added.
In addition, performed e2e tests locally against a running Conductor instance:
built a Docker image with the code changes, started it locally,
and started a SampleWorker to poll 3 tasks in parallel.
Verified e2e scenarios of task_def_permissive, task_def_permissive_optional,
task_def_simple.json, task_def_simple_optional.json, each joining on
6 forked tasks, then running simple task 7 after join.
  • Loading branch information
ivakoleva committed Nov 27, 2023
1 parent 07fd73a commit 6883859
Show file tree
Hide file tree
Showing 14 changed files with 930 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public enum TaskType {
DYNAMIC,
FORK_JOIN,
FORK_JOIN_DYNAMIC,
PERMISSIVE,
DECISION,
SWITCH,
JOIN,
Expand Down Expand Up @@ -70,6 +71,7 @@ public enum TaskType {
public static final String TASK_TYPE_JSON_JQ_TRANSFORM = "JSON_JQ_TRANSFORM";
public static final String TASK_TYPE_SET_VARIABLE = "SET_VARIABLE";
public static final String TASK_TYPE_FORK = "FORK";
public static final String TASK_TYPE_PERMISSIVE = "PERMISSIVE";
public static final String TASK_TYPE_NOOP = "NOOP";

private static final Set<String> BUILT_IN_TASKS = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

import static com.netflix.conductor.common.metadata.tasks.TaskType.PERMISSIVE;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TERMINATE;
import static com.netflix.conductor.common.metadata.tasks.TaskType.USER_DEFINED;
import static com.netflix.conductor.model.TaskModel.Status.*;
Expand Down Expand Up @@ -207,7 +208,11 @@ private DeciderOutcome decide(final WorkflowModel workflow, List<TaskModel> preS
tasksToBeScheduled.put(retryTask.get().getReferenceTaskName(), retryTask.get());
executedTaskRefNames.remove(retryTask.get().getReferenceTaskName());
outcome.tasksToBeUpdated.add(pendingTask);
} else {
} else if (!(pendingTask.getWorkflowTask() != null
&& TaskType.PERMISSIVE
.name()
.equals(pendingTask.getWorkflowTask().getType())
&& !pendingTask.getWorkflowTask().isOptional())) {
pendingTask.setStatus(COMPLETED_WITH_ERRORS);
}
}
Expand Down Expand Up @@ -254,6 +259,29 @@ private DeciderOutcome decide(final WorkflowModel workflow, List<TaskModel> preS
if (hasSuccessfulTerminateTask
|| (outcome.tasksToBeScheduled.isEmpty() && checkForWorkflowCompletion(workflow))) {
LOGGER.debug("Marking workflow: {} as complete.", workflow);
List<TaskModel> permissiveTasksTerminalNonSuccessful =
workflow.getTasks().stream()
.filter(t -> t.getWorkflowTask() != null)
.filter(t -> PERMISSIVE.name().equals(t.getWorkflowTask().getType()))
.filter(t -> !t.getWorkflowTask().isOptional())
.filter(
t ->
t.getStatus().isTerminal()
&& !t.getStatus().isSuccessful())
.toList();
if (!permissiveTasksTerminalNonSuccessful.isEmpty()) {
final String errMsg =
permissiveTasksTerminalNonSuccessful.stream()
.map(
t ->
String.format(
"Task %s failed with status: %s and reason: '%s'",
t.getTaskId(),
t.getStatus(),
t.getReasonForIncompletion()))
.collect(Collectors.joining(". "));
throw new TerminateWorkflowException(errMsg);
}
outcome.isComplete = true;
}

Expand Down Expand Up @@ -437,11 +465,6 @@ public boolean checkForWorkflowCompletion(final WorkflowModel workflow)
if (status == null || !status.isTerminal()) {
return false;
}
// if we reach here, the task has been completed.
// Was the task successful in completion?
if (!status.isSuccessful()) {
return false;
}
}

boolean noPendingSchedule =
Expand Down Expand Up @@ -529,7 +552,9 @@ Optional<TaskModel> retry(
if (!task.getStatus().isRetriable()
|| TaskType.isBuiltIn(task.getTaskType())
|| expectedRetryCount <= retryCount) {
if (workflowTask != null && workflowTask.isOptional()) {
if (workflowTask != null
&& (workflowTask.isOptional()
|| TaskType.PERMISSIVE.name().equals(workflowTask.getType()))) {
return Optional.empty();
}
WorkflowModel.Status status;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright 2023 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.execution.mapper;

import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.tasks.TaskType;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.core.exception.TerminateWorkflowException;
import com.netflix.conductor.core.utils.ParametersUtils;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

/**
 * {@link TaskMapper} implementation that turns a {@link WorkflowTask} of type {@link
 * TaskType#PERMISSIVE} into a single {@link TaskModel} whose status is {@link
 * TaskModel.Status#SCHEDULED}.
 */
@Component
public class PermissiveTaskMapper implements TaskMapper {

    public static final Logger LOGGER = LoggerFactory.getLogger(PermissiveTaskMapper.class);
    private final ParametersUtils parametersUtils;

    public PermissiveTaskMapper(ParametersUtils parametersUtils) {
        this.parametersUtils = parametersUtils;
    }

    /** @return the name of {@link TaskType#PERMISSIVE}, the task type handled by this mapper */
    @Override
    public String getTaskType() {
        return TaskType.PERMISSIVE.name();
    }

    /**
     * Builds the {@link TaskModel} for a {@link WorkflowTask} of type {@link
     * TaskType#PERMISSIVE}.
     *
     * <p>The task input is resolved through {@link ParametersUtils#getTaskInput}; the response
     * timeout and rate-limit settings are copied from the task definition, and the start delay
     * of the workflow task is used both as start delay and as callback delay.
     *
     * @param taskMapperContext supplies the {@link WorkflowTask}, the {@link WorkflowModel}, the
     *     retry count, the id of the retried task and the id of the task being created
     * @return a single-element list holding the newly created task in status {@link
     *     TaskModel.Status#SCHEDULED}
     * @throws TerminateWorkflowException if the workflow task carries no task definition
     */
    @Override
    public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext)
            throws TerminateWorkflowException {

        LOGGER.debug("TaskMapperContext {} in PermissiveTaskMapper", taskMapperContext);

        final WorkflowTask taskToMap = taskMapperContext.getWorkflowTask();
        final WorkflowModel owningWorkflow = taskMapperContext.getWorkflowModel();
        final int attempt = taskMapperContext.getRetryCount();
        final String previousAttemptId = taskMapperContext.getRetryTaskId();

        // Fails fast, before any input is resolved, when the definition is missing.
        final TaskDef definition = requireTaskDefinition(taskToMap);

        final Map<String, Object> resolvedInput =
                parametersUtils.getTaskInput(
                        taskToMap.getInputParameters(),
                        owningWorkflow,
                        definition,
                        taskMapperContext.getTaskId());

        final TaskModel scheduled = taskMapperContext.createTaskModel();
        scheduled.setTaskType(taskToMap.getName());
        scheduled.setStartDelayInSeconds(taskToMap.getStartDelay());
        scheduled.setInputData(resolvedInput);
        scheduled.setStatus(TaskModel.Status.SCHEDULED);
        scheduled.setRetryCount(attempt);
        scheduled.setCallbackAfterSeconds(taskToMap.getStartDelay());
        scheduled.setResponseTimeoutSeconds(definition.getResponseTimeoutSeconds());
        scheduled.setRetriedTaskId(previousAttemptId);
        scheduled.setRateLimitPerFrequency(definition.getRateLimitPerFrequency());
        scheduled.setRateLimitFrequencyInSeconds(definition.getRateLimitFrequencyInSeconds());

        return List.of(scheduled);
    }

    /**
     * Returns the task definition attached to the given workflow task.
     *
     * @throws TerminateWorkflowException if no definition is attached
     */
    private static TaskDef requireTaskDefinition(WorkflowTask workflowTask) {
        return Optional.ofNullable(workflowTask.getTaskDefinition())
                .orElseThrow(
                        () ->
                                new TerminateWorkflowException(
                                        String.format(
                                                "Invalid task. Task %s does not have a definition",
                                                workflowTask.getName())));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

import static com.netflix.conductor.common.metadata.tasks.TaskType.PERMISSIVE;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_EXCLUSIVE_JOIN;

@Component(TASK_TYPE_EXCLUSIVE_JOIN)
Expand Down Expand Up @@ -65,9 +66,20 @@ public boolean execute(
}
taskStatus = exclusiveTask.getStatus();
foundExlusiveJoinOnTask = taskStatus.isTerminal();
hasFailures = !taskStatus.isSuccessful();
hasFailures =
!taskStatus.isSuccessful()
&& (!PERMISSIVE.name().equals(exclusiveTask.getWorkflowTask().getType())
|| joinOn.stream()
.map(workflow::getTaskByRefName)
.allMatch(t -> t.getStatus().isTerminal()));
if (hasFailures) {
failureReason.append(exclusiveTask.getReasonForIncompletion()).append(" ");
final String failureReasons =
joinOn.stream()
.map(workflow::getTaskByRefName)
.filter(t -> !t.getStatus().isSuccessful())
.map(TaskModel::getReasonForIncompletion)
.collect(Collectors.joining(" "));
failureReason.append(failureReasons);
}

break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

import static com.netflix.conductor.common.metadata.tasks.TaskType.PERMISSIVE;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_JOIN;

@Component(TASK_TYPE_JOIN)
Expand Down Expand Up @@ -57,9 +58,21 @@ public boolean execute(
break;
}
TaskModel.Status taskStatus = forkedTask.getStatus();
hasFailures = !taskStatus.isSuccessful() && !forkedTask.getWorkflowTask().isOptional();
hasFailures =
!taskStatus.isSuccessful()
&& !forkedTask.getWorkflowTask().isOptional()
&& (!PERMISSIVE.name().equals(forkedTask.getWorkflowTask().getType())
|| joinOn.stream()
.map(workflow::getTaskByRefName)
.allMatch(t -> t.getStatus().isTerminal()));
if (hasFailures) {
failureReason.append(forkedTask.getReasonForIncompletion()).append(" ");
final String failureReasons =
joinOn.stream()
.map(workflow::getTaskByRefName)
.filter(t -> !t.getStatus().isSuccessful())
.map(TaskModel::getReasonForIncompletion)
.collect(Collectors.joining(" "));
failureReason.append(failureReasons);
}
// Only add to task output if it's not empty
if (!forkedTask.getOutputData().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,80 @@ public void testOptional() {
outcome.tasksToBeScheduled.get(0).getReferenceTaskName());
}

/**
 * Counterpart of {@link #testOptional} for two sequential PERMISSIVE tasks.
 *
 * <p>The first task is failed four times in a row. Each of the first three failures must yield
 * a retry of the same task with an incremented retry count; the fourth failure must schedule
 * the second task while the failed task keeps the status FAILED.
 */
@Test
public void testPermissive() {
    WorkflowDef workflowDef = new WorkflowDef();
    workflowDef.setName("test-permissive");

    WorkflowTask firstTask = new WorkflowTask();
    firstTask.setName("task0");
    firstTask.setType("PERMISSIVE");
    firstTask.setTaskReferenceName("t0");
    firstTask.getInputParameters().put("taskId", "${CPEWF_TASK_ID}");
    firstTask.setTaskDefinition(new TaskDef("task0"));

    WorkflowTask secondTask = new WorkflowTask();
    secondTask.setName("task1");
    secondTask.setType("PERMISSIVE");
    secondTask.setTaskReferenceName("t1");
    secondTask.setTaskDefinition(new TaskDef("task1"));

    workflowDef.getTasks().add(firstTask);
    workflowDef.getTasks().add(secondTask);
    workflowDef.setSchemaVersion(2);

    WorkflowModel workflowModel = new WorkflowModel();
    workflowModel.setWorkflowDefinition(workflowDef);
    workflowModel.setCreateTime(System.currentTimeMillis());

    // Initial decision: only the first task is scheduled.
    DeciderOutcome decision = deciderService.decide(workflowModel);
    assertNotNull(decision);
    assertEquals(1, decision.tasksToBeScheduled.size());
    assertEquals(
            firstTask.getTaskReferenceName(),
            decision.tasksToBeScheduled.get(0).getReferenceTaskName());

    // Three failures in a row: every one of them is answered with a retry of the first task.
    for (int expectedRetryCount = 1; expectedRetryCount <= 3; expectedRetryCount++) {
        TaskModel pending = decision.tasksToBeScheduled.get(0);
        String pendingId = pending.getTaskId();
        assertEquals(pendingId, pending.getInputData().get("taskId"));

        workflowModel.getTasks().clear();
        workflowModel.getTasks().addAll(decision.tasksToBeScheduled);
        workflowModel.getTasks().get(0).setStatus(TaskModel.Status.FAILED);

        decision = deciderService.decide(workflowModel);

        assertNotNull(decision);
        assertEquals(1, decision.tasksToBeUpdated.size());
        assertEquals(1, decision.tasksToBeScheduled.size());

        assertEquals(TaskModel.Status.FAILED, workflowModel.getTasks().get(0).getStatus());
        assertEquals(pendingId, decision.tasksToBeUpdated.get(0).getTaskId());

        TaskModel retried = decision.tasksToBeScheduled.get(0);
        assertEquals(firstTask.getTaskReferenceName(), retried.getReferenceTaskName());
        assertEquals(expectedRetryCount, retried.getRetryCount());
    }

    // Fourth failure: no further retry, the workflow moves on to the second task.
    String lastAttemptId = decision.tasksToBeScheduled.get(0).getTaskId();

    workflowModel.getTasks().clear();
    workflowModel.getTasks().addAll(decision.tasksToBeScheduled);
    workflowModel.getTasks().get(0).setStatus(TaskModel.Status.FAILED);

    decision = deciderService.decide(workflowModel);

    assertNotNull(decision);
    assertEquals(1, decision.tasksToBeUpdated.size());
    assertEquals(1, decision.tasksToBeScheduled.size());

    assertEquals(TaskModel.Status.FAILED, workflowModel.getTasks().get(0).getStatus());
    assertEquals(lastAttemptId, decision.tasksToBeUpdated.get(0).getTaskId());
    assertEquals(
            secondTask.getTaskReferenceName(),
            decision.tasksToBeScheduled.get(0).getReferenceTaskName());
}

@Test
public void testOptionalWithDynamicFork() {
WorkflowDef def = new WorkflowDef();
Expand Down
Loading

0 comments on commit 6883859

Please sign in to comment.