diff --git a/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java index dd7975425f..331a2b4ac9 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java @@ -110,6 +110,7 @@ public void execute(WorkflowSystemTask systemTask, String taskId) { } boolean hasTaskExecutionCompleted = false; + boolean shouldRemoveTaskFromQueue = false; String workflowId = task.getWorkflowInstanceId(); // if we are here the Task object is updated and needs to be persisted regardless of an // exception @@ -130,7 +131,7 @@ public void execute(WorkflowSystemTask systemTask, String taskId) { String.format( "Workflow is in %s state", workflow.getStatus().toString())); } - queueDAO.remove(queueName, task.getTaskId()); + shouldRemoveTaskFromQueue = true; return; } @@ -156,13 +157,12 @@ public void execute(WorkflowSystemTask systemTask, String taskId) { // Update message in Task queue based on Task status // Remove asyncComplete system tasks from the queue that are not in SCHEDULED state if (isTaskAsyncComplete && task.getStatus() != TaskModel.Status.SCHEDULED) { - queueDAO.remove(queueName, task.getTaskId()); + shouldRemoveTaskFromQueue = true; hasTaskExecutionCompleted = true; } else if (task.getStatus().isTerminal()) { task.setEndTime(System.currentTimeMillis()); - queueDAO.remove(queueName, task.getTaskId()); + shouldRemoveTaskFromQueue = true; hasTaskExecutionCompleted = true; - LOGGER.debug("{} removed from queue: {}", task, queueName); } else { task.setCallbackAfterSeconds(systemTaskCallbackTime); systemTask @@ -188,6 +188,10 @@ public void execute(WorkflowSystemTask systemTask, String taskId) { LOGGER.error("Error executing system task - {}, with id: {}", systemTask, taskId, e); } finally { executionDAOFacade.updateTask(task); + if (shouldRemoveTaskFromQueue) { + queueDAO.remove(queueName, task.getTaskId()); + LOGGER.debug("{} removed from queue: {}", task, queueName); + } // if the current task execution has completed, then the workflow needs to be evaluated if (hasTaskExecutionCompleted) { workflowExecutor.decide(workflowId);