From 6883859153f0f79b0be3c4575584bef985b41b38 Mon Sep 17 00:00:00 2001 From: Iva Koleva Date: Tue, 21 Nov 2023 15:29:32 +0200 Subject: [PATCH] Permissive task capability Why: We need idempotent forked tasks, meaning all tasks get executed, but any failures are still detected upon join. Feature request https://github.com/Netflix/conductor/issues/3861 What: Introduced the concept of Permissive tasks. A Permissive task is similar to a Simple task. The difference is, it permits the other tasks to continue - in case a Permissive task failed. Result is: 1. Forked Permissive tasks will let each other be evaluated, until all the forked tasks had terminated. Only then, the join task should fail. In case of Permissive optional tasks, the join will not fail. 2. Permissive sequential tasks will let subsequent tasks continue. While at the end, the workflow will fail in case a permissive task had failed. The workflow would not fail in case of Permissive optional task failure. Testing done: PermissiveTaskMapperTest added, TestDeciderOutcomes.testPermissive() added, WorkflowAndTaskConfigurationSpec "Test simple workflow which has a permissive task" and "Test simple workflow which has a permissive optional task added" that cover retry, ForkJoinSpec "Test a simple workflow with fork join permissive failure flow" added. In addition, performed e2e tests locally running a Conductor instance. Did build a docker image with the code changes made, started it locally, and started a SampleWorker to poll 3 tasks in parallel. Verified e2e scenarios of task_def_permissive, task_def_permissive_optional, task_def_simple.json, task_def_simple_optional.json, each joining on 6 forked tasks, then running simple task 7 after join. --- .../common/metadata/tasks/TaskType.java | 2 + .../core/execution/DeciderService.java | 39 ++++- .../mapper/PermissiveTaskMapper.java | 102 +++++++++++ .../core/execution/tasks/ExclusiveJoin.java | 16 +- .../conductor/core/execution/tasks/Join.java | 17 +- .../core/execution/TestDeciderOutcomes.java | 74 ++++++++ .../mapper/PermissiveTaskMapperTest.java | 114 +++++++++++++ .../workflow/def/tasks/PermissiveTask.java | 47 +++++ .../workflow/executor/WorkflowExecutor.java | 3 +- .../test/integration/ForkJoinSpec.groovy | 112 +++++++++++- .../WorkflowAndTaskConfigurationSpec.groovy | 160 +++++++++++++++++- ...fork_join_permissive_integration_test.json | 109 ++++++++++++ ...issive_optional_task_integration_test.json | 58 +++++++ ...with_permissive_task_integration_test.json | 92 ++++++++++ 14 files changed, 930 insertions(+), 15 deletions(-) create mode 100644 core/src/main/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapper.java create mode 100644 core/src/test/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapperTest.java create mode 100644 java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/PermissiveTask.java create mode 100644 test-harness/src/test/resources/fork_join_permissive_integration_test.json create mode 100644 test-harness/src/test/resources/simple_workflow_with_permissive_optional_task_integration_test.json create mode 100644 test-harness/src/test/resources/simple_workflow_with_permissive_task_integration_test.json diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskType.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskType.java index 235a0ac91b..efc985acd5 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskType.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskType.java @@ -23,6 +23,7 @@ public enum TaskType { DYNAMIC, FORK_JOIN, FORK_JOIN_DYNAMIC, + PERMISSIVE, DECISION, SWITCH, JOIN, @@ -70,6 +71,7 @@ public enum TaskType { public static final String TASK_TYPE_JSON_JQ_TRANSFORM = "JSON_JQ_TRANSFORM"; public static final String TASK_TYPE_SET_VARIABLE = "SET_VARIABLE"; public static final String TASK_TYPE_FORK = "FORK"; + public static final String TASK_TYPE_PERMISSIVE = "PERMISSIVE"; public static final String TASK_TYPE_NOOP = "NOOP"; private static final Set BUILT_IN_TASKS = new HashSet<>(); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java index c6bf486baa..376896122b 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java @@ -44,6 +44,7 @@ import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; +import static com.netflix.conductor.common.metadata.tasks.TaskType.PERMISSIVE; import static com.netflix.conductor.common.metadata.tasks.TaskType.TERMINATE; import static com.netflix.conductor.common.metadata.tasks.TaskType.USER_DEFINED; import static com.netflix.conductor.model.TaskModel.Status.*; @@ -207,7 +208,11 @@ private DeciderOutcome decide(final WorkflowModel workflow, List preS tasksToBeScheduled.put(retryTask.get().getReferenceTaskName(), retryTask.get()); executedTaskRefNames.remove(retryTask.get().getReferenceTaskName()); outcome.tasksToBeUpdated.add(pendingTask); - } else { + } else if (!(pendingTask.getWorkflowTask() != null + && TaskType.PERMISSIVE + .name() + .equals(pendingTask.getWorkflowTask().getType()) + && !pendingTask.getWorkflowTask().isOptional())) { pendingTask.setStatus(COMPLETED_WITH_ERRORS); } } @@ -254,6 +259,29 @@ private DeciderOutcome decide(final WorkflowModel workflow, List preS if (hasSuccessfulTerminateTask || (outcome.tasksToBeScheduled.isEmpty() && checkForWorkflowCompletion(workflow))) { LOGGER.debug("Marking workflow: {} as complete.", workflow); + List permissiveTasksTerminalNonSuccessful = + workflow.getTasks().stream() + .filter(t -> t.getWorkflowTask() != null) + .filter(t -> PERMISSIVE.name().equals(t.getWorkflowTask().getType())) + .filter(t -> !t.getWorkflowTask().isOptional()) + .filter( + t -> + t.getStatus().isTerminal() + && !t.getStatus().isSuccessful()) + .toList(); + if (!permissiveTasksTerminalNonSuccessful.isEmpty()) { + final String errMsg = + permissiveTasksTerminalNonSuccessful.stream() + .map( + t -> + String.format( + "Task %s failed with status: %s and reason: '%s'", + t.getTaskId(), + t.getStatus(), + t.getReasonForIncompletion())) + .collect(Collectors.joining(". ")); + throw new TerminateWorkflowException(errMsg); + } outcome.isComplete = true; } @@ -437,11 +465,6 @@ public boolean checkForWorkflowCompletion(final WorkflowModel workflow) if (status == null || !status.isTerminal()) { return false; } - // if we reach here, the task has been completed. - // Was the task successful in completion? - if (!status.isSuccessful()) { - return false; - } } boolean noPendingSchedule = @@ -529,7 +552,9 @@ Optional retry( if (!task.getStatus().isRetriable() || TaskType.isBuiltIn(task.getTaskType()) || expectedRetryCount <= retryCount) { - if (workflowTask != null && workflowTask.isOptional()) { + if (workflowTask != null + && (workflowTask.isOptional() + || TaskType.PERMISSIVE.name().equals(workflowTask.getType()))) { return Optional.empty(); } WorkflowModel.Status status; diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapper.java new file mode 100644 index 0000000000..124a9b2e03 --- /dev/null +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapper.java @@ -0,0 +1,102 @@ +/* + * Copyright 2023 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.core.execution.mapper; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import com.netflix.conductor.common.metadata.tasks.TaskDef; +import com.netflix.conductor.common.metadata.tasks.TaskType; +import com.netflix.conductor.common.metadata.workflow.WorkflowDef; +import com.netflix.conductor.common.metadata.workflow.WorkflowTask; +import com.netflix.conductor.core.exception.TerminateWorkflowException; +import com.netflix.conductor.core.utils.ParametersUtils; +import com.netflix.conductor.model.TaskModel; +import com.netflix.conductor.model.WorkflowModel; + +/** + * An implementation of {@link TaskMapper} to map a {@link WorkflowTask} of type {@link + * TaskType#PERMISSIVE} to a {@link TaskModel} with status {@link TaskModel.Status#SCHEDULED}. + */ +@Component +public class PermissiveTaskMapper implements TaskMapper { + + public static final Logger LOGGER = LoggerFactory.getLogger(PermissiveTaskMapper.class); + private final ParametersUtils parametersUtils; + + public PermissiveTaskMapper(ParametersUtils parametersUtils) { + this.parametersUtils = parametersUtils; + } + + @Override + public String getTaskType() { + return TaskType.PERMISSIVE.name(); + } + + /** + * This method maps a {@link WorkflowTask} of type {@link TaskType#PERMISSIVE} to a {@link + * TaskModel} + * + * @param taskMapperContext: A wrapper class containing the {@link WorkflowTask}, {@link + * WorkflowDef}, {@link WorkflowModel} and a string representation of the TaskId + * @return a List with just one exclusive task + * @throws TerminateWorkflowException In case if the task definition does not exist + */ + @Override + public List getMappedTasks(TaskMapperContext taskMapperContext) + throws TerminateWorkflowException { + + LOGGER.debug("TaskMapperContext {} in PermissiveTaskMapper", taskMapperContext); + + WorkflowTask workflowTask = taskMapperContext.getWorkflowTask(); + WorkflowModel workflowModel = taskMapperContext.getWorkflowModel(); + int retryCount = taskMapperContext.getRetryCount(); + String retriedTaskId = taskMapperContext.getRetryTaskId(); + + TaskDef taskDefinition = + Optional.ofNullable(workflowTask.getTaskDefinition()) + .orElseThrow( + () -> { + String reason = + String.format( + "Invalid task. Task %s does not have a definition", + workflowTask.getName()); + return new TerminateWorkflowException(reason); + }); + + Map input = + parametersUtils.getTaskInput( + workflowTask.getInputParameters(), + workflowModel, + taskDefinition, + taskMapperContext.getTaskId()); + TaskModel permissiveTask = taskMapperContext.createTaskModel(); + permissiveTask.setTaskType(workflowTask.getName()); + permissiveTask.setStartDelayInSeconds(workflowTask.getStartDelay()); + permissiveTask.setInputData(input); + permissiveTask.setStatus(TaskModel.Status.SCHEDULED); + permissiveTask.setRetryCount(retryCount); + permissiveTask.setCallbackAfterSeconds(workflowTask.getStartDelay()); + permissiveTask.setResponseTimeoutSeconds(taskDefinition.getResponseTimeoutSeconds()); + permissiveTask.setRetriedTaskId(retriedTaskId); + permissiveTask.setRateLimitPerFrequency(taskDefinition.getRateLimitPerFrequency()); + permissiveTask.setRateLimitFrequencyInSeconds( + taskDefinition.getRateLimitFrequencyInSeconds()); + return List.of(permissiveTask); + } +} diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/ExclusiveJoin.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/ExclusiveJoin.java index e2bf0ac0bf..62a02c3c95 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/ExclusiveJoin.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/ExclusiveJoin.java @@ -24,6 +24,7 @@ import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; +import static com.netflix.conductor.common.metadata.tasks.TaskType.PERMISSIVE; import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_EXCLUSIVE_JOIN; @Component(TASK_TYPE_EXCLUSIVE_JOIN) @@ -65,9 +66,20 @@ public boolean execute( } taskStatus = exclusiveTask.getStatus(); foundExlusiveJoinOnTask = taskStatus.isTerminal(); - hasFailures = !taskStatus.isSuccessful(); + hasFailures = + !taskStatus.isSuccessful() + && (!PERMISSIVE.name().equals(exclusiveTask.getWorkflowTask().getType()) + || joinOn.stream() + .map(workflow::getTaskByRefName) + .allMatch(t -> t.getStatus().isTerminal())); if (hasFailures) { - failureReason.append(exclusiveTask.getReasonForIncompletion()).append(" "); + final String failureReasons = + joinOn.stream() + .map(workflow::getTaskByRefName) + .filter(t -> !t.getStatus().isSuccessful()) + .map(TaskModel::getReasonForIncompletion) + .collect(Collectors.joining(" ")); + failureReason.append(failureReasons); } break; diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java index e0f7be9faa..14fdba6f13 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java @@ -23,6 +23,7 @@ import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; +import static com.netflix.conductor.common.metadata.tasks.TaskType.PERMISSIVE; import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_JOIN; @Component(TASK_TYPE_JOIN) @@ -57,9 +58,21 @@ public boolean execute( break; } TaskModel.Status taskStatus = forkedTask.getStatus(); - hasFailures = !taskStatus.isSuccessful() && !forkedTask.getWorkflowTask().isOptional(); + hasFailures = + !taskStatus.isSuccessful() + && !forkedTask.getWorkflowTask().isOptional() + && (!PERMISSIVE.name().equals(forkedTask.getWorkflowTask().getType()) + || joinOn.stream() + .map(workflow::getTaskByRefName) + .allMatch(t -> t.getStatus().isTerminal())); if (hasFailures) { - failureReason.append(forkedTask.getReasonForIncompletion()).append(" "); + final String failureReasons = + joinOn.stream() + .map(workflow::getTaskByRefName) + .filter(t -> !t.getStatus().isSuccessful()) + .map(TaskModel::getReasonForIncompletion) + .collect(Collectors.joining(" ")); + failureReason.append(failureReasons); } // Only add to task output if it's not empty if (!forkedTask.getOutputData().isEmpty()) { diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java b/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java index dbf0277f6e..ca06c8ea86 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java @@ -440,6 +440,80 @@ public void testOptional() { outcome.tasksToBeScheduled.get(0).getReferenceTaskName()); } + /** Similar to {@link #testOptional} */ + @Test + public void testPermissive() { + WorkflowDef def = new WorkflowDef(); + def.setName("test-permissive"); + + WorkflowTask task1 = new WorkflowTask(); + task1.setName("task0"); + task1.setType("PERMISSIVE"); + task1.setTaskReferenceName("t0"); + task1.getInputParameters().put("taskId", "${CPEWF_TASK_ID}"); + task1.setTaskDefinition(new TaskDef("task0")); + + WorkflowTask task2 = new WorkflowTask(); + task2.setName("task1"); + task2.setType("PERMISSIVE"); + task2.setTaskReferenceName("t1"); + task2.setTaskDefinition(new TaskDef("task1")); + + def.getTasks().add(task1); + def.getTasks().add(task2); + def.setSchemaVersion(2); + + WorkflowModel workflow = new WorkflowModel(); + workflow.setWorkflowDefinition(def); + workflow.setCreateTime(System.currentTimeMillis()); + DeciderOutcome outcome = deciderService.decide(workflow); + assertNotNull(outcome); + assertEquals(1, outcome.tasksToBeScheduled.size()); + assertEquals( + task1.getTaskReferenceName(), + outcome.tasksToBeScheduled.get(0).getReferenceTaskName()); + + for (int i = 0; i < 3; i++) { + String task1Id = outcome.tasksToBeScheduled.get(0).getTaskId(); + assertEquals(task1Id, outcome.tasksToBeScheduled.get(0).getInputData().get("taskId")); + + workflow.getTasks().clear(); + workflow.getTasks().addAll(outcome.tasksToBeScheduled); + workflow.getTasks().get(0).setStatus(TaskModel.Status.FAILED); + + outcome = deciderService.decide(workflow); + + assertNotNull(outcome); + assertEquals(1, outcome.tasksToBeUpdated.size()); + assertEquals(1, outcome.tasksToBeScheduled.size()); + + assertEquals(TaskModel.Status.FAILED, workflow.getTasks().get(0).getStatus()); + assertEquals(task1Id, outcome.tasksToBeUpdated.get(0).getTaskId()); + assertEquals( + task1.getTaskReferenceName(), + outcome.tasksToBeScheduled.get(0).getReferenceTaskName()); + assertEquals(i + 1, outcome.tasksToBeScheduled.get(0).getRetryCount()); + } + + String task1Id = outcome.tasksToBeScheduled.get(0).getTaskId(); + + workflow.getTasks().clear(); + workflow.getTasks().addAll(outcome.tasksToBeScheduled); + workflow.getTasks().get(0).setStatus(TaskModel.Status.FAILED); + + outcome = deciderService.decide(workflow); + + assertNotNull(outcome); + assertEquals(1, outcome.tasksToBeUpdated.size()); + assertEquals(1, outcome.tasksToBeScheduled.size()); + + assertEquals(TaskModel.Status.FAILED, workflow.getTasks().get(0).getStatus()); + assertEquals(task1Id, outcome.tasksToBeUpdated.get(0).getTaskId()); + assertEquals( + task2.getTaskReferenceName(), + outcome.tasksToBeScheduled.get(0).getReferenceTaskName()); + } + @Test public void testOptionalWithDynamicFork() { WorkflowDef def = new WorkflowDef(); diff --git a/core/src/test/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapperTest.java b/core/src/test/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapperTest.java new file mode 100644 index 0000000000..97920a3f02 --- /dev/null +++ b/core/src/test/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapperTest.java @@ -0,0 +1,114 @@ +/* + * Copyright 2023 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.core.execution.mapper; + +import java.util.HashMap; +import java.util.List; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import com.netflix.conductor.common.metadata.tasks.TaskDef; +import com.netflix.conductor.common.metadata.workflow.WorkflowDef; +import com.netflix.conductor.common.metadata.workflow.WorkflowTask; +import com.netflix.conductor.core.exception.TerminateWorkflowException; +import com.netflix.conductor.core.utils.IDGenerator; +import com.netflix.conductor.core.utils.ParametersUtils; +import com.netflix.conductor.model.TaskModel; +import com.netflix.conductor.model.WorkflowModel; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.*; + +public class PermissiveTaskMapperTest { + + private PermissiveTaskMapper permissiveTaskMapper; + + private IDGenerator idGenerator = new IDGenerator(); + + @Rule public ExpectedException expectedException = ExpectedException.none(); + + @Before + public void setUp() { + ParametersUtils parametersUtils = mock(ParametersUtils.class); + permissiveTaskMapper = new PermissiveTaskMapper(parametersUtils); + } + + @Test + public void getMappedTasks() { + + WorkflowTask workflowTask = new WorkflowTask(); + workflowTask.setName("permissive_task"); + workflowTask.setTaskDefinition(new TaskDef("permissive_task")); + + String taskId = idGenerator.generate(); + String retriedTaskId = idGenerator.generate(); + + WorkflowDef workflowDef = new WorkflowDef(); + WorkflowModel workflow = new WorkflowModel(); + workflow.setWorkflowDefinition(workflowDef); + + TaskMapperContext taskMapperContext = + TaskMapperContext.newBuilder() + .withWorkflowModel(workflow) + .withTaskDefinition(new TaskDef()) + .withWorkflowTask(workflowTask) + .withTaskInput(new HashMap<>()) + .withRetryCount(0) + .withRetryTaskId(retriedTaskId) + .withTaskId(taskId) + .build(); + + List mappedTasks = permissiveTaskMapper.getMappedTasks(taskMapperContext); + assertNotNull(mappedTasks); + assertEquals(1, mappedTasks.size()); + } + + @Test + public void getMappedTasksException() { + + // Given + WorkflowTask workflowTask = new WorkflowTask(); + workflowTask.setName("permissive_task"); + String taskId = idGenerator.generate(); + String retriedTaskId = idGenerator.generate(); + + WorkflowDef workflowDef = new WorkflowDef(); + WorkflowModel workflow = new WorkflowModel(); + workflow.setWorkflowDefinition(workflowDef); + + TaskMapperContext taskMapperContext = + TaskMapperContext.newBuilder() + .withWorkflowModel(workflow) + .withTaskDefinition(new TaskDef()) + .withWorkflowTask(workflowTask) + .withTaskInput(new HashMap<>()) + .withRetryCount(0) + .withRetryTaskId(retriedTaskId) + .withTaskId(taskId) + .build(); + + // then + expectedException.expect(TerminateWorkflowException.class); + expectedException.expectMessage( + String.format( + "Invalid task. Task %s does not have a definition", + workflowTask.getName())); + + // when + permissiveTaskMapper.getMappedTasks(taskMapperContext); + } +} diff --git a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/PermissiveTask.java b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/PermissiveTask.java new file mode 100644 index 0000000000..f767f8ea5b --- /dev/null +++ b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/PermissiveTask.java @@ -0,0 +1,47 @@ +/* + * Copyright 2023 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.sdk.workflow.def.tasks; + +import com.netflix.conductor.common.metadata.tasks.TaskDef; +import com.netflix.conductor.common.metadata.tasks.TaskType; +import com.netflix.conductor.common.metadata.workflow.WorkflowTask; + +/* Workflow permissive task executed by a worker */ +public class PermissiveTask extends Task { + + private TaskDef taskDef; + + public PermissiveTask(String taskDefName, String taskReferenceName) { + super(taskReferenceName, TaskType.PERMISSIVE); + super.name(taskDefName); + } + + PermissiveTask(WorkflowTask workflowTask) { + super(workflowTask); + this.taskDef = workflowTask.getTaskDefinition(); + } + + public TaskDef getTaskDef() { + return taskDef; + } + + public PermissiveTask setTaskDef(TaskDef taskDef) { + this.taskDef = taskDef; + return this; + } + + @Override + protected void updateWorkflowTask(WorkflowTask workflowTask) { + workflowTask.setTaskDefinition(taskDef); + } +} diff --git a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java index 34fe023d5b..6601f1a23c 100644 --- a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java +++ b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java @@ -72,6 +72,7 @@ public static void initTaskImplementations() { TaskRegistry.register(TaskType.DYNAMIC.name(), Dynamic.class); TaskRegistry.register(TaskType.FORK_JOIN_DYNAMIC.name(), DynamicFork.class); TaskRegistry.register(TaskType.FORK_JOIN.name(), ForkJoin.class); + TaskRegistry.register(TaskType.PERMISSIVE.name(), PermissiveTask.class); TaskRegistry.register(TaskType.HTTP.name(), Http.class); TaskRegistry.register(TaskType.INLINE.name(), Javascript.class); TaskRegistry.register(TaskType.JOIN.name(), Join.class); @@ -238,7 +239,7 @@ public TaskClient getTaskClient() { return taskClient; } - public WorkflowClient getWorkflowClient() { + public WorkflowClient getWorkflowClient() { return workflowClient; } } diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy index 437f4e1ccc..1aa118696d 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy @@ -16,11 +16,11 @@ import org.springframework.beans.factory.annotation.Autowired import com.netflix.conductor.common.metadata.tasks.Task import com.netflix.conductor.common.metadata.tasks.TaskDef +import com.netflix.conductor.common.metadata.tasks.TaskType import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest import com.netflix.conductor.common.run.Workflow import com.netflix.conductor.core.execution.tasks.Join import com.netflix.conductor.core.execution.tasks.SubWorkflow -import com.netflix.conductor.dao.QueueDAO import com.netflix.conductor.test.base.AbstractSpecification import spock.lang.Shared @@ -45,6 +45,9 @@ class ForkJoinSpec extends AbstractSpecification { @Shared def FORK_JOIN_SUB_WORKFLOW = 'integration_test_fork_join_sw' + @Shared + def FORK_JOIN_PERMISSIVE_WF = 'FanInOutPermissiveTest' + @Autowired SubWorkflow subWorkflowTask @@ -56,7 +59,8 @@ class ForkJoinSpec extends AbstractSpecification { 'nested_fork_join_with_sub_workflow_integration_test.json', 'simple_one_task_sub_workflow_integration_test.json', 'fork_join_with_optional_sub_workflow_forks_integration_test.json', - 'fork_join_sub_workflow.json' + 'fork_join_sub_workflow.json', + 'fork_join_permissive_integration_test.json', ) } @@ -251,6 +255,110 @@ class ForkJoinSpec extends AbstractSpecification { metadataService.updateTaskDef(persistedIntegrationTask2Definition) } + /** + * start + * | + * fork + * / \ + * p_task1 p_task2 + * | / + * \ / + * \ / + * join + * | + * s_task3 + * | + * End + */ + def "Test a simple workflow with fork join permissive failure flow"() { + setup: "Ensure that 'integration_task_1' has a retry count of 0" + def persistedIntegrationTask1Definition = workflowTestUtil.getPersistedTaskDefinition('integration_task_1').get() + def modifiedIntegrationTask1Definition = new TaskDef(persistedIntegrationTask1Definition.name, + persistedIntegrationTask1Definition.description, persistedIntegrationTask1Definition.ownerEmail, 0, + 0, persistedIntegrationTask1Definition.responseTimeoutSeconds) + metadataService.updateTaskDef(modifiedIntegrationTask1Definition) + + when: "A fork join workflow is started" + def workflowInstanceId = startWorkflow(FORK_JOIN_PERMISSIVE_WF, 1, + 'fanoutTest', [:], + null) + + then: "verify that the workflow has started and the starting nodes of the each fork are in scheduled state" + workflowInstanceId + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 4 + tasks[0].status == Task.Status.COMPLETED + tasks[0].taskType == 'FORK' + tasks[1].workflowTask.type == TaskType.PERMISSIVE.name() + tasks[1].status == Task.Status.SCHEDULED + tasks[1].taskType == 'integration_task_1' + tasks[2].workflowTask.type == TaskType.PERMISSIVE.name() + tasks[2].status == Task.Status.SCHEDULED + tasks[2].taskType == 'integration_task_2' + tasks[3].status == Task.Status.IN_PROGRESS + tasks[3].taskType == 'JOIN' + } + + when: "The first task of the fork is polled and completed" + def joinTaskId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("fanouttask_join").getTaskId() + def polledAndAckTask1Try1 = workflowTestUtil.pollAndFailTask('integration_task_1', 'task1.worker', 'Failed...') + + then: "verify that the 'integration_task_1' was polled and acknowledged" + workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask1Try1) + + and: "The workflow has been updated and has all the required tasks in the right status to move forward" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 4 + tasks[1].status == Task.Status.FAILED + tasks[1].taskType == 'integration_task_1' + tasks[2].status == Task.Status.SCHEDULED + tasks[2].taskType == 'integration_task_2' + tasks[3].status == Task.Status.IN_PROGRESS + tasks[3].taskType == 'JOIN' + } + + when: "The other node of the fork is completed by completing 'integration_task_2'" + def polledAndAckTask2Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_2','task1.worker') + + and: "workflow is evaluated" + sweep(workflowInstanceId) + + then: "verify that the 'integration_task_2' was polled and acknowledged" + workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask2Try1) + + and: "the workflow is in the failed state" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 4 + tasks[1].status == Task.Status.FAILED + tasks[1].taskType == 'integration_task_1' + tasks[2].status == Task.Status.COMPLETED + tasks[2].taskType == 'integration_task_2' + tasks[3].status == Task.Status.IN_PROGRESS + tasks[3].taskType == 'JOIN' + } + + when: "JOIN task executed by the async executor" + asyncSystemTaskExecutor.execute(joinTask, joinTaskId) + + then: "The workflow has been updated with the task status and task list" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.FAILED + tasks.size() == 4 + tasks[1].status == Task.Status.FAILED + tasks[1].taskType == 'integration_task_1' + tasks[2].status == Task.Status.COMPLETED + tasks[2].taskType == 'integration_task_2' + tasks[3].status == Task.Status.FAILED + tasks[3].taskType == 'JOIN' + } + + cleanup: "Restore the task definitions that were modified as part of this feature testing" + metadataService.updateTaskDef(persistedIntegrationTask1Definition) + } + def "Test retrying a failed fork join workflow"() { when: "A fork join workflow is started" diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/WorkflowAndTaskConfigurationSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/WorkflowAndTaskConfigurationSpec.groovy index 0fbf0ec0b3..705836f7f1 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/WorkflowAndTaskConfigurationSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/WorkflowAndTaskConfigurationSpec.groovy @@ -45,6 +45,12 @@ class WorkflowAndTaskConfigurationSpec extends AbstractSpecification { @Shared def WORKFLOW_WITH_OPTIONAL_TASK = 'optional_task_wf' + @Shared + def WORKFLOW_WITH_PERMISSIVE_TASK = 'permissive_task_wf' + + @Shared + def WORKFLOW_WITH_PERMISSIVE_OPTIONAL_TASK = 'permissive_optional_task_wf' + @Shared def TEST_WORKFLOW = 'integration_test_wf3' @@ -52,12 +58,14 @@ class WorkflowAndTaskConfigurationSpec extends AbstractSpecification { def WAIT_TIME_OUT_WORKFLOW = 'test_wait_timeout' def setup() { - //Register LINEAR_WORKFLOW_T1_T2, TEST_WORKFLOW, RTOWF, WORKFLOW_WITH_OPTIONAL_TASK + //Register LINEAR_WORKFLOW_T1_T2, TEST_WORKFLOW, RTOWF, WORKFLOW_WITH_OPTIONAL_TASK, WORKFLOW_WITH_PERMISSIVE_TASK, WORKFLOW_WITH_PERMISSIVE_OPTIONAL_TASK workflowTestUtil.registerWorkflows( 'simple_workflow_1_integration_test.json', 'simple_workflow_1_input_template_integration_test.json', 'simple_workflow_3_integration_test.json', 'simple_workflow_with_optional_task_integration_test.json', + 'simple_workflow_with_permissive_task_integration_test.json', + 'simple_workflow_with_permissive_optional_task_integration_test.json', 'simple_wait_task_workflow_integration_test.json') } @@ -133,6 +141,156 @@ class WorkflowAndTaskConfigurationSpec extends AbstractSpecification { } } + def "Test simple workflow which has a permissive task"() { + + given: "A input parameters for a workflow with a permissive task" + def correlationId = 'integration_test' + UUID.randomUUID().toString() + def workflowInput = new HashMap() + workflowInput['param1'] = 'p1 value' + workflowInput['param2'] = 'p2 value' + + when: "A permissive task workflow is started" + def workflowInstanceId = startWorkflow(WORKFLOW_WITH_PERMISSIVE_TASK, 1, + correlationId, workflowInput, + null) + + then: "verify that the workflow has started and the permissive task is in a scheduled state" + workflowInstanceId + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 1 + tasks[0].status == Task.Status.SCHEDULED + tasks[0].taskType == 'task_permissive' + } + + when: "The first permissive task is polled and failed" + Tuple polledAndFailedTaskTry1 = workflowTestUtil.pollAndFailTask('task_permissive', + 'task1.integration.worker', 'NETWORK ERROR') + + then: "Verify that the task_permissive was polled and acknowledged" + verifyPolledAndAcknowledgedTask(polledAndFailedTaskTry1) + + when: "A decide is executed on the workflow" + workflowExecutor.decide(workflowInstanceId) + + then: "verify that the workflow is still running and the first permissive task has failed and the retry has kicked in" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 2 + tasks[0].status == Task.Status.FAILED + tasks[0].taskType == 'task_permissive' + tasks[1].status == Task.Status.SCHEDULED + tasks[1].taskType == 'task_permissive' + } + + when: "The first permissive task is polled and failed" + Tuple polledAndFailedTaskTry2 = workflowTestUtil.pollAndFailTask('task_permissive', + 'task1.integration.worker', 'NETWORK ERROR') + + then: "Verify that the task_permissive was polled and acknowledged" + verifyPolledAndAcknowledgedTask(polledAndFailedTaskTry2) + + workflowExecutor.decide(workflowInstanceId) + + then: "Ensure that the workflow is updated" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 3 + tasks[1].status == Task.Status.FAILED + tasks[1].taskType == 'task_permissive' + tasks[2].status == Task.Status.SCHEDULED + tasks[2].taskType == 'integration_task_2' + } + + when: "The second task 'integration_task_2' is polled and completed" + def task2Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker') + + then: "Verify that the task was polled and acknowledged" + verifyPolledAndAcknowledgedTask(task2Try1) + + and: "Ensure that the workflow is in completed state" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.FAILED + reasonForIncompletion == "Task ${tasks[0].taskId} failed with status: FAILED and reason: 'NETWORK ERROR'. " + + "Task ${tasks[1].taskId} failed with status: FAILED and reason: 'NETWORK ERROR'" + tasks.size() == 3 + tasks[2].status == Task.Status.COMPLETED + tasks[2].taskType == 'integration_task_2' + } + } + + def "Test simple workflow which has a permissive optional task"() { + + given: "A input parameters for a workflow with a permissive optional task" + def correlationId = 'integration_test' + UUID.randomUUID().toString() + def workflowInput = new HashMap() + workflowInput['param1'] = 'p1 value' + workflowInput['param2'] = 'p2 value' + + when: "A permissive optional task workflow is started" + def workflowInstanceId = startWorkflow(WORKFLOW_WITH_PERMISSIVE_OPTIONAL_TASK, 1, + correlationId, workflowInput, + null) + + then: "verify that the workflow has started and the permissive optional task is in a scheduled state" + workflowInstanceId + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 1 + tasks[0].status == Task.Status.SCHEDULED + tasks[0].taskType == 'task_optional' + } + + when: "The first permissive optional task is polled and failed" + Tuple polledAndFailedTaskTry1 = workflowTestUtil.pollAndFailTask('task_optional', + 'task1.integration.worker', 'NETWORK ERROR') + + then: "Verify that the task_optional was polled and acknowledged" + verifyPolledAndAcknowledgedTask(polledAndFailedTaskTry1) + + when: "A decide is executed on the workflow" + workflowExecutor.decide(workflowInstanceId) + + then: "verify that the workflow is still running and the first permissive optional task has failed and the retry has kicked in" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 2 + tasks[0].status == Task.Status.FAILED + tasks[0].taskType == 'task_optional' + tasks[1].status == Task.Status.SCHEDULED + tasks[1].taskType == 'task_optional' + } + + when: "Poll the permissive optional task again and do not complete it and run decide" + workflowExecutionService.poll('task_optional', 'task1.integration.worker') + Thread.sleep(5000) + workflowExecutor.decide(workflowInstanceId) + + then: "Ensure that the workflow is updated" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 3 + tasks[1].status == Task.Status.COMPLETED_WITH_ERRORS + tasks[1].taskType == 'task_optional' + tasks[2].status == Task.Status.SCHEDULED + tasks[2].taskType == 'integration_task_2' + } + + when: "The second task 'integration_task_2' is polled and completed" + def task2Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker') + + then: "Verify that the task was polled and acknowledged" + verifyPolledAndAcknowledgedTask(task2Try1) + + and: "Ensure that the workflow is in completed state" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.COMPLETED + tasks.size() == 3 + tasks[2].status == Task.Status.COMPLETED + tasks[2].taskType == 'integration_task_2' + } + } + def "test workflow with input template parsing"() { given: "Input parameters for a workflow with input template" def correlationId = 'integration_test' + UUID.randomUUID().toString() diff --git a/test-harness/src/test/resources/fork_join_permissive_integration_test.json b/test-harness/src/test/resources/fork_join_permissive_integration_test.json new file mode 100644 index 0000000000..6f65d4e118 --- /dev/null +++ b/test-harness/src/test/resources/fork_join_permissive_integration_test.json @@ -0,0 +1,109 @@ +{ + "name": "FanInOutPermissiveTest", + "description": "FanInOutPermissiveTest", + "version": 1, + "tasks": [ + { + "name": "fork", + "taskReferenceName": "fanouttask", + "inputParameters": {}, + "type": "FORK_JOIN", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [ + [ + { + "name": "integration_task_1", + "taskReferenceName": "t1", + "inputParameters": { + "p1": "workflow.input.param1", + "p2": "workflow.input.param2" + }, + "type": "PERMISSIVE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + } + ], + [ + { + "name": "integration_task_2", + "taskReferenceName": "t2", + "inputParameters": { + "tp1": "workflow.input.param1" + }, + "type": "PERMISSIVE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + } + ] + ], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + }, + { + "name": "join", + "taskReferenceName": "fanouttask_join", + "inputParameters": {}, + "type": "JOIN", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [ + "t1", + "t2" + ], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + }, + { + "name": "integration_task_3", + "taskReferenceName": "t3", + "inputParameters": { + "p1": "workflow.input.param1", + "p2": "workflow.input.param2" + }, + "type": "SIMPLE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + } + ], + "inputParameters": [ + "param1", + "param2" + ], + "outputParameters": {}, + "schemaVersion": 2, + "restartable": true, + "workflowStatusListenerEnabled": false, + "timeoutPolicy": "ALERT_ONLY", + "timeoutSeconds": 0, + "ownerEmail": "test@harness.com" +} \ No newline at end of file diff --git a/test-harness/src/test/resources/simple_workflow_with_permissive_optional_task_integration_test.json b/test-harness/src/test/resources/simple_workflow_with_permissive_optional_task_integration_test.json new file mode 100644 index 0000000000..84c6910abf --- /dev/null +++ b/test-harness/src/test/resources/simple_workflow_with_permissive_optional_task_integration_test.json @@ -0,0 +1,58 @@ +{ + "name": "permissive_optional_task_wf", + "description": "permissive_optional_task_wf", + "version": 1, + "tasks": [ + { + "name": "task_optional", + "taskReferenceName": "t1", + "inputParameters": { + "p1": "${workflow.input.param1}", + "p2": "${workflow.input.param2}" + }, + "type": "PERMISSIVE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": true, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + }, + { + "name": "integration_task_2", + "taskReferenceName": "t2", + "inputParameters": { + "tp1": "${workflow.input.param1}", + "tp2": "${t1.output.op}" + }, + "type": "PERMISSIVE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + } + ], + "inputParameters": [ + "param1", + "param2" + ], + "outputParameters": { + "o1": "${workflow.input.param1}", + "o2": "${t2.output.uuid}", + "o3": "${t1.output.op}" + }, + "schemaVersion": 2, + "restartable": true, + "workflowStatusListenerEnabled": false, + "timeoutPolicy": "ALERT_ONLY", + "timeoutSeconds": 0, + "ownerEmail": "test@harness.com" +} \ No newline at end of file diff --git a/test-harness/src/test/resources/simple_workflow_with_permissive_task_integration_test.json b/test-harness/src/test/resources/simple_workflow_with_permissive_task_integration_test.json new file mode 100644 index 0000000000..2b8f4dbfe0 --- /dev/null +++ b/test-harness/src/test/resources/simple_workflow_with_permissive_task_integration_test.json @@ -0,0 +1,92 @@ +{ + "name": "permissive_task_wf", + "description": "permissive_task_wf", + "version": 1, + "tasks": [ + { + "name": "task_permissive", + "taskReferenceName": "t1", + "inputParameters": { + "p1": "${workflow.input.param1}", + "p2": "${workflow.input.param2}" + }, + "type": "PERMISSIVE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "retryCount": 1, + "taskDefinition": { + "createdBy": "integration_app", + "name": "task_permissive", + "description": "task_permissive", + "retryCount": 1, + "timeoutSeconds": 120, + "inputKeys": [], + "outputKeys": [], + "timeoutPolicy": "TIME_OUT_WF", + "retryLogic": "FIXED", + "retryDelaySeconds": 0, + "responseTimeoutSeconds": 3600, + "inputTemplate": {}, + "rateLimitPerFrequency": 0, + "rateLimitFrequencyInSeconds": 1 + }, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + }, + { + "name": "integration_task_2", + "taskReferenceName": "t2", + "inputParameters": { + "tp1": "${workflow.input.param1}", + "tp2": "${t1.output.op}" + }, + "type": "PERMISSIVE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "retryCount": 1, + "taskDefinition": { + "createdBy": "integration_app", + "name": "integration_task_2", + "description": "integration_task_2", + "retryCount": 1, + "timeoutSeconds": 120, + "inputKeys": [], + "outputKeys": [], + "timeoutPolicy": "TIME_OUT_WF", + "retryLogic": "FIXED", + "retryDelaySeconds": 0, + "responseTimeoutSeconds": 3600, + "inputTemplate": {}, + "rateLimitPerFrequency": 0, + "rateLimitFrequencyInSeconds": 1 + }, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + } + ], + "inputParameters": [ + "param1", + "param2" + ], + "outputParameters": { + "o1": "${workflow.input.param1}", + "o2": "${t2.output.uuid}", + "o3": "${t1.output.op}" + }, + "schemaVersion": 2, + "restartable": true, + "workflowStatusListenerEnabled": false, + "timeoutPolicy": "ALERT_ONLY", + "timeoutSeconds": 0, + "ownerEmail": "test@harness.com" +} \ No newline at end of file