Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Adding resiliency tests and relevant changes for QueueDAO. #1921

Merged
merged 2 commits into from
Oct 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,16 @@ public void rewind(String workflowId, boolean useLatestDefinitions) {
workflow.setStatus(WorkflowStatus.RUNNING);
workflow.setOutput(null);
workflow.setExternalOutputPayloadStoragePath(null);
executionDAOFacade.createWorkflow(workflow);

try {
executionDAOFacade.createWorkflow(workflow);
} catch (Exception e) {
Monitors.recordWorkflowStartError(workflowDef.getName(), WorkflowContext.get().getClientApp());
LOGGER.error("Unable to restart workflow: {}", workflowDef.getName(), e);
terminateWorkflow(workflowId, "Error when restarting the workflow");
throw e;
}

decide(workflowId);

if (StringUtils.isNotEmpty(workflow.getParentWorkflowId())) {
Expand Down Expand Up @@ -862,40 +871,52 @@ public void updateTask(TaskResult taskResult) {
task.setEndTime(System.currentTimeMillis());
}

// Fails the workflow if any of the below operations fail.
// This helps avoid workflow inconsistencies. For example, for the taskResult with status:COMPLETED,
// if update task to primary data store is successful, but remove from queue fails,
// The decide wouldn't run and next task will not be scheduled.
// TODO Try to recover the workflow.
try {
String updateTaskQueueDesc = "Updating Task queues for taskId: " + task.getTaskId();
String taskQueueOperation = "updateTaskQueues";
String updateTaskDesc = "Updating Task with taskId: " + task.getTaskId();
String updateTaskOperation = "updateTask";
// Update message in Task queue based on Task status
switch (task.getStatus()) {
case COMPLETED:
case CANCELED:
case FAILED:
case FAILED_WITH_TERMINAL_ERROR:
case TIMED_OUT:
try {
queueDAO.remove(taskQueueName, taskResult.getTaskId());
LOGGER.debug("Task: {} removed from taskQueue: {} since the task status is {}", task, taskQueueName, task.getStatus().name());
} catch (Exception e) {
// Ignore exceptions on queue remove as it wouldn't impact task and workflow execution, and will be cleaned up eventually
String errorMsg = String.format("Error removing the message in queue for task: %s for workflow: %s", task.getTaskId(), workflowId);
LOGGER.warn(errorMsg, e);
Monitors.recordTaskQueueOpError(task.getTaskType(), workflowInstance.getWorkflowName());
}
break;
case IN_PROGRESS:
case SCHEDULED:
try {
String postponeTaskMessageDesc = "Postponing Task message in queue for taskId: " + task.getTaskId();
String postponeTaskMessageOperation = "postponeTaskMessage";

// Retry each operation twice before failing workflow.
new RetryUtil<>().retryOnException(() -> {
switch (task.getStatus()) {
case COMPLETED:
case CANCELED:
case FAILED:
case FAILED_WITH_TERMINAL_ERROR:
case TIMED_OUT:
queueDAO.remove(taskQueueName, taskResult.getTaskId());
LOGGER.debug("Task: {} removed from taskQueue: {} since the task status is {}", task, taskQueueName, task.getStatus().name());
break;
case IN_PROGRESS:
case SCHEDULED:
new RetryUtil<>().retryOnException(() -> {
// postpone based on callbackAfterSeconds
long callBack = taskResult.getCallbackAfterSeconds();
queueDAO.postpone(taskQueueName, task.getTaskId(), task.getWorkflowPriority(), callBack);
LOGGER.debug("Task: {} postponed in taskQueue: {} since the task status is {} with callbackAfterSeconds: {}", task, taskQueueName, task.getStatus().name(), callBack);
break;
default:
break;
return null;
}, null, null, 2, postponeTaskMessageDesc, postponeTaskMessageOperation);
} catch (Exception e) {
// Throw exceptions on queue postpone, this would impact task execution
String errorMsg = String.format("Error postponing the message in queue for task: %s for workflow: %s", task.getTaskId(), workflowId);
LOGGER.error(errorMsg, e);
Monitors.recordTaskQueueOpError(task.getTaskType(), workflowInstance.getWorkflowName());
throw new ApplicationException(Code.BACKEND_ERROR, e);
}
return null;
}, null, null, 2, updateTaskQueueDesc, taskQueueOperation);
break;
default:
break;
}

// Throw an ApplicationException if below operations fail to avoid workflow inconsistencies.
try {
String updateTaskDesc = "Updating Task with taskId: " + task.getTaskId();
String updateTaskOperation = "updateTask";

new RetryUtil<>().retryOnException(() -> {
executionDAOFacade.updateTask(task);
Expand Down Expand Up @@ -1044,15 +1065,6 @@ public boolean decide(String workflowId) {
}
}

if (!outcome.tasksToBeUpdated.isEmpty()) {
for (Task task : tasksToBeUpdated) {
if (task.getStatus() != null && (!task.getStatus().equals(Task.Status.IN_PROGRESS)
|| !task.getStatus().equals(Task.Status.SCHEDULED))) {
queueDAO.remove(QueueUtils.getQueueName(task), task.getTaskId());
}
}
}

if (!outcome.tasksToBeUpdated.isEmpty() || !tasksToBeScheduled.isEmpty()) {
executionDAOFacade.updateTasks(tasksToBeUpdated);
executionDAOFacade.updateWorkflow(workflow);
Expand Down Expand Up @@ -1539,6 +1551,7 @@ private boolean rerunWF(String workflowId, String taskId, Map<String, Object> ta
workflow.setInput(workflowInput);
}

queueDAO.push(DECIDER_QUEUE, workflow.getWorkflowId(), workflow.getPriority(), config.getSweepFrequency());
executionDAOFacade.updateWorkflow(workflow);

decide(workflowId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ public static void recordTaskUpdateError(String taskType, String workflowType) {
counter(classQualifier, "task_update_error", "workflowName", workflowType, "taskType", taskType);
}

public static void recordTaskQueueOpError(String taskType, String workflowType) {
counter(classQualifier, "task_queue_op_error", "workflowName", workflowType, "taskType", taskType);
}

public static void recordWorkflowCompletion(String workflowType, long duration, String ownerApp) {
getTimer(classQualifier, "workflow_execution", "workflowName", workflowType, "ownerApp", ""+ownerApp).record(duration, TimeUnit.MILLISECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ public List<PollData> getAllPollData() {
return taskService.getAllPollData();
}

@Deprecated
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

@POST
@Path("/queue/requeue")
@ApiOperation("Requeue pending tasks for all the running workflows")
Expand Down
Loading