From f1339da920d45c8d3e07998be0387acf8eea8d96 Mon Sep 17 00:00:00 2001 From: pmchung Date: Fri, 21 Oct 2022 09:22:47 -0700 Subject: [PATCH 1/4] Allow taskReferenceName over taskId in update task API --- .../common/metadata/tasks/TaskResult.java | 15 +++++++- .../core/execution/WorkflowExecutor.java | 36 +++++++++++++------ .../conductor/grpc/AbstractProtoMapper.java | 4 +++ grpc/src/main/proto/model/taskresult.proto | 1 + 4 files changed, 45 insertions(+), 11 deletions(-) diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskResult.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskResult.java index d1628ea616..e961921493 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskResult.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskResult.java @@ -44,7 +44,6 @@ public enum Status { @ProtoField(id = 1) private String workflowInstanceId; - @NotEmpty(message = "Task ID cannot be null or empty") @ProtoField(id = 2) private String taskId; @@ -67,6 +66,9 @@ public enum Status { @Hidden private Any outputMessage; + @ProtoField(id = 9) + private String taskReferenceName; + private List logs = new CopyOnWriteArrayList<>(); private String externalOutputPayloadStoragePath; @@ -119,6 +121,14 @@ public void setTaskId(String taskId) { this.taskId = taskId; } + public String getTaskReferenceName() { + return taskReferenceName; + } + + public void setTaskReferenceName(String taskReferenceName) { + this.taskReferenceName = taskReferenceName; + } + public String getReasonForIncompletion() { return reasonForIncompletion; } @@ -263,6 +273,9 @@ public String toString() { + ", taskId='" + taskId + '\'' + + ", taskReferenceName='" + + taskReferenceName + + '\'' + ", reasonForIncompletion='" + reasonForIncompletion + '\'' diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index d8aeb764bc..c1fafc2117 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -1093,15 +1093,28 @@ public void updateTask(TaskResult taskResult) { workflowInstance = metadataMapperService.populateWorkflowWithDefinitions(workflowInstance); } - - TaskModel task = - Optional.ofNullable(executionDAOFacade.getTaskModel(taskResult.getTaskId())) - .orElseThrow( - () -> - new ApplicationException( - ApplicationException.Code.NOT_FOUND, - "No such task found by id: " - + taskResult.getTaskId())); + TaskModel task; + if (taskResult.getTaskReferenceName() != null && taskResult.getTaskId() == null) { + task = + Optional.ofNullable( + workflowInstance.getTaskByRefName( + taskResult.getTaskReferenceName())) + .orElseThrow( + () -> + new ApplicationException( + ApplicationException.Code.NOT_FOUND, + "No such task found by reference name: " + + taskResult.getTaskReferenceName())); + } else { + task = + Optional.ofNullable(executionDAOFacade.getTaskModel(taskResult.getTaskId())) + .orElseThrow( + () -> + new ApplicationException( + ApplicationException.Code.NOT_FOUND, + "No such task found by id: " + + taskResult.getTaskId())); + } LOGGER.debug("Task: {} belonging to Workflow {} being updated", task, workflowInstance); @@ -1335,7 +1348,10 @@ public boolean decide(String workflowId) { decide(workflowId); } } catch (TerminateWorkflowException twe) { - LOGGER.info("Execution terminated of workflow: {}", workflowId, twe); + LOGGER.info( + "Execution terminated of workflow: {} with reason: {}", + workflowId, + twe.getMessage()); terminate(workflow, twe); return true; } catch (RuntimeException e) { diff --git a/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java b/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java index 4a6fbfff30..15547433fe 100644 --- a/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java +++ b/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java @@ -833,6 +833,9 @@ public TaskResultPb.TaskResult toProto(TaskResult from) { if (from.getOutputMessage() != null) { to.setOutputMessage( toProto( from.getOutputMessage() ) ); } + if (from.getTaskReferenceName() != null) { + to.setTaskReferenceName( from.getTaskReferenceName() ); + } return to.build(); } @@ -852,6 +855,7 @@ public TaskResult fromProto(TaskResultPb.TaskResult from) { if (from.hasOutputMessage()) { to.setOutputMessage( fromProto( from.getOutputMessage() ) ); } + to.setTaskReferenceName( from.getTaskReferenceName() ); return to; } diff --git a/grpc/src/main/proto/model/taskresult.proto b/grpc/src/main/proto/model/taskresult.proto index bb7f4a23ec..e434e691c8 100644 --- a/grpc/src/main/proto/model/taskresult.proto +++ b/grpc/src/main/proto/model/taskresult.proto @@ -23,4 +23,5 @@ message TaskResult { TaskResult.Status status = 6; map output_data = 7; google.protobuf.Any output_message = 8; + string task_reference_name = 9; } From 5c4cadad4d3122f929efb1be92448edd1ecef6bc Mon Sep 17 00:00:00 2001 From: pmchung Date: Fri, 21 Oct 2022 09:23:26 -0700 Subject: [PATCH 2/4] Use default retry logic if task retryCount > 0 --- .../core/execution/DeciderService.java | 69 ++++++++++--------- 1 file changed, 38 insertions(+), 31 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java index 18bda39189..187ba706e2 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java @@ -507,12 +507,15 @@ Optional retry( taskDefinition = metadataDAO.getTaskDef(task.getTaskDefName()); } - final int expectedRetryCount = - taskDefinition == null + int expectedRetryCount = + workflowTask == null ? 0 - : Optional.ofNullable(workflowTask) - .map(WorkflowTask::getRetryCount) - .orElse(taskDefinition.getRetryCount()); + : Optional.ofNullable(workflowTask.getRetryCount()).orElse(0); + expectedRetryCount = + taskDefinition == null + ? expectedRetryCount + : Optional.ofNullable(taskDefinition.getRetryCount()).orElse(0); + if (!task.getStatus().isRetriable() || TaskType.isBuiltIn(task.getTaskType()) || expectedRetryCount <= retryCount) { @@ -534,32 +537,36 @@ Optional retry( updateWorkflowOutput(workflow, task); throw new TerminateWorkflowException(task.getReasonForIncompletion(), status, task); } - - // retry... - but not immediately - put a delay... - int startDelay = taskDefinition.getRetryDelaySeconds(); - switch (taskDefinition.getRetryLogic()) { - case FIXED: - startDelay = taskDefinition.getRetryDelaySeconds(); - break; - case LINEAR_BACKOFF: - int linearRetryDelaySeconds = - taskDefinition.getRetryDelaySeconds() - * taskDefinition.getBackoffScaleFactor() - * (task.getRetryCount() + 1); - // Reset integer overflow to max value - startDelay = - linearRetryDelaySeconds < 0 ? Integer.MAX_VALUE : linearRetryDelaySeconds; - break; - case EXPONENTIAL_BACKOFF: - int exponentialRetryDelaySeconds = - taskDefinition.getRetryDelaySeconds() - * (int) Math.pow(2, task.getRetryCount()); - // Reset integer overflow to max value - startDelay = - exponentialRetryDelaySeconds < 0 - ? Integer.MAX_VALUE - : exponentialRetryDelaySeconds; - break; + int startDelay = 1; + if (!Objects.isNull(taskDefinition)) { + // retry... - but not immediately - put a delay... + startDelay = taskDefinition.getRetryDelaySeconds(); + switch (taskDefinition.getRetryLogic()) { + case FIXED: + startDelay = taskDefinition.getRetryDelaySeconds(); + break; + case LINEAR_BACKOFF: + int linearRetryDelaySeconds = + taskDefinition.getRetryDelaySeconds() + * taskDefinition.getBackoffScaleFactor() + * (task.getRetryCount() + 1); + // Reset integer overflow to max value + startDelay = + linearRetryDelaySeconds < 0 + ? Integer.MAX_VALUE + : linearRetryDelaySeconds; + break; + case EXPONENTIAL_BACKOFF: + int exponentialRetryDelaySeconds = + taskDefinition.getRetryDelaySeconds() + * (int) Math.pow(2, task.getRetryCount()); + // Reset integer overflow to max value + startDelay = + exponentialRetryDelaySeconds < 0 + ? Integer.MAX_VALUE + : exponentialRetryDelaySeconds; + break; + } } task.setRetried(true); From 2ff0aa411118c8ed573a91931f3b08768c9d273b Mon Sep 17 00:00:00 2001 From: pmchung Date: Fri, 21 Oct 2022 09:25:42 -0700 Subject: [PATCH 3/4] Mark task as FAILED_WITH_TERMINAL_ERROR unless retry is true --- .../conductor/core/events/SimpleActionProcessor.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java b/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java index 84db035716..8496e154e0 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java +++ b/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java @@ -88,7 +88,7 @@ public Map execute( action, jsonObject, action.getFail_task(), - TaskModel.Status.FAILED, + TaskModel.Status.FAILED_WITH_TERMINAL_ERROR, event, messageId); default: @@ -110,12 +110,14 @@ private Map completeTask( input.put("workflowId", taskDetails.getWorkflowId()); input.put("taskId", taskDetails.getTaskId()); input.put("taskRefName", taskDetails.getTaskRefName()); + input.put("retry", false); input.putAll(taskDetails.getOutput()); Map replaced = parametersUtils.replace(input, payload); String workflowId = (String) replaced.get("workflowId"); String taskId = (String) replaced.get("taskId"); String taskRefName = (String) replaced.get("taskRefName"); + Boolean retry = Boolean.TRUE.equals(replaced.get("retry")); TaskModel taskModel = null; if (StringUtils.isNotEmpty(taskId)) { @@ -158,6 +160,10 @@ private Map completeTask( return replaced; } + if (retry) { + status = TaskModel.Status.FAILED; + } + taskModel.setStatus(status); taskModel.setOutputData(replaced); taskModel.setOutputMessage(taskDetails.getOutputMessage()); From 56227581d5fc82706698574c50d7d11c6386de8c Mon Sep 17 00:00:00 2001 From: pmchung Date: Fri, 21 Oct 2022 09:25:59 -0700 Subject: [PATCH 4/4] Fix conductor bug --- .../com/netflix/conductor/common/utils/TaskUtils.java | 2 +- .../conductor/core/events/SimpleActionProcessor.java | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/common/src/main/java/com/netflix/conductor/common/utils/TaskUtils.java b/common/src/main/java/com/netflix/conductor/common/utils/TaskUtils.java index 5e83bd73e7..da0fcab6bc 100644 --- a/common/src/main/java/com/netflix/conductor/common/utils/TaskUtils.java +++ b/common/src/main/java/com/netflix/conductor/common/utils/TaskUtils.java @@ -14,7 +14,7 @@ public class TaskUtils { - private static final String LOOP_TASK_DELIMITER = "__"; + public static final String LOOP_TASK_DELIMITER = "__"; public static String appendIteration(String name, int iteration) { return name + LOOP_TASK_DELIMITER + iteration; diff --git a/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java b/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java index 8496e154e0..a344383186 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java +++ b/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java @@ -134,9 +134,11 @@ private Map completeTask( workflow.getTasks().stream() .filter( t -> - TaskUtils.removeIterationFromTaskRefName( - t.getReferenceTaskName()) - .equals(taskRefName)) + t.getReferenceTaskName() + .contains(TaskUtils.LOOP_TASK_DELIMITER) + && TaskUtils.removeIterationFromTaskRefName( + t.getReferenceTaskName()) + .equals(taskRefName)) .collect(Collectors.toList()); if (!loopOverTaskList.isEmpty()) { // Find loopover task with the highest iteration value