Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Adding resiliency tests and relevant changes for QueueDAO. (#1921)
Browse files Browse the repository at this point in the history
* Adding resiliency tests and relevant changes for QueueDAO.

* Improvements to QueueDAO resiliency changes to verify the case when update task is called with IN_PROGRESS state.
  • Loading branch information
kishorebanala authored Oct 14, 2020
1 parent fc533a6 commit ddafc5a
Show file tree
Hide file tree
Showing 5 changed files with 664 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,16 @@ public void rewind(String workflowId, boolean useLatestDefinitions) {
workflow.setStatus(WorkflowStatus.RUNNING);
workflow.setOutput(null);
workflow.setExternalOutputPayloadStoragePath(null);
executionDAOFacade.createWorkflow(workflow);

try {
executionDAOFacade.createWorkflow(workflow);
} catch (Exception e) {
Monitors.recordWorkflowStartError(workflowDef.getName(), WorkflowContext.get().getClientApp());
LOGGER.error("Unable to restart workflow: {}", workflowDef.getName(), e);
terminateWorkflow(workflowId, "Error when restarting the workflow");
throw e;
}

decide(workflowId);

if (StringUtils.isNotEmpty(workflow.getParentWorkflowId())) {
Expand Down Expand Up @@ -862,40 +871,52 @@ public void updateTask(TaskResult taskResult) {
task.setEndTime(System.currentTimeMillis());
}

// Fails the workflow if any of the operations below fail.
// This helps avoid workflow inconsistencies. For example, for a taskResult with status COMPLETED,
// if updating the task in the primary data store succeeds but removing it from the queue fails,
// decide would not run and the next task would not be scheduled.
// TODO Try to recover the workflow.
try {
String updateTaskQueueDesc = "Updating Task queues for taskId: " + task.getTaskId();
String taskQueueOperation = "updateTaskQueues";
String updateTaskDesc = "Updating Task with taskId: " + task.getTaskId();
String updateTaskOperation = "updateTask";
// Update message in Task queue based on Task status
switch (task.getStatus()) {
case COMPLETED:
case CANCELED:
case FAILED:
case FAILED_WITH_TERMINAL_ERROR:
case TIMED_OUT:
try {
queueDAO.remove(taskQueueName, taskResult.getTaskId());
LOGGER.debug("Task: {} removed from taskQueue: {} since the task status is {}", task, taskQueueName, task.getStatus().name());
} catch (Exception e) {
// Ignore exceptions on queue remove as it wouldn't impact task and workflow execution, and will be cleaned up eventually
String errorMsg = String.format("Error removing the message in queue for task: %s for workflow: %s", task.getTaskId(), workflowId);
LOGGER.warn(errorMsg, e);
Monitors.recordTaskQueueOpError(task.getTaskType(), workflowInstance.getWorkflowName());
}
break;
case IN_PROGRESS:
case SCHEDULED:
try {
String postponeTaskMessageDesc = "Postponing Task message in queue for taskId: " + task.getTaskId();
String postponeTaskMessageOperation = "postponeTaskMessage";

// Retry each operation twice before failing workflow.
new RetryUtil<>().retryOnException(() -> {
switch (task.getStatus()) {
case COMPLETED:
case CANCELED:
case FAILED:
case FAILED_WITH_TERMINAL_ERROR:
case TIMED_OUT:
queueDAO.remove(taskQueueName, taskResult.getTaskId());
LOGGER.debug("Task: {} removed from taskQueue: {} since the task status is {}", task, taskQueueName, task.getStatus().name());
break;
case IN_PROGRESS:
case SCHEDULED:
new RetryUtil<>().retryOnException(() -> {
// postpone based on callbackAfterSeconds
long callBack = taskResult.getCallbackAfterSeconds();
queueDAO.postpone(taskQueueName, task.getTaskId(), task.getWorkflowPriority(), callBack);
LOGGER.debug("Task: {} postponed in taskQueue: {} since the task status is {} with callbackAfterSeconds: {}", task, taskQueueName, task.getStatus().name(), callBack);
break;
default:
break;
return null;
}, null, null, 2, postponeTaskMessageDesc, postponeTaskMessageOperation);
} catch (Exception e) {
// Propagate exceptions from queue postpone, since a failure here would impact task execution
String errorMsg = String.format("Error postponing the message in queue for task: %s for workflow: %s", task.getTaskId(), workflowId);
LOGGER.error(errorMsg, e);
Monitors.recordTaskQueueOpError(task.getTaskType(), workflowInstance.getWorkflowName());
throw new ApplicationException(Code.BACKEND_ERROR, e);
}
return null;
}, null, null, 2, updateTaskQueueDesc, taskQueueOperation);
break;
default:
break;
}

// Throw an ApplicationException if below operations fail to avoid workflow inconsistencies.
try {
String updateTaskDesc = "Updating Task with taskId: " + task.getTaskId();
String updateTaskOperation = "updateTask";

new RetryUtil<>().retryOnException(() -> {
executionDAOFacade.updateTask(task);
Expand Down Expand Up @@ -1044,15 +1065,6 @@ public boolean decide(String workflowId) {
}
}

if (!outcome.tasksToBeUpdated.isEmpty()) {
for (Task task : tasksToBeUpdated) {
if (task.getStatus() != null && (!task.getStatus().equals(Task.Status.IN_PROGRESS)
|| !task.getStatus().equals(Task.Status.SCHEDULED))) {
queueDAO.remove(QueueUtils.getQueueName(task), task.getTaskId());
}
}
}

if (!outcome.tasksToBeUpdated.isEmpty() || !tasksToBeScheduled.isEmpty()) {
executionDAOFacade.updateTasks(tasksToBeUpdated);
executionDAOFacade.updateWorkflow(workflow);
Expand Down Expand Up @@ -1539,6 +1551,7 @@ private boolean rerunWF(String workflowId, String taskId, Map<String, Object> ta
workflow.setInput(workflowInput);
}

queueDAO.push(DECIDER_QUEUE, workflow.getWorkflowId(), workflow.getPriority(), config.getSweepFrequency());
executionDAOFacade.updateWorkflow(workflow);

decide(workflowId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ public static void recordTaskUpdateError(String taskType, String workflowType) {
counter(classQualifier, "task_update_error", "workflowName", workflowType, "taskType", taskType);
}

/**
 * Reports a task-queue operation error to the metrics layer.
 *
 * <p>Delegates to {@code counter} using the metric name {@code task_queue_op_error}, tagged with
 * {@code workflowName} and {@code taskType}.
 *
 * @param taskType value reported under the {@code taskType} tag
 * @param workflowType value reported under the {@code workflowName} tag
 */
public static void recordTaskQueueOpError(String taskType, String workflowType) {
    final String metricName = "task_queue_op_error";
    counter(classQualifier, metricName, "workflowName", workflowType, "taskType", taskType);
}

/**
 * Records a workflow's duration on the {@code workflow_execution} timer.
 *
 * <p>The timer is looked up through {@code getTimer}, tagged with {@code workflowName} and
 * {@code ownerApp}.
 *
 * @param workflowType value reported under the {@code workflowName} tag
 * @param duration elapsed time to record, in milliseconds
 * @param ownerApp value reported under the {@code ownerApp} tag; a {@code null} value is
 *     reported as the string {@code "null"}
 */
public static void recordWorkflowCompletion(String workflowType, long duration, String ownerApp) {
    // String.valueOf renders null as "null", exactly as string concatenation would.
    final String ownerTag = String.valueOf(ownerApp);
    getTimer(classQualifier, "workflow_execution", "workflowName", workflowType, "ownerApp", ownerTag)
        .record(duration, TimeUnit.MILLISECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ public List<PollData> getAllPollData() {
return taskService.getAllPollData();
}

@Deprecated
@POST
@Path("/queue/requeue")
@ApiOperation("Requeue pending tasks for all the running workflows")
Expand Down
Loading

0 comments on commit ddafc5a

Please sign in to comment.