Skip to content

Commit

Permalink
fix: fix fastFail flag bug(argoproj#10312)
Browse files Browse the repository at this point in the history
Signed-off-by: Goober <chenhao86899@gmail.com>
  • Loading branch information
JasonChen86899 authored and chenhao191 committed Dec 23, 2023
1 parent 87c9be7 commit 5f79f96
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 11 deletions.
5 changes: 5 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const (
WorkflowSucceeded WorkflowPhase = "Succeeded"
WorkflowFailed WorkflowPhase = "Failed" // it may be that the workflow was terminated
WorkflowError WorkflowPhase = "Error"
WorkflowCanceled WorkflowPhase = "Canceled" // an intermediate state used when failFast is enabled; the workflow phase will later change from Canceled to Succeeded/Failed/Error
)

func (p WorkflowPhase) Completed() bool {
Expand All @@ -20,3 +21,7 @@ func (p WorkflowPhase) Completed() bool {
return false
}
}

// Canceled reports whether the phase is WorkflowCanceled.
func (p WorkflowPhase) Canceled() bool {
	switch p {
	case WorkflowCanceled:
		return true
	default:
		return false
	}
}
57 changes: 48 additions & 9 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh
targetTaskPhases[d.taskNodeID(task)] = ""
}

// Record whether each hook node has completed.
// Hooks are treated like target tasks: we must wait for them to complete before the whole workflow ends.
hooksCompleted := make(map[string]bool)

boundaryNode, err := nodes.Get(d.boundaryID)
if err != nil {
return "", err
Expand All @@ -165,7 +169,7 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh
branchPhase := curr.phase

if !node.Fulfilled() {
return wfv1.NodeRunning, nil
branchPhase = wfv1.NodeRunning
}

// Only overwrite the branchPhase if this node completed. (If it didn't we can just inherit our parent's branchPhase).
Expand All @@ -184,6 +188,16 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh
}
}

// If the node is a hook node or exit node, record whether it has completed for the later check.
if node.NodeFlag != nil && node.NodeFlag.Hooked {
if node.Completed() {
hooksCompleted[node.ID] = true
}
if node.Phase == wfv1.NodePending || node.Phase == wfv1.NodeRunning {
hooksCompleted[node.ID] = false
}
}

if node.Type == wfv1.NodeTypeRetry {
uniqueQueue.add(generatePhaseNodes(getRetryNodeChildrenIds(node, nodes), branchPhase)...)
} else {
Expand All @@ -196,12 +210,10 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh
result := wfv1.NodeSucceeded
for _, depName := range targetTasks {
branchPhase := targetTaskPhases[d.taskNodeID(depName)]
if branchPhase == "" {
if branchPhase == "" || !branchPhase.Fulfilled() {
// A target task has not finished executing yet; return early and let the workflow keep running.
result = wfv1.NodeRunning
// If failFast is disabled, we will want to let all tasks complete before checking for failures
if !failFast {
break
}
return result, nil
} else if branchPhase.FailedOrError() {
// If this target task has continueOn set for its current phase, then don't treat it as failed for the purposes
// of determining DAG status. This is so that target tasks with said continueOn do not fail the overall DAG.
Expand All @@ -215,7 +227,17 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh
result = branchPhase
// If failFast is enabled, don't check to see if other target tasks are complete and fail now instead
if failFast {
break
d.wf.Status.Phase = wfv1.WorkflowCanceled
}
}
}

// If the result is not Running (i.e. Succeeded or Failed/Error), check that all hooks have completed; if any has not, let the workflow keep running.
if result != wfv1.NodeRunning {
for _, hookCompleted := range hooksCompleted {
if !hookCompleted {
result = wfv1.NodeRunning
return result, nil
}
}
}
Expand Down Expand Up @@ -520,8 +542,12 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex
return
}
if !execute {
// Given the results of this node's dependencies, this node should not be executed. Mark it omitted
woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeOmitted, &wfv1.NodeFlag{}, "omitted: depends condition not met")
unExecMsg := dagCtx.getUnExecuteMessage()
// This node should not be executed for one of the following reasons:
// 1. its depends condition is not met, based on the results of this node's dependencies
// 2. the workflow is failing fast
// Mark it as omitted.
woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeOmitted, &wfv1.NodeFlag{}, unExecMsg)
connectDependencies(nodeName)
return
}
Expand Down Expand Up @@ -888,5 +914,18 @@ func (d *dagContext) evaluateDependsLogic(taskName string) (bool, bool, error) {
if err != nil {
return false, false, fmt.Errorf("unable to evaluate expression '%s': %s", evalLogic, err)
}

// Check the workflow phase.
// If it is Canceled, the workflow must fail fast, and unscheduled pods will be marked as "Skipped".
if execute && d.wf.Status.Phase.Canceled() {
execute = false
}
return execute, true, nil
}

// getUnExecuteMessage returns the message explaining why a DAG task is omitted
// instead of executed: the fail-fast message when the workflow phase is
// Canceled, otherwise the unmet-depends message.
func (d *dagContext) getUnExecuteMessage() string {
	msg := "omitted: depends condition not met"
	if d.wf.Status.Phase.Canceled() {
		msg = "omitted: workflow fail fast"
	}
	return msg
}
2 changes: 1 addition & 1 deletion workflow/controller/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1692,7 +1692,7 @@ func TestRetryStrategyNodes(t *testing.T) {
assert.Equal(t, wfv1.NodePending, onExitNode.Phase)
}

assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)
assert.Equal(t, wfv1.WorkflowCanceled, woc.wf.Status.Phase)
}

var testOnExitNodeDAGPhase = `
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/exit_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,7 @@ status:
assert.Len(t, taskSets.Items, 0)
}
woc.operate(ctx)
assert.Equal(t, woc.wf.Status.Phase, wfv1.WorkflowRunning)
assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)
}

func TestStepsTemplateOnExitStatusArgument(t *testing.T) {
Expand Down

0 comments on commit 5f79f96

Please sign in to comment.