Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix fastFail flag bug. Fixes #10312 #11992

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 154 additions & 12 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ import (
"github.com/argoproj/argo-workflows/v3/workflow/templateresolution"
)

// Messages attached to DAG task nodes that are initialized as omitted rather than executed.
const (
	// omittedMsgDependsNotMet marks a task whose "depends" logic evaluated to "do not execute".
	omittedMsgDependsNotMet = "omitted: depends condition not met"

	// omittedMsgWorkflowFastFailed marks a task whose node did not exist yet when the DAG was
	// found to have fast failed.
	omittedMsgWorkflowFastFailed = "omitted: workflow fast failed"
)

// dagContext holds context information about this context's DAG
type dagContext struct {
// boundaryName is the node name of the boundary node to this DAG.
Expand Down Expand Up @@ -56,6 +61,10 @@ type dagContext struct {
// Because this resolved "depends" is computed using regex and regex is expensive, we cache the results so that they
// are only computed once per operation
dependsLogic map[string]string

// hasFastFailed records that the workflow has already been found to have fast failed.
// When true, there is no need to call checkDAGFastFailed again.
hasFastFailed bool
}

func (d *dagContext) GetTaskDependencies(taskName string) []string {
Expand Down Expand Up @@ -146,6 +155,10 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh
targetTaskPhases[d.taskNodeID(task)] = ""
}

// Record whether each hook node has completed.
// Hooks are treated like target tasks: we must wait for them to complete before the whole workflow ends.
hooksCompleted := make(map[string]bool)

boundaryNode, err := nodes.Get(d.boundaryID)
if err != nil {
return "", err
Expand All @@ -165,7 +178,7 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh
branchPhase := curr.phase

if !node.Fulfilled() {
return wfv1.NodeRunning, nil
branchPhase = wfv1.NodeRunning
}

// Only overwrite the branchPhase if this node completed. (If it didn't we can just inherit our parent's branchPhase).
Expand All @@ -184,6 +197,16 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh
}
}

// If this node is a hook node or exit node, record its completion state for the check made after the traversal
if node.NodeFlag != nil && node.NodeFlag.Hooked {
if node.Completed() {
hooksCompleted[node.ID] = true
}
if node.Phase == wfv1.NodePending || node.Phase == wfv1.NodeRunning {
hooksCompleted[node.ID] = false
}
}

if node.Type == wfv1.NodeTypeRetry {
uniqueQueue.add(generatePhaseNodes(getRetryNodeChildrenIds(node, nodes), branchPhase)...)
} else {
Expand All @@ -192,16 +215,13 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh
}

// We only succeed if all the target tasks have been considered (i.e. its nodes created) and there are no failures
failFast := d.tmpl.DAG.FailFast == nil || *d.tmpl.DAG.FailFast
result := wfv1.NodeSucceeded
for _, depName := range targetTasks {
branchPhase := targetTaskPhases[d.taskNodeID(depName)]
if branchPhase == "" {
if branchPhase == "" || !branchPhase.Fulfilled() {
// A target task has not finished executing yet; return Running and let the workflow keep running
result = wfv1.NodeRunning
// If failFast is disabled, we will want to let all tasks complete before checking for failures
if !failFast {
break
}
return result, nil
} else if branchPhase.FailedOrError() {
// If this target task has continueOn set for its current phase, then don't treat it as failed for the purposes
// of determining DAG status. This is so that target tasks with said continueOn do not fail the overall DAG.
Expand All @@ -213,9 +233,15 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh
}

result = branchPhase
// If failFast is enabled, don't check to see if other target tasks are complete and fail now instead
if failFast {
break
}
}

// If the result is not Running (i.e. Succeeded or Failed/Error), check that all hooks have completed; if any has not, let the workflow keep running
if result != wfv1.NodeRunning {
for _, hookCompleted := range hooksCompleted {
if !hookCompleted {
result = wfv1.NodeRunning
return result, nil
}
}
}
Expand Down Expand Up @@ -516,6 +542,30 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex
for _, dep := range taskDependencies {
woc.executeDAGTask(ctx, dagCtx, dep)
}

// check DAG whether it has fast failed
fastFailed := dagCtx.hasFastFailed
if !fastFailed {
var err error
fastFailed, err = dagCtx.checkDAGFastFailed()
if err != nil {
woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeError, &wfv1.NodeFlag{}, err.Error())
connectDependencies(nodeName)
return
}
log.WithField("fastFailed", fastFailed).Infof("CheckDAGFastFailed done")
dagCtx.hasFastFailed = fastFailed
}
// If both of the following hold:
// 1. the DAG has fast failed
// 2. the node for this task does not exist yet
// then initialize the node with the Skipped type
if fastFailed && dagCtx.getTaskNode(taskName) == nil {
woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeOmitted, &wfv1.NodeFlag{}, omittedMsgWorkflowFastFailed)
connectDependencies(nodeName)
return
}

// evaluate depends logic
execute, proceed, err := dagCtx.evaluateDependsLogic(taskName)
if err != nil {
woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeError, &wfv1.NodeFlag{}, err.Error())
Expand All @@ -527,8 +577,7 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex
return
}
if !execute {
// Given the results of this node's dependencies, this node should not be executed. Mark it omitted
woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeOmitted, &wfv1.NodeFlag{}, "omitted: depends condition not met")
woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeOmitted, &wfv1.NodeFlag{}, omittedMsgDependsNotMet)
connectDependencies(nodeName)
return
}
Expand Down Expand Up @@ -897,3 +946,96 @@ func (d *dagContext) evaluateDependsLogic(taskName string) (bool, bool, error) {
}
return execute, true, nil
}

// checkDAGFastFailed reports whether this DAG should be treated as fast failed.
//
// It returns false immediately when failFast is explicitly disabled (an unset failFast counts as
// enabled). Otherwise it walks the DAG's nodes breadth-first from the boundary node, deduces a
// phase for every target task in the same way assessDAGPhase does, and returns true as soon as it
// finds a target task whose deduced phase is Failed or Error and which does not continue on that
// phase. An error is returned only when the boundary node cannot be looked up.
func (d *dagContext) checkDAGFastFailed() (bool, error) {
	// failFast defaults to enabled; only an explicit false turns this check off.
	if ff := d.tmpl.DAG.FailFast; ff != nil && !*ff {
		return false, nil
	}

	// Target tasks are either the ones named (space separated) in the template, or, when none
	// are named, every task that has no dependants.
	var targets []string
	if spec := d.tmpl.DAG.Target; spec != "" {
		targets = strings.Split(spec, " ")
	} else {
		targets = d.findLeafTaskNames(d.tmpl.DAG.Tasks)
	}

	// deduced maps each target task's node ID to the phase deduced for it. An omitted target has
	// no phase of its own, so it inherits the phase of the branch leading to it, with Failed or
	// Error taking precedence over anything else once recorded.
	deduced := make(map[string]wfv1.NodePhase, len(targets))
	for _, name := range targets {
		deduced[d.taskNodeID(name)] = ""
	}

	allNodes := d.wf.Status.Nodes
	boundary, err := allNodes.Get(d.boundaryID)
	if err != nil {
		return false, err
	}

	// Breadth-first walk over the children of the DAG, carrying the branch phase along.
	queue := newUniquePhaseNodeQueue(generatePhaseNodes(boundary.Children, wfv1.NodeSucceeded)...)
	for !queue.empty() {
		item := queue.pop()

		node, lookupErr := allNodes.Get(item.nodeId)
		if lookupErr != nil {
			// The node does not exist; that is acceptable here, so skip it.
			continue
		}

		// Start from the phase inherited from the parent branch, then refine it from the node.
		phase := item.phase
		if !node.Fulfilled() {
			phase = wfv1.NodeRunning
		}
		if node.Completed() {
			// A completed node's own phase becomes the branch phase for its descendants.
			phase = node.Phase
		}

		// For a target task, keep an already recorded Failed/Error phase; otherwise store the
		// phase deduced on this path.
		if recorded, isTarget := deduced[node.ID]; isTarget && !recorded.FailedOrError() {
			deduced[node.ID] = phase
		}

		// Retry nodes contribute a dedicated set of children; every other node its direct ones.
		next := node.Children
		if node.Type == wfv1.NodeTypeRetry {
			next = getRetryNodeChildrenIds(node, allNodes)
		}
		queue.add(generatePhaseNodes(next, phase)...)
	}

	// A single failed or errored target task is enough to report a fast fail, unless that task
	// has continueOn set for the phase it ended in.
	for _, name := range targets {
		phase := deduced[d.taskNodeID(name)]
		if !phase.FailedOrError() {
			continue
		}
		task := d.GetTask(name)
		if task.ContinuesOn(phase) {
			continue
		}
		return true, nil
	}

	return false, nil
}
2 changes: 1 addition & 1 deletion workflow/controller/exit_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1001,7 +1001,7 @@ status:
assert.Len(t, taskSets.Items, 0)
}
woc.operate(ctx)
assert.Equal(t, woc.wf.Status.Phase, wfv1.WorkflowRunning)
assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)
}

func TestStepsTemplateOnExitStatusArgument(t *testing.T) {
Expand Down
Loading