Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Adding resiliency tests and relevant changes for QueueDAO. #1921

Merged
merged 2 commits into from
Oct 14, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,23 @@ public void rewind(String workflowId, boolean useLatestDefinitions) {
workflow.setStatus(WorkflowStatus.RUNNING);
workflow.setOutput(null);
workflow.setExternalOutputPayloadStoragePath(null);
executionDAOFacade.createWorkflow(workflow);

try {
executionDAOFacade.createWorkflow(workflow);
} catch (Exception e) {
Monitors.recordWorkflowStartError(workflowDef.getName(), WorkflowContext.get().getClientApp());
LOGGER.error("Unable to restart workflow: {}", workflowDef.getName(), e);

// It's possible the terminate workflow call hits an exception as well, in that case we want to log both
// errors to help diagnosis.
try {
terminateWorkflow(workflowId, "Error when restarting the workflow");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not convinced that a try...catch block here is the right way to address this. The exception when terminating a workflow should be caught at a granular level within the terminate logic and this should be logged/handled accordingly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This exception is primarily for executionDAOFacade.createWorkflow(workflow), and terminateWorkflow is just a clean up as part of that exception. I'd still like to throw the exception for createWorkflow, irrespective of what happened with terminateWorkflow, while logging terminateWorkflow exception. Thoughts?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the exception handling for createWorkflow needs to be handled.
My comment is regarding the extended handling for the terminate part which seems out of place here. Something like this -

try {
    executionDAOFacade.createWorkflow(workflow);
} catch (Exception e) {
    Monitors.recordWorkflowStartError(workflowDef.getName(), WorkflowContext.get().getClientApp());
    LOGGER.error("Unable to restart workflow: {}", workflowDef.getName(), e);
    terminateWorkflow(workflowId, "Error when restarting the workflow");
    throw e;
}

} catch (Exception rwe) {
LOGGER.error("Could not terminate the workflowId: " + workflowId, rwe);
}
throw e;
}

decide(workflowId);

if (StringUtils.isNotEmpty(workflow.getParentWorkflowId())) {
Expand Down Expand Up @@ -862,18 +878,11 @@ public void updateTask(TaskResult taskResult) {
task.setEndTime(System.currentTimeMillis());
}

// Fails the workflow if any of the below operations fail.
// This helps avoid workflow inconsistencies. For example, for the taskResult with status:COMPLETED,
// if update task to primary data store is successful, but remove from queue fails,
// The decide wouldn't run and next task will not be scheduled.
// TODO Try to recover the workflow.
// Try and ignore QueueDAO operations based on task status
try {
String updateTaskQueueDesc = "Updating Task queues for taskId: " + task.getTaskId();
String taskQueueOperation = "updateTaskQueues";
String updateTaskDesc = "Updating Task with taskId: " + task.getTaskId();
String updateTaskOperation = "updateTask";

// Retry each operation twice before failing workflow.
new RetryUtil<>().retryOnException(() -> {
switch (task.getStatus()) {
case COMPLETED:
Expand All @@ -896,6 +905,17 @@ public void updateTask(TaskResult taskResult) {
}
return null;
}, null, null, 2, updateTaskQueueDesc, taskQueueOperation);
} catch (Exception e) {
String errorMsg = String.format("Error updating the queue for task: %s for workflow: %s", task.getTaskId(), workflowId);
LOGGER.warn(errorMsg, e);
Monitors.recordTaskQueueOpError(task.getTaskType(), workflowInstance.getWorkflowName());
}


// Throw an ApplicationException if below operations fail to avoid workflow inconsistencies.
try {
String updateTaskDesc = "Updating Task with taskId: " + task.getTaskId();
String updateTaskOperation = "updateTask";

new RetryUtil<>().retryOnException(() -> {
executionDAOFacade.updateTask(task);
Expand Down Expand Up @@ -1044,15 +1064,6 @@ public boolean decide(String workflowId) {
}
}

if (!outcome.tasksToBeUpdated.isEmpty()) {
for (Task task : tasksToBeUpdated) {
if (task.getStatus() != null && (!task.getStatus().equals(Task.Status.IN_PROGRESS)
|| !task.getStatus().equals(Task.Status.SCHEDULED))) {
queueDAO.remove(QueueUtils.getQueueName(task), task.getTaskId());
}
}
}

if (!outcome.tasksToBeUpdated.isEmpty() || !tasksToBeScheduled.isEmpty()) {
executionDAOFacade.updateTasks(tasksToBeUpdated);
executionDAOFacade.updateWorkflow(workflow);
Expand Down Expand Up @@ -1539,6 +1550,7 @@ private boolean rerunWF(String workflowId, String taskId, Map<String, Object> ta
workflow.setInput(workflowInput);
}

queueDAO.push(DECIDER_QUEUE, workflow.getWorkflowId(), workflow.getPriority(), config.getSweepFrequency());
executionDAOFacade.updateWorkflow(workflow);

decide(workflowId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ public static void recordTaskUpdateError(String taskType, String workflowType) {
counter(classQualifier, "task_update_error", "workflowName", workflowType, "taskType", taskType);
}

public static void recordTaskQueueOpError(String taskType, String workflowType) {
counter(classQualifier, "task_queue_op_error", "workflowName", workflowType, "taskType", taskType);
}

public static void recordWorkflowCompletion(String workflowType, long duration, String ownerApp) {
getTimer(classQualifier, "workflow_execution", "workflowName", workflowType, "ownerApp", ""+ownerApp).record(duration, TimeUnit.MILLISECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ public List<PollData> getAllPollData() {
return taskService.getAllPollData();
}

@Deprecated
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

@POST
@Path("/queue/requeue")
@ApiOperation("Requeue pending tasks for all the running workflows")
Expand Down
Loading