fix: OnExit node can retry when retrying a workflow (#13184) #13185

Open · wants to merge 2 commits into main
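The change below makes onExit hook nodes (and their children) behave like other nodes on retry: they are no longer force-reset by name prefix, they are ignored when checking whether a failed node has a succeeded descendant, and they are deleted together with their pods so the hook runs again. As a rough illustration of that descendant check, here is a minimal, self-contained sketch using simplified stand-in types — the `Node` struct, field names, and phases are assumptions for illustration, not the real `wfv1` API:

```go
package main

import "fmt"

// Simplified stand-in for a workflow node; not the real wfv1.NodeStatus.
type Node struct {
	ID       string
	Phase    string // "Succeeded", "Failed", ...
	IsExit   bool   // true for onExit hook nodes
	Children []string
}

// hasSucceededDescendant mirrors the idea of isDescendantNodeSucceeded after
// this PR: onExit children are skipped, so a succeeded exit handler no longer
// prevents its failed parent from being reset on retry.
func hasSucceededDescendant(nodes map[string]Node, node Node, toReset map[string]bool) bool {
	for _, childID := range node.Children {
		child, ok := nodes[childID]
		if !ok {
			continue
		}
		// Skip onExit children, as the util.go change below does.
		if child.IsExit {
			continue
		}
		if (!toReset[childID] && child.Phase == "Succeeded") || hasSucceededDescendant(nodes, child, toReset) {
			return true
		}
	}
	return false
}

func main() {
	nodes := map[string]Node{
		"failure":        {ID: "failure", Phase: "Failed", Children: []string{"failure.onExit"}},
		"failure.onExit": {ID: "failure.onExit", Phase: "Succeeded", IsExit: true},
	}
	// With the onExit child ignored, the failed task has no succeeded
	// descendant and can therefore be retried. Prints: false
	fmt.Println(hasSucceededDescendant(nodes, nodes["failure"], map[string]bool{}))
}
```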
4 changes: 2 additions & 2 deletions test/e2e/cli_test.go
@@ -938,7 +938,7 @@ func (s *CLISuite) TestRetryWorkflowWithContinueOn() {
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
workflowName = metadata.Name
assert.Equal(t, 7, len(status.Nodes))
assert.Equal(t, 10, len(status.Nodes))
}).
RunCli([]string{"retry", workflowName}, func(t *testing.T, output string, err error) {
if assert.NoError(t, err, output) {
@@ -954,7 +954,7 @@ func (s *CLISuite) TestRetryWorkflowWithContinueOn() {
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
workflowName = metadata.Name
assert.Equal(t, wfv1.WorkflowFailed, status.Phase)
assert.Equal(t, 7, len(status.Nodes))
assert.Equal(t, 10, len(status.Nodes))
}).
ExpectWorkflowNode(func(status wfv1.NodeStatus) bool {
return strings.Contains(status.Name, ".success")
21 changes: 16 additions & 5 deletions test/e2e/testdata/retry-workflow-with-continueon.yaml
@@ -20,14 +20,15 @@ spec:
value: 0
- name: failure
template: node-to-exit
dependencies: [success]
dependencies: [ success ]
onExit: on-exit
arguments:
parameters:
- name: exitCode
value: 1
- name: task-after-failure
template: node-to-exit
dependencies: [failure]
dependencies: [ failure ]
arguments:
parameters:
- name: exitCode
@@ -36,14 +37,14 @@ spec:
template: node-to-exit
continueOn:
failed: true
dependencies: [success]
dependencies: [ success ]
arguments:
parameters:
- name: exitCode
value: 2
- name: task-after-continue
template: node-to-exit
dependencies: [continue]
dependencies: [ continue ]
arguments:
parameters:
- name: exitCode
@@ -55,4 +56,14 @@
- name: exitCode
container:
image: alpine:3.7
command: [ sh, "-c", "exit {{inputs.parameters.exitCode}}" ]
command: [ sh, "-c", "exit {{inputs.parameters.exitCode}}" ]

- name: on-exit
steps:
- - name: alarm
template: alarm

- name: alarm
container:
image: alpine:3.7
command: [ sh, "-c", "exit 0" ]
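The manifest above now attaches the new `on-exit` handler to the `failure` task, which is what the updated node-count assertions (7 to 10) in cli_test.go exercise. In `workflow/util/util.go` below, a retried workflow deletes the onExit node together with its descendants and queues any pod nodes among them for deletion, so the hook runs again. A minimal sketch of that bookkeeping, again with simplified stand-in types rather than the real `wfv1` nodes (the node names and traversal are illustrative assumptions):

```go
package main

import "fmt"

// Simplified stand-in for a workflow node; not the real wfv1.NodeStatus.
type Node struct {
	ID       string
	Type     string // "Pod", "Steps", "StepGroup", ...
	Children []string
}

// collectExitDescendants mirrors the new branch in FormulateRetryWorkflow:
// the onExit node and everything under it is marked deleted, and pod nodes
// are queued for deletion so the exit handler re-runs on retry.
func collectExitDescendants(nodes map[string]Node, exit Node) (deleted map[string]bool, pods []string) {
	deleted = map[string]bool{exit.ID: true}
	if exit.Type == "Pod" {
		pods = append(pods, exit.ID)
	}
	stack := append([]string{}, exit.Children...)
	for len(stack) > 0 {
		id := stack[len(stack)-1]
		stack = stack[:len(stack)-1]
		node, ok := nodes[id]
		if !ok {
			continue
		}
		deleted[id] = true
		if node.Type == "Pod" {
			pods = append(pods, id)
		}
		stack = append(stack, node.Children...)
	}
	return deleted, pods
}

func main() {
	nodes := map[string]Node{
		"failure.onExit":          {ID: "failure.onExit", Type: "Steps", Children: []string{"failure.onExit[0]"}},
		"failure.onExit[0]":       {ID: "failure.onExit[0]", Type: "StepGroup", Children: []string{"failure.onExit[0].alarm"}},
		"failure.onExit[0].alarm": {ID: "failure.onExit[0].alarm", Type: "Pod"},
	}
	deleted, pods := collectExitDescendants(nodes, nodes["failure.onExit"])
	fmt.Println(len(deleted), pods) // 3 [failure.onExit[0].alarm]
}
```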
30 changes: 28 additions & 2 deletions workflow/util/util.go
@@ -780,6 +780,10 @@ func isDescendantNodeSucceeded(wf *wfv1.Workflow, node wfv1.NodeStatus, nodeIDsT
if err != nil {
log.Panicf("Coudn't obtain child for %s, panicking", child)
}
// skip onExit child
if childStatus.IsExitNode() {
continue
}
_, present := nodeIDsToReset[child]
if (!present && childStatus.Phase == wfv1.NodeSucceeded) || isDescendantNodeSucceeded(wf, *childStatus, nodeIDsToReset) {
return true
@@ -885,7 +889,6 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
}
}

onExitNodeName := wf.ObjectMeta.Name + ".onExit"
// Get all children of nodes that match filter
nodeIDsToReset, err := getNodeIDsToReset(restartSuccessful, nodeFieldSelector, wf.Status.Nodes)
if err != nil {
@@ -905,7 +908,7 @@
}
switch node.Phase {
case wfv1.NodeSucceeded, wfv1.NodeSkipped:
if strings.HasPrefix(node.Name, onExitNodeName) || doForceResetNode {
if doForceResetNode {
log.Debugf("Force reset for node: %s", node.Name)
// Reset parent node if this node is a step/task group or DAG.
if isGroupNode(node) && node.BoundaryID != "" {
@@ -961,6 +964,29 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
newWF.Status.Nodes.Set(newNode.ID, resetNode(*newNode))
}
}
} else if node.IsExitNode() {
log.Debugf("Handle retry for onExit node: %s", node.Name)
// handle onExit node and its children
deletedNodes[node.ID] = true
if node.Type == wfv1.NodeTypePod {
deletedPods, podsToDelete = deletePodNodeDuringRetryWorkflow(wf, node, deletedPods, podsToDelete)
log.Debugf("Deleted pod node: %s", node.Name)
}

descendantNodeIDs := getDescendantNodeIDs(wf, node)
for _, descendantNodeID := range descendantNodeIDs {
deletedNodes[descendantNodeID] = true
descendantNode, err := wf.Status.Nodes.Get(descendantNodeID)
if err != nil {
log.Fatalf("Was unable to obtain node for %s due to %s", descendantNodeID, err)
return nil, nil, fmt.Errorf("Was unable to obtain node for %s due to %s", descendantNodeID, err)
}
if descendantNode.Type == wfv1.NodeTypePod {
newWF, resetParentGroupNodes = resetConnectedParentGroupNodes(wf, newWF, node, resetParentGroupNodes)
deletedPods, podsToDelete = deletePodNodeDuringRetryWorkflow(wf, *descendantNode, deletedPods, podsToDelete)
log.Debugf("Deleted pod node %s since it belongs to node %s", descendantNode.Name, node.Name)
}
}
} else {
if !containsNode(resetParentGroupNodes, node.ID) {
log.Debugf("Node %s remains as is", node.Name)