Skip to content

Commit

Permalink
Merge pull request #13 from routific/event-queue-retry
Browse files Browse the repository at this point in the history
Allow retry option in fail-task event messages
  • Loading branch information
pmchung authored Oct 22, 2022
2 parents 59a6e1c + 5622758 commit 09f858b
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public enum Status {
@ProtoField(id = 1)
private String workflowInstanceId;

@NotEmpty(message = "Task ID cannot be null or empty")
@ProtoField(id = 2)
private String taskId;

Expand All @@ -67,6 +66,9 @@ public enum Status {
@Hidden
private Any outputMessage;

@ProtoField(id = 9)
private String taskReferenceName;

private List<TaskExecLog> logs = new CopyOnWriteArrayList<>();

private String externalOutputPayloadStoragePath;
Expand Down Expand Up @@ -119,6 +121,14 @@ public void setTaskId(String taskId) {
this.taskId = taskId;
}

public String getTaskReferenceName() {
return taskReferenceName;
}

public void setTaskReferenceName(String taskReferenceName) {
this.taskReferenceName = taskReferenceName;
}

public String getReasonForIncompletion() {
return reasonForIncompletion;
}
Expand Down Expand Up @@ -263,6 +273,9 @@ public String toString() {
+ ", taskId='"
+ taskId
+ '\''
+ ", taskReferenceName='"
+ taskReferenceName
+ '\''
+ ", reasonForIncompletion='"
+ reasonForIncompletion
+ '\''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

public class TaskUtils {

private static final String LOOP_TASK_DELIMITER = "__";
public static final String LOOP_TASK_DELIMITER = "__";

public static String appendIteration(String name, int iteration) {
return name + LOOP_TASK_DELIMITER + iteration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public Map<String, Object> execute(
action,
jsonObject,
action.getFail_task(),
TaskModel.Status.FAILED,
TaskModel.Status.FAILED_WITH_TERMINAL_ERROR,
event,
messageId);
default:
Expand All @@ -110,12 +110,14 @@ private Map<String, Object> completeTask(
input.put("workflowId", taskDetails.getWorkflowId());
input.put("taskId", taskDetails.getTaskId());
input.put("taskRefName", taskDetails.getTaskRefName());
input.put("retry", false);
input.putAll(taskDetails.getOutput());

Map<String, Object> replaced = parametersUtils.replace(input, payload);
String workflowId = (String) replaced.get("workflowId");
String taskId = (String) replaced.get("taskId");
String taskRefName = (String) replaced.get("taskRefName");
Boolean retry = Boolean.TRUE.equals(replaced.get("retry"));

TaskModel taskModel = null;
if (StringUtils.isNotEmpty(taskId)) {
Expand All @@ -132,9 +134,11 @@ private Map<String, Object> completeTask(
workflow.getTasks().stream()
.filter(
t ->
TaskUtils.removeIterationFromTaskRefName(
t.getReferenceTaskName())
.equals(taskRefName))
t.getReferenceTaskName()
.contains(TaskUtils.LOOP_TASK_DELIMITER)
&& TaskUtils.removeIterationFromTaskRefName(
t.getReferenceTaskName())
.equals(taskRefName))
.collect(Collectors.toList());
if (!loopOverTaskList.isEmpty()) {
// Find loopover task with the highest iteration value
Expand All @@ -158,6 +162,10 @@ private Map<String, Object> completeTask(
return replaced;
}

if (retry) {
status = TaskModel.Status.FAILED;
}

taskModel.setStatus(status);
taskModel.setOutputData(replaced);
taskModel.setOutputMessage(taskDetails.getOutputMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,12 +507,15 @@ Optional<TaskModel> retry(
taskDefinition = metadataDAO.getTaskDef(task.getTaskDefName());
}

final int expectedRetryCount =
taskDefinition == null
int expectedRetryCount =
workflowTask == null
? 0
: Optional.ofNullable(workflowTask)
.map(WorkflowTask::getRetryCount)
.orElse(taskDefinition.getRetryCount());
: Optional.ofNullable(workflowTask.getRetryCount()).orElse(0);
expectedRetryCount =
taskDefinition == null
? expectedRetryCount
: Optional.ofNullable(taskDefinition.getRetryCount()).orElse(0);

if (!task.getStatus().isRetriable()
|| TaskType.isBuiltIn(task.getTaskType())
|| expectedRetryCount <= retryCount) {
Expand All @@ -534,32 +537,36 @@ Optional<TaskModel> retry(
updateWorkflowOutput(workflow, task);
throw new TerminateWorkflowException(task.getReasonForIncompletion(), status, task);
}

// retry... - but not immediately - put a delay...
int startDelay = taskDefinition.getRetryDelaySeconds();
switch (taskDefinition.getRetryLogic()) {
case FIXED:
startDelay = taskDefinition.getRetryDelaySeconds();
break;
case LINEAR_BACKOFF:
int linearRetryDelaySeconds =
taskDefinition.getRetryDelaySeconds()
* taskDefinition.getBackoffScaleFactor()
* (task.getRetryCount() + 1);
// Reset integer overflow to max value
startDelay =
linearRetryDelaySeconds < 0 ? Integer.MAX_VALUE : linearRetryDelaySeconds;
break;
case EXPONENTIAL_BACKOFF:
int exponentialRetryDelaySeconds =
taskDefinition.getRetryDelaySeconds()
* (int) Math.pow(2, task.getRetryCount());
// Reset integer overflow to max value
startDelay =
exponentialRetryDelaySeconds < 0
? Integer.MAX_VALUE
: exponentialRetryDelaySeconds;
break;
int startDelay = 1;
if (!Objects.isNull(taskDefinition)) {
// retry... - but not immediately - put a delay...
startDelay = taskDefinition.getRetryDelaySeconds();
switch (taskDefinition.getRetryLogic()) {
case FIXED:
startDelay = taskDefinition.getRetryDelaySeconds();
break;
case LINEAR_BACKOFF:
int linearRetryDelaySeconds =
taskDefinition.getRetryDelaySeconds()
* taskDefinition.getBackoffScaleFactor()
* (task.getRetryCount() + 1);
// Reset integer overflow to max value
startDelay =
linearRetryDelaySeconds < 0
? Integer.MAX_VALUE
: linearRetryDelaySeconds;
break;
case EXPONENTIAL_BACKOFF:
int exponentialRetryDelaySeconds =
taskDefinition.getRetryDelaySeconds()
* (int) Math.pow(2, task.getRetryCount());
// Reset integer overflow to max value
startDelay =
exponentialRetryDelaySeconds < 0
? Integer.MAX_VALUE
: exponentialRetryDelaySeconds;
break;
}
}

task.setRetried(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,15 +1093,28 @@ public void updateTask(TaskResult taskResult) {
workflowInstance =
metadataMapperService.populateWorkflowWithDefinitions(workflowInstance);
}

TaskModel task =
Optional.ofNullable(executionDAOFacade.getTaskModel(taskResult.getTaskId()))
.orElseThrow(
() ->
new ApplicationException(
ApplicationException.Code.NOT_FOUND,
"No such task found by id: "
+ taskResult.getTaskId()));
TaskModel task;
if (taskResult.getTaskReferenceName() != null && taskResult.getTaskId() == null) {
task =
Optional.ofNullable(
workflowInstance.getTaskByRefName(
taskResult.getTaskReferenceName()))
.orElseThrow(
() ->
new ApplicationException(
ApplicationException.Code.NOT_FOUND,
"No such task found by reference name: "
+ taskResult.getTaskReferenceName()));
} else {
task =
Optional.ofNullable(executionDAOFacade.getTaskModel(taskResult.getTaskId()))
.orElseThrow(
() ->
new ApplicationException(
ApplicationException.Code.NOT_FOUND,
"No such task found by id: "
+ taskResult.getTaskId()));
}

LOGGER.debug("Task: {} belonging to Workflow {} being updated", task, workflowInstance);

Expand Down Expand Up @@ -1335,7 +1348,10 @@ public boolean decide(String workflowId) {
decide(workflowId);
}
} catch (TerminateWorkflowException twe) {
LOGGER.info("Execution terminated of workflow: {}", workflowId, twe);
LOGGER.info(
"Execution terminated of workflow: {} with reason: {}",
workflowId,
twe.getMessage());
terminate(workflow, twe);
return true;
} catch (RuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,9 @@ public TaskResultPb.TaskResult toProto(TaskResult from) {
if (from.getOutputMessage() != null) {
to.setOutputMessage( toProto( from.getOutputMessage() ) );
}
if (from.getTaskReferenceName() != null) {
to.setTaskReferenceName( from.getTaskReferenceName() );
}
return to.build();
}

Expand All @@ -852,6 +855,7 @@ public TaskResult fromProto(TaskResultPb.TaskResult from) {
if (from.hasOutputMessage()) {
to.setOutputMessage( fromProto( from.getOutputMessage() ) );
}
to.setTaskReferenceName( from.getTaskReferenceName() );
return to;
}

Expand Down
1 change: 1 addition & 0 deletions grpc/src/main/proto/model/taskresult.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ message TaskResult {
TaskResult.Status status = 6;
map<string, google.protobuf.Value> output_data = 7;
google.protobuf.Any output_message = 8;
string task_reference_name = 9;
}

0 comments on commit 09f858b

Please sign in to comment.