diff --git a/pkg/apis/workflow/v1alpha1/workflow_phase.go b/pkg/apis/workflow/v1alpha1/workflow_phase.go index 4027b10dca6a..86f2416a001d 100644 --- a/pkg/apis/workflow/v1alpha1/workflow_phase.go +++ b/pkg/apis/workflow/v1alpha1/workflow_phase.go @@ -10,6 +10,7 @@ const ( WorkflowSucceeded WorkflowPhase = "Succeeded" WorkflowFailed WorkflowPhase = "Failed" // it maybe that the workflow was terminated WorkflowError WorkflowPhase = "Error" + WorkflowCanceled WorkflowPhase = "Canceled" // it is an intermediate state when enable failFast. Workflow phase will be changed from Canceled to Succeeded/Failed/Error ) func (p WorkflowPhase) Completed() bool { @@ -20,3 +21,10 @@ func (p WorkflowPhase) Completed() bool { return false } } + +func (p WorkflowPhase) Canceled() bool { + if p == WorkflowCanceled { + return true + } + return false +} diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index e827a7f109ab..6b7a26fad41c 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -165,7 +165,7 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh branchPhase := curr.phase if !node.Fulfilled() { - return wfv1.NodeRunning, nil + branchPhase = wfv1.NodeRunning } // Only overwrite the branchPhase if this node completed. (If it didn't we can just inherit our parent's branchPhase). @@ -197,11 +197,9 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh for _, depName := range targetTasks { branchPhase := targetTaskPhases[d.taskNodeID(depName)] if branchPhase == "" { + // exist task not executed, just return and let workflow run result = wfv1.NodeRunning - // If failFast is disabled, we will want to let all tasks complete before checking for failures - if !failFast { - break - } + return result, nil } else if branchPhase.FailedOrError() { // If this target task has continueOn set for its current phase, then don't treat it as failed for the purposes // of determining DAG status. This is so that target tasks with said continueOn do not fail the overall DAG. @@ -215,7 +213,7 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh result = branchPhase // If failFast is enabled, don't check to see if other target tasks are complete and fail now instead if failFast { - break + d.wf.Status.Phase = wfv1.WorkflowCanceled } } } @@ -520,8 +518,12 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex return } if !execute { - // Given the results of this node's dependencies, this node should not be executed. Mark it omitted - woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeOmitted, "omitted: depends condition not met") + unExecMsg := dagCtx.getUnExecuteMessage() + // Following is the reasons that this node should not be executed: + // 1. not met depends base on the results of this node's dependencies + // 2. workflow fail fast + // Mark it omitted + woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeOmitted, unExecMsg) connectDependencies(nodeName) return } @@ -898,5 +900,18 @@ func (d *dagContext) evaluateDependsLogic(taskName string) (bool, bool, error) { if err != nil { return false, false, fmt.Errorf("unable to evaluate expression '%s': %s", evalLogic, err) } + + // check workflow phase + // if canceled which means workflow must fail fast and unscheduled pod will be marked as "Skipped" + if execute && d.wf.Status.Phase.Canceled() { + execute = false + } return execute, true, nil } + +func (d *dagContext) getUnExecuteMessage() string { + if d.wf.Status.Phase.Canceled() { + return "omitted: workflow fail fast" + } + return "omitted: depends condition not met" +}