Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
fix subworkflow output during repair (#3232)
Browse files Browse the repository at this point in the history
  • Loading branch information
apanicker-nflx authored Sep 10, 2022
1 parent a2a64e6 commit b843af4
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ private void updateTaskStatus(WorkflowModel subworkflow, TaskModel task) {
task.setExternalOutputPayloadStoragePath(
subworkflow.getExternalOutputPayloadStoragePath());
} else {
task.getOutputData().putAll(subworkflow.getOutput());
task.addOutput(subworkflow.getOutput());
}
if (!status.isSuccessful()) {
task.setReasonForIncompletion(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,12 @@ boolean verifyAndRepairTask(TaskModel task) {
&& task.getStatus() == TaskModel.Status.IN_PROGRESS) {
WorkflowModel subWorkflow = executionDAO.getWorkflow(task.getSubWorkflowId(), false);
if (subWorkflow.getStatus().isTerminal()) {
repairSubWorkflowTask(task, subWorkflow.getStatus());
LOGGER.info(
"Repairing sub workflow task {} for sub workflow {} in workflow {}",
task.getTaskId(),
task.getSubWorkflowId(),
task.getWorkflowInstanceId());
repairSubWorkflowTask(task, subWorkflow);
return true;
}
}
Expand All @@ -165,8 +170,8 @@ private boolean verifyAndRepairWorkflow(String workflowId) {
return false;
}

private void repairSubWorkflowTask(TaskModel task, WorkflowModel.Status subWorkflowStatus) {
switch (subWorkflowStatus) {
private void repairSubWorkflowTask(TaskModel task, WorkflowModel subWorkflow) {
switch (subWorkflow.getStatus()) {
case COMPLETED:
task.setStatus(TaskModel.Status.COMPLETED);
break;
Expand All @@ -180,6 +185,7 @@ private void repairSubWorkflowTask(TaskModel task, WorkflowModel.Status subWorkf
task.setStatus(TaskModel.Status.TIMED_OUT);
break;
}
task.addOutput(subWorkflow.getOutput());
executionDAO.updateTask(task);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package com.netflix.conductor.core.reconciliation;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.junit.Before;
Expand Down Expand Up @@ -239,10 +240,14 @@ public void assertInProgressSubWorkflowSystemTasksAreCheckedAndRepaired() {
task.setTaskId(taskId);
task.setCallbackAfterSeconds(60);
task.setSubWorkflowId(subWorkflowId);
Map<String, Object> outputMap = new HashMap<>();
outputMap.put("subWorkflowId", subWorkflowId);
task.setOutputData(outputMap);

WorkflowModel subWorkflow = new WorkflowModel();
subWorkflow.setWorkflowId(subWorkflowId);
subWorkflow.setStatus(WorkflowModel.Status.TERMINATED);
subWorkflow.setOutput(Map.of("k1", "v1", "k2", "v2"));

when(executionDAO.getWorkflow(subWorkflowId, false)).thenReturn(subWorkflow);

Expand All @@ -256,5 +261,9 @@ public void assertInProgressSubWorkflowSystemTasksAreCheckedAndRepaired() {
assertEquals(taskId, argumentCaptor.getValue().getTaskId());
assertEquals(subWorkflowId, argumentCaptor.getValue().getSubWorkflowId());
assertEquals(TaskModel.Status.CANCELED, argumentCaptor.getValue().getStatus());
assertNotNull(argumentCaptor.getValue().getOutputData());
assertEquals(subWorkflowId, argumentCaptor.getValue().getOutputData().get("subWorkflowId"));
assertEquals("v1", argumentCaptor.getValue().getOutputData().get("k1"));
assertEquals("v2", argumentCaptor.getValue().getOutputData().get("k2"));
}
}

0 comments on commit b843af4

Please sign in to comment.