Skip to content

Commit

Permalink
add targetTaskHooks check
Browse files Browse the repository at this point in the history
  • Loading branch information
JasonChen86899 committed Oct 16, 2023
1 parent 0978c4d commit 594dc66
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 5 deletions.
5 changes: 1 addition & 4 deletions pkg/apis/workflow/v1alpha1/workflow_phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,5 @@ func (p WorkflowPhase) Completed() bool {
}

func (p WorkflowPhase) Canceled() bool {
if p == WorkflowCanceled {
return true
}
return false
return p == WorkflowCanceled
}
22 changes: 21 additions & 1 deletion workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh
for _, task := range targetTasks {
targetTaskPhases[d.taskNodeID(task)] = ""
}
// record target task hooks whether Completed
targetTaskHooksCompleted := make(map[string]bool)

boundaryNode, err := nodes.Get(d.boundaryID)
if err != nil {
Expand All @@ -168,9 +170,12 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh
branchPhase = wfv1.NodeRunning
}

// Only overwrite the branchPhase if this node completed. (If it didn't we can just inherit our parent's branchPhase).
if node.Completed() {
branchPhase = node.Phase
if _, ok := targetTaskHooksCompleted[node.ID]; ok {
// if this node is hook, then record it Completed
targetTaskHooksCompleted[node.ID] = true
}
}

// This node is a target task, so it will not have any children. Store or deduce its phase
Expand All @@ -182,6 +187,11 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh
if !previousPhase.FailedOrError() {
targetTaskPhases[node.ID] = branchPhase
}

// add hooks
for _, hook := range node.Children {
targetTaskHooksCompleted[hook] = false
}
}

if node.Type == wfv1.NodeTypeRetry {
Expand Down Expand Up @@ -218,6 +228,16 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh
}
}

// if fail or error, then check all target task hooks Completed, if not then let workflow run
if result.FailedOrError() {
for _, hookCompleted := range targetTaskHooksCompleted {
if !hookCompleted {
result = wfv1.NodeRunning
return result, nil
}
}
}

return result, nil
}

Expand Down

0 comments on commit 594dc66

Please sign in to comment.