diff --git a/pkg/apis/workflow/v1alpha1/workflow_types.go b/pkg/apis/workflow/v1alpha1/workflow_types.go index e69f3fec3b97..24e722630eb2 100644 --- a/pkg/apis/workflow/v1alpha1/workflow_types.go +++ b/pkg/apis/workflow/v1alpha1/workflow_types.go @@ -22,6 +22,8 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" + log "github.com/sirupsen/logrus" + argoerrs "github.com/argoproj/argo-workflows/v3/errors" "github.com/argoproj/argo-workflows/v3/util/slice" ) @@ -1729,6 +1731,64 @@ func (n Nodes) Find(f func(NodeStatus) bool) *NodeStatus { return nil } +// Get a NodeStatus from the hashmap of Nodes. +// Return a nil along with an error if non existent. +func (n Nodes) Get(key string) (*NodeStatus, error) { + val, ok := n[key] + if !ok { + return nil, fmt.Errorf("key was not found for %s", key) + } + return &val, nil +} + +// Check if the Nodes map has a key entry +func (n Nodes) Has(key string) bool { + _, err := n.Get(key) + return err == nil +} + +// Get the Phase of a Node +func (n Nodes) GetPhase(key string) (*NodePhase, error) { + val, err := n.Get(key) + if err != nil { + return nil, err + } + return &val.Phase, nil +} + +// Set the status of a node by key +func (n Nodes) Set(key string, status NodeStatus) { + if status.Name == "" { + log.Warnf("Name was not set for key %s", key) + } + if status.ID == "" { + log.Warnf("ID was not set for key %s", key) + } + _, ok := n[key] + if ok { + log.Tracef("Changing NodeStatus for %s to %+v", key, status) + } + n[key] = status +} + +// Delete a node from the Nodes by key +func (n Nodes) Delete(key string) { + has := n.Has(key) + if !has { + log.Warnf("Trying to delete non existent key %s", key) + return + } + delete(n, key) +} + +// Get the name of a node by key +func (n Nodes) GetName(key string) (string, error) { + val, err := n.Get(key) + if err != nil { + return "", err + } + return val.Name, nil +} func NodeWithName(name string) func(n NodeStatus) bool { return func(n NodeStatus) bool { return n.Name == name } } @@ -3251,13 +3311,9 @@ func (wf *Workflow) GetTemplateByName(name string) *Template { return nil } -func (wf *Workflow) GetNodeByName(nodeName string) *NodeStatus { +func (wf *Workflow) GetNodeByName(nodeName string) (*NodeStatus, error) { nodeID := wf.NodeID(nodeName) - node, ok := wf.Status.Nodes[nodeID] - if !ok { - return nil - } - return &node + return wf.Status.Nodes.Get(nodeID) } // GetResourceScope returns the template scope of workflow. diff --git a/server/artifacts/artifact_server.go b/server/artifacts/artifact_server.go index b2804a331e9a..6c46fbf8c280 100644 --- a/server/artifacts/artifact_server.go +++ b/server/artifacts/artifact_server.go @@ -378,10 +378,16 @@ func (a *ArtifactServer) getArtifactAndDriver(ctx context.Context, nodeId, artif kubeClient := auth.GetKubeClient(ctx) var art *wfv1.Artifact + + nodeStatus, err := wf.Status.Nodes.Get(nodeId) + if err != nil { + log.Errorf("Was unable to retrieve node for %s", nodeId) + return nil, nil, fmt.Errorf("was not able to retrieve node") + } if isInput { - art = wf.Status.Nodes[nodeId].Inputs.GetArtifactByName(artifactName) + art = nodeStatus.Inputs.GetArtifactByName(artifactName) } else { - art = wf.Status.Nodes[nodeId].Outputs.GetArtifactByName(artifactName) + art = nodeStatus.Outputs.GetArtifactByName(artifactName) } if art == nil { return nil, nil, fmt.Errorf("artifact not found: %s, isInput=%t, Workflow Status=%+v", artifactName, isInput, wf.Status) @@ -395,7 +401,12 @@ func (a *ArtifactServer) getArtifactAndDriver(ctx context.Context, nodeId, artif // 5. Inline Template var archiveLocation *wfv1.ArtifactLocation - templateName := util.GetTemplateFromNode(wf.Status.Nodes[nodeId]) + templateNode, err := wf.Status.Nodes.Get(nodeId) + if err != nil { + log.Errorf("was unable to retrieve node for %s", nodeId) + return nil, nil, fmt.Errorf("Unable to get artifact and driver due to inability to get node due for %s, err=%s", nodeId, err) + } + templateName := util.GetTemplateFromNode(*templateNode) if templateName != "" { template := wf.GetTemplateByName(templateName) if template == nil { @@ -412,7 +423,7 @@ func (a *ArtifactServer) getArtifactAndDriver(ctx context.Context, nodeId, artif archiveLocation = ar.ToArtifactLocation() } - err := art.Relocate(archiveLocation) // if the Artifact defines the location (case 1), it will be used; otherwise whatever archiveLocation is set to + err = art.Relocate(archiveLocation) // if the Artifact defines the location (case 1), it will be used; otherwise whatever archiveLocation is set to if err != nil { return art, nil, err } diff --git a/test/e2e/fixtures/then.go b/test/e2e/fixtures/then.go index f26ffe7f3300..191f83d2207b 100644 --- a/test/e2e/fixtures/then.go +++ b/test/e2e/fixtures/then.go @@ -224,7 +224,10 @@ func (t *Then) ExpectArtifact(nodeName string, artifactName string, bucketName s nodeName = t.wf.Name } - n := t.wf.GetNodeByName(nodeName) + n, err := t.wf.GetNodeByName(nodeName) + if err != nil { + t.t.Error("was unable to get node by name") + } a := n.GetOutputs().GetArtifactByName(artifactName) key, _ := a.GetKey() diff --git a/util/resource/updater.go b/util/resource/updater.go index 19b74a1d0fcc..1db749e9dc57 100644 --- a/util/resource/updater.go +++ b/util/resource/updater.go @@ -1,6 +1,8 @@ package resource import ( + log "github.com/sirupsen/logrus" + wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" ) @@ -14,7 +16,7 @@ func UpdateResourceDurations(wf *wfv1.Workflow) { } else if node.Fulfilled() { // compute the sum of all children node.ResourcesDuration = resourceDuration(wf, node, make(map[string]bool)) - wf.Status.Nodes[nodeID] = node + wf.Status.Nodes.Set(nodeID, node) } } } @@ -27,11 +29,15 @@ func resourceDuration(wf *wfv1.Workflow, node wfv1.NodeStatus, visited map[strin continue } visited[childID] = true - child := wf.Status.Nodes[childID] + child, err := wf.Status.Nodes.Get(childID) + if err != nil { + log.Warnf("was unable to obtain node for %s", childID) + continue + } if child.Type == wfv1.NodeTypePod { v = v.Add(child.ResourcesDuration) } - v = v.Add(resourceDuration(wf, child, visited)) + v = v.Add(resourceDuration(wf, *child, visited)) } return v } diff --git a/workflow/controller/artifact_gc.go b/workflow/controller/artifact_gc.go index f52b49097912..2540bbb7aa4e 100644 --- a/workflow/controller/artifact_gc.go +++ b/workflow/controller/artifact_gc.go @@ -168,8 +168,9 @@ func (woc *wfOperationCtx) processArtifactGCStrategy(ctx context.Context, strate groupedByPod[podName] = make(templatesToArtifacts) } // get the Template for the Artifact - node, found := woc.wf.Status.Nodes[artifactSearchResult.NodeID] - if !found { + node, err := woc.wf.Status.Nodes.Get(artifactSearchResult.NodeID) + if err != nil { + woc.log.Errorf("Was unable to obtain node for %s", artifactSearchResult.NodeID) return fmt.Errorf("can't process Artifact GC Strategy %s: node ID %q not found in Status??", strategy, artifactSearchResult.NodeID) } templateName := node.TemplateName @@ -635,8 +636,9 @@ func (woc *wfOperationCtx) processCompletedWorkflowArtifactGCTask(artifactGCTask foundGCFailure := false for nodeName, nodeResult := range artifactGCTask.Status.ArtifactResultsByNode { // find this node result in the Workflow Status - wfNode, found := woc.wf.Status.Nodes[nodeName] - if !found { + wfNode, err := woc.wf.Status.Nodes.Get(nodeName) + if err != nil { + woc.log.Errorf("Was unable to obtain node for %s", nodeName) return false, fmt.Errorf("node named %q returned by WorkflowArtifactGCTask %q wasn't found in Workflow %q Status", nodeName, artifactGCTask.Name, woc.wf.Name) } if wfNode.Outputs == nil { @@ -649,7 +651,9 @@ func (woc *wfOperationCtx) processCompletedWorkflowArtifactGCTask(artifactGCTask // could be in a different WorkflowArtifactGCTask continue } - woc.wf.Status.Nodes[nodeName].Outputs.Artifacts[i].Deleted = artifactResult.Success + + wfNode.Outputs.Artifacts[i].Deleted = artifactResult.Success + woc.wf.Status.Nodes.Set(nodeName, *wfNode) if artifactResult.Error != nil { woc.addArtGCCondition(fmt.Sprintf("%s (artifactGCTask: %s)", *artifactResult.Error, artifactGCTask.Name)) diff --git a/workflow/controller/container_set_template.go b/workflow/controller/container_set_template.go index 05cf079cda8b..c5a7865ed7df 100644 --- a/workflow/controller/container_set_template.go +++ b/workflow/controller/container_set_template.go @@ -8,8 +8,8 @@ import ( ) func (woc *wfOperationCtx) executeContainerSet(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) { - node := woc.wf.GetNodeByName(nodeName) - if node == nil { + node, err := woc.wf.GetNodeByName(nodeName) + if err != nil { node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypePod, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodePending) } includeScriptOutput, err := woc.includeScriptOutput(nodeName, opts.boundaryID) @@ -30,8 +30,8 @@ func (woc *wfOperationCtx) executeContainerSet(ctx context.Context, nodeName str // which prevents creating many pending nodes that could never be scheduled for _, c := range tmpl.ContainerSet.GetContainers() { ctxNodeName := fmt.Sprintf("%s.%s", nodeName, c.Name) - ctrNode := woc.wf.GetNodeByName(ctxNodeName) - if ctrNode == nil { + _, err := woc.wf.GetNodeByName(ctxNodeName) + if err != nil { _ = woc.initializeNode(ctxNodeName, wfv1.NodeTypeContainer, templateScope, orgTmpl, node.ID, wfv1.NodePending) } } diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index 431149059682..dbbf0f343b14 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -119,11 +119,12 @@ func (d *dagContext) taskNodeID(taskName string) string { // getTaskNode returns the node status of a task. func (d *dagContext) getTaskNode(taskName string) *wfv1.NodeStatus { nodeID := d.taskNodeID(taskName) - node, ok := d.wf.Status.Nodes[nodeID] - if !ok { + node, err := d.wf.Status.Nodes.Get(nodeID) + if err != nil { + log.Warnf("was unable to obtain the node for %s, taskName %s", nodeID, taskName) return nil } - return &node + return node } // assessDAGPhase assesses the overall DAG status @@ -206,13 +207,19 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes) wfv1 } func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmplCtx *templateresolution.Context, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) { - node := woc.wf.GetNodeByName(nodeName) - if node == nil { + + node, err := woc.wf.GetNodeByName(nodeName) + if err != nil { node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypeDAG, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodeRunning) } defer func() { - if woc.wf.Status.Nodes[node.ID].Fulfilled() { + node, err := woc.wf.Status.Nodes.Get(node.ID) + if err != nil { + // CRITICAL ERROR IF THIS BRANCH IS REACHED -> PANIC + panic(fmt.Sprintf("expected node for %s due to preceded initializeExecutableNode but couldn't find it", node.ID)) + } + if node.Fulfilled() { woc.killDaemonedChildren(node.ID) } }() @@ -280,7 +287,10 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl case wfv1.NodeRunning: return node, nil case wfv1.NodeError, wfv1.NodeFailed: - woc.updateOutboundNodesForTargetTasks(dagCtx, targetTasks, nodeName) + err = woc.updateOutboundNodesForTargetTasks(dagCtx, targetTasks, nodeName) + if err != nil { + return nil, err + } _ = woc.markNodePhase(nodeName, dagPhase) return node, nil } @@ -298,10 +308,16 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl if taskNode.Type == wfv1.NodeTypeTaskGroup { childNodes := make([]wfv1.NodeStatus, len(taskNode.Children)) for i, childID := range taskNode.Children { - childNodes[i] = woc.wf.Status.Nodes[childID] + childNode, err := woc.wf.Status.Nodes.Get(childID) + if err != nil { + woc.log.Errorf("was unable to obtain node for %s", childID) + return nil, fmt.Errorf("Critical error, unable to find %s", childID) + } + childNodes[i] = *childNode } err := woc.processAggregateNodeOutputs(scope, prefix, childNodes) if err != nil { + woc.log.Errorf("unable to processAggregateNodeOutputs") return nil, errors.InternalWrapError(err) } } @@ -310,26 +326,35 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl } outputs, err := getTemplateOutputsFromScope(tmpl, scope) if err != nil { + woc.log.Errorf("unable to get outputs") return node, err } if outputs != nil { - node.Outputs = outputs - woc.wf.Status.Nodes[node.ID] = *node - } - if node.MemoizationStatus != nil { - c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName) - err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, outputs) + node, err = woc.wf.GetNodeByName(nodeName) if err != nil { - woc.log.WithFields(log.Fields{"nodeID": node.ID}).WithError(err).Error("Failed to save node outputs to cache") - node.Phase = wfv1.NodeError + woc.log.Errorf("unable to get node by name for %s", nodeName) + return nil, err + } + node.Outputs = outputs + woc.wf.Status.Nodes.Set(node.ID, *node) + if node.MemoizationStatus != nil { + c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName) + err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, node.Outputs) + if err != nil { + woc.log.WithFields(log.Fields{"nodeID": node.ID}).WithError(err).Error("Failed to save node outputs to cache") + node.Phase = wfv1.NodeError + } } } - woc.updateOutboundNodesForTargetTasks(dagCtx, targetTasks, nodeName) + err = woc.updateOutboundNodesForTargetTasks(dagCtx, targetTasks, nodeName) + if err != nil { + return nil, err + } return woc.markNodePhase(nodeName, wfv1.NodeSucceeded), nil } -func (woc *wfOperationCtx) updateOutboundNodesForTargetTasks(dagCtx *dagContext, targetTasks []string, nodeName string) { +func (woc *wfOperationCtx) updateOutboundNodesForTargetTasks(dagCtx *dagContext, targetTasks []string, nodeName string) error { // set the outbound nodes from the target tasks outbound := make([]string, 0) for _, depName := range targetTasks { @@ -341,10 +366,15 @@ func (woc *wfOperationCtx) updateOutboundNodesForTargetTasks(dagCtx *dagContext, outboundNodeIDs := woc.getOutboundNodes(depNode.ID) outbound = append(outbound, outboundNodeIDs...) } - node := woc.wf.GetNodeByName(nodeName) + node, err := woc.wf.GetNodeByName(nodeName) + if err != nil { + woc.log.Warnf("was unable to obtain node by name for %s", nodeName) + return err + } node.OutboundNodes = outbound - woc.wf.Status.Nodes[node.ID] = *node + woc.wf.Status.Nodes.Set(node.ID, *node) woc.log.Infof("Outbound nodes of %s set to %s", node.ID, outbound) + return nil } // executeDAGTask traverses and executes the upward chain of dependencies of a task @@ -420,8 +450,9 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex // Check if our dependencies completed. If not, recurse our parents executing them if necessary nodeName := dagCtx.taskNodeName(taskName) taskDependencies := dagCtx.GetTaskDependencies(taskName) + // error condition taken care of via a nil check + taskGroupNode, _ := woc.wf.GetNodeByName(nodeName) - taskGroupNode := woc.wf.GetNodeByName(nodeName) if taskGroupNode != nil && taskGroupNode.Type != wfv1.NodeTypeTaskGroup { taskGroupNode = nil } @@ -441,7 +472,12 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex depNode := dagCtx.getTaskNode(depName) outboundNodeIDs := woc.getOutboundNodes(depNode.ID) for _, outNodeID := range outboundNodeIDs { - woc.addChildNode(woc.wf.Status.Nodes[outNodeID].Name, taskNodeName) + nodeName, err := woc.wf.Status.Nodes.GetName(outNodeID) + if err != nil { + woc.log.Errorf("was unable to obtain node for %s", outNodeID) + return + } + woc.addChildNode(nodeName, taskNodeName) } } } @@ -781,7 +817,10 @@ func (d *dagContext) evaluateDependsLogic(taskName string) (bool, bool, error) { } // If a task happens to have an onExit node, don't proceed until the onExit node is fulfilled - if onExitNode := d.wf.GetNodeByName(common.GenerateOnExitNodeName(depNode.Name)); onExitNode != nil { + if onExitNode, err := d.wf.GetNodeByName(common.GenerateOnExitNodeName(depNode.Name)); onExitNode != nil { + if err != nil { + return false, false, err + } if !onExitNode.Fulfilled() { return false, false, nil } @@ -800,9 +839,14 @@ func (d *dagContext) evaluateDependsLogic(taskName string) (bool, bool, error) { allFailed = len(depNode.Children) > 0 for _, childNodeID := range depNode.Children { - childNode := d.wf.Status.Nodes[childNodeID] - anySucceeded = anySucceeded || childNode.Phase == wfv1.NodeSucceeded - allFailed = allFailed && childNode.Phase == wfv1.NodeFailed + childNodePhase, err := d.wf.Status.Nodes.GetPhase(childNodeID) + if err != nil { + log.Warnf("was unable to obtain node for %s", childNodeID) + allFailed = false // we don't know if all failed + continue + } + anySucceeded = anySucceeded || *childNodePhase == wfv1.NodeSucceeded + allFailed = allFailed && *childNodePhase == wfv1.NodeFailed } } diff --git a/workflow/controller/dag_test.go b/workflow/controller/dag_test.go index 36690e26e844..fd8e0f6b2b58 100644 --- a/workflow/controller/dag_test.go +++ b/workflow/controller/dag_test.go @@ -1488,8 +1488,9 @@ func TestTerminateDAGWithMaxDurationLimitExpiredAndMoreAttempts(t *testing.T) { woc.operate(ctx) - retryNode := woc.wf.GetNodeByName("dag-diamond-dj7q5.A") - if assert.NotNil(t, retryNode) { + retryNode, err := woc.wf.GetNodeByName("dag-diamond-dj7q5.A") + if assert.NoError(t, err) { + assert.NotNil(t, retryNode) assert.Equal(t, wfv1.NodeFailed, retryNode.Phase) assert.Contains(t, retryNode.Message, "Max duration limit exceeded") } @@ -1676,13 +1677,15 @@ func TestRetryStrategyNodes(t *testing.T) { woc := newWorkflowOperationCtx(wf, controller) woc.operate(ctx) - retryNode := woc.wf.GetNodeByName("wf-retry-pol") - if assert.NotNil(t, retryNode) { + retryNode, err := woc.wf.GetNodeByName("wf-retry-pol") + if assert.NoError(t, err) { + assert.NotNil(t, retryNode) assert.Equal(t, wfv1.NodeFailed, retryNode.Phase) } - onExitNode := woc.wf.GetNodeByName("wf-retry-pol.onExit") - if assert.NotNil(t, onExitNode) { + onExitNode, err := woc.wf.GetNodeByName("wf-retry-pol.onExit") + if assert.NoError(t, err) { + assert.NotNil(t, onExitNode) assert.Equal(t, wfv1.NodePending, onExitNode.Phase) } @@ -1841,13 +1844,15 @@ func TestOnExitDAGPhase(t *testing.T) { woc := newWorkflowOperationCtx(wf, controller) woc.operate(ctx) - retryNode := woc.wf.GetNodeByName("dag-diamond-88trp") - if assert.NotNil(t, retryNode) { + retryNode, err := woc.wf.GetNodeByName("dag-diamond-88trp") + if assert.NoError(t, err) { + assert.NotNil(t, retryNode) assert.Equal(t, wfv1.NodeRunning, retryNode.Phase) } - retryNode = woc.wf.GetNodeByName("dag-diamond-88trp.B.onExit") - if assert.NotNil(t, retryNode) { + retryNode, err = woc.wf.GetNodeByName("dag-diamond-88trp.B.onExit") + if assert.NoError(t, err) { + assert.NotNil(t, retryNode) assert.Equal(t, wfv1.NodePending, retryNode.Phase) } @@ -1971,19 +1976,22 @@ func TestOnExitNonLeaf(t *testing.T) { woc := newWorkflowOperationCtx(wf, controller) woc.operate(ctx) - retryNode := woc.wf.GetNodeByName("exit-handler-bug-example.step-2.onExit") - if assert.NotNil(t, retryNode) { + retryNode, err := woc.wf.GetNodeByName("exit-handler-bug-example.step-2.onExit") + if assert.NoError(t, err) { + assert.NotNil(t, retryNode) assert.Equal(t, wfv1.NodePending, retryNode.Phase) } - assert.Nil(t, woc.wf.GetNodeByName("exit-handler-bug-example.step-3")) + _, err = woc.wf.GetNodeByName("exit-handler-bug-example.step-3") + assert.Error(t, err) retryNode.Phase = wfv1.NodeSucceeded woc.wf.Status.Nodes[retryNode.ID] = *retryNode woc = newWorkflowOperationCtx(woc.wf, controller) woc.operate(ctx) - retryNode = woc.wf.GetNodeByName("exit-handler-bug-example.step-3") - if assert.NotNil(t, retryNode) { + retryNode, err = woc.wf.GetNodeByName("exit-handler-bug-example.step-3") + if assert.NoError(t, err) { + assert.NotNil(t, retryNode) assert.Equal(t, wfv1.NodePending, retryNode.Phase) } assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase) @@ -2083,8 +2091,9 @@ func TestDagOptionalInputArtifacts(t *testing.T) { ctx := context.Background() woc.operate(ctx) assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase) - optionalInputArtifactsNode := woc.wf.GetNodeByName("dag-optional-inputartifacts.B") - if assert.NotNil(t, optionalInputArtifactsNode) { + optionalInputArtifactsNode, err := woc.wf.GetNodeByName("dag-optional-inputartifacts.B") + if assert.NoError(t, err) { + assert.NotNil(t, optionalInputArtifactsNode) assert.Equal(t, wfv1.NodePending, optionalInputArtifactsNode.Phase) } } @@ -2239,8 +2248,9 @@ func TestDagTargetTaskOnExit(t *testing.T) { woc := newWorkflowOperationCtx(wf, controller) woc.operate(ctx) - onExitNode := woc.wf.GetNodeByName("dag-primay-branch-6bnnl.A.onExit") - if assert.NotNil(t, onExitNode) { + onExitNode, err := woc.wf.GetNodeByName("dag-primay-branch-6bnnl.A.onExit") + if assert.NoError(t, err) { + assert.NotNil(t, onExitNode) assert.Equal(t, wfv1.NodePending, onExitNode.Phase) } } diff --git a/workflow/controller/estimation/estimator.go b/workflow/controller/estimation/estimator.go index ee183a8f525c..fdd8f91fb70c 100644 --- a/workflow/controller/estimation/estimator.go +++ b/workflow/controller/estimation/estimator.go @@ -3,6 +3,8 @@ package estimation import ( "strings" + log "github.com/sirupsen/logrus" + wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" ) @@ -29,5 +31,11 @@ func (e *estimator) EstimateNodeDuration(nodeName string) wfv1.EstimatedDuration return 0 } oldNodeID := e.baselineWF.NodeID(strings.Replace(nodeName, e.wf.Name, e.baselineWF.Name, 1)) - return wfv1.NewEstimatedDuration(e.baselineWF.Status.Nodes[oldNodeID].GetDuration()) + node, err := e.baselineWF.Status.Nodes.Get(oldNodeID) + if err != nil { + log.Errorf("was unable to obtain node for %s", oldNodeID) + // inacurate but not going to break anything + return 0 + } + return wfv1.NewEstimatedDuration(node.GetDuration()) } diff --git a/workflow/controller/exec_control.go b/workflow/controller/exec_control.go index ea86d9569d9b..d6ddf9fdce4c 100644 --- a/workflow/controller/exec_control.go +++ b/workflow/controller/exec_control.go @@ -21,9 +21,10 @@ func (woc *wfOperationCtx) applyExecutionControl(pod *apiv1.Pod, wfNodesLock *sy nodeID := woc.nodeID(pod) wfNodesLock.RLock() - node, ok := woc.wf.Status.Nodes[nodeID] + node, err := woc.wf.Status.Nodes.Get(nodeID) wfNodesLock.RUnlock() - if !ok { + if err != nil { + woc.log.Errorf("was unable to obtain node for %s", nodeID) return } // node is already completed @@ -79,7 +80,11 @@ func (woc *wfOperationCtx) handleExecutionControlError(nodeID string, wfNodesLoc wfNodesLock.Lock() defer wfNodesLock.Unlock() - node := woc.wf.Status.Nodes[nodeID] + node, err := woc.wf.Status.Nodes.Get(nodeID) + if err != nil { + woc.log.Errorf("was not abble to obtain node for %s", nodeID) + return + } woc.markNodePhase(node.Name, wfv1.NodeFailed, errorMsg) children, err := woc.wf.Status.Nodes.NestedChildrenStatus(nodeID) @@ -113,7 +118,7 @@ func (woc *wfOperationCtx) killDaemonedChildren(nodeID string) { woc.controller.queuePodForCleanup(woc.wf.Namespace, podName, terminateContainers) childNode.Phase = wfv1.NodeSucceeded childNode.Daemoned = nil - woc.wf.Status.Nodes[childNode.ID] = childNode + woc.wf.Status.Nodes.Set(childNode.ID, childNode) woc.updated = true } } diff --git a/workflow/controller/hooks.go b/workflow/controller/hooks.go index 25ad672423e2..ed122e3853eb 100644 --- a/workflow/controller/hooks.go +++ b/workflow/controller/hooks.go @@ -15,13 +15,13 @@ import ( func (woc *wfOperationCtx) executeWfLifeCycleHook(ctx context.Context, tmplCtx *templateresolution.Context) (bool, error) { var hookNodes []*wfv1.NodeStatus for hookName, hook := range woc.execWf.Spec.Hooks { - //exit hook will be executed in runOnExitNode + // exit hook will be executed in runOnExitNode if hookName == wfv1.ExitLifecycleEvent { continue } hookNodeName := generateLifeHookNodeName(woc.wf.ObjectMeta.Name, string(hookName)) // To check a node was triggered. - hookedNode := woc.wf.GetNodeByName(hookNodeName) + hookedNode, _ := woc.wf.GetNodeByName(hookNodeName) if hook.Expression == "" { return true, errors.Errorf(errors.CodeBadRequest, "Expression required for hook %s", hookNodeName) } @@ -56,13 +56,13 @@ func (woc *wfOperationCtx) executeWfLifeCycleHook(ctx context.Context, tmplCtx * func (woc *wfOperationCtx) executeTmplLifeCycleHook(ctx context.Context, scope *wfScope, lifeCycleHooks wfv1.LifecycleHooks, parentNode *wfv1.NodeStatus, boundaryID string, tmplCtx *templateresolution.Context, prefix string) (bool, error) { var hookNodes []*wfv1.NodeStatus for hookName, hook := range lifeCycleHooks { - //exit hook will be executed in runOnExitNode + // exit hook will be executed in runOnExitNode if hookName == wfv1.ExitLifecycleEvent { continue } hookNodeName := generateLifeHookNodeName(parentNode.Name, string(hookName)) // To check a node was triggered - hookedNode := woc.wf.GetNodeByName(hookNodeName) + hookedNode, _ := woc.wf.GetNodeByName(hookNodeName) if hook.Expression == "" { return false, errors.Errorf(errors.CodeBadRequest, "Expression required for hook %s", hookNodeName) } diff --git a/workflow/controller/http_template.go b/workflow/controller/http_template.go index 65ffbca85754..bf874b1a5b10 100644 --- a/workflow/controller/http_template.go +++ b/workflow/controller/http_template.go @@ -5,8 +5,8 @@ import ( ) func (woc *wfOperationCtx) executeHTTPTemplate(nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) *wfv1.NodeStatus { - node := woc.wf.GetNodeByName(nodeName) - if node == nil { + node, err := woc.wf.GetNodeByName(nodeName) + if err != nil { node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypeHTTP, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodePending) woc.taskSet[node.ID] = *tmpl } diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 6dc39eb410f6..7c242c8ec118 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -292,7 +292,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) { } else { woc.workflowDeadline = woc.getWorkflowDeadline() woc.taskResultReconciliation() - err := woc.podReconciliation(ctx) + err = woc.podReconciliation(ctx) if err == nil { woc.failSuspendedAndPendingNodesAfterDeadlineOrShutdown() } @@ -861,8 +861,8 @@ func (woc *wfOperationCtx) reapplyUpdate(ctx context.Context, wfClient v1alpha1. return nil, err } for id, node := range woc.wf.Status.Nodes { - currNode, exists := currWf.Status.Nodes[id] - if exists && currNode.Fulfilled() && node.Phase != currNode.Phase { + currNode, err := currWf.Status.Nodes.Get(id) + if (err == nil) && currNode.Fulfilled() && node.Phase != currNode.Phase { return nil, fmt.Errorf("must never update completed node %s", id) } } @@ -939,7 +939,7 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate if !lastChildNode.FailedOrError() { node.Outputs = lastChildNode.Outputs.DeepCopy() - woc.wf.Status.Nodes[node.ID] = *node + woc.wf.Status.Nodes.Set(node.ID, *node) return woc.markNodePhase(node.Name, wfv1.NodeSucceeded), true, nil } @@ -1084,8 +1084,9 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error { wfNodesLock.Lock() defer wfNodesLock.Unlock() - if node, ok := woc.wf.Status.Nodes[nodeID]; ok { - if newState := woc.assessNodeStatus(pod, &node); newState != nil { + node, err := woc.wf.Status.Nodes.Get(nodeID) + if err == nil { + if newState := woc.assessNodeStatus(pod, node); newState != nil { woc.addOutputsToGlobalScope(newState.Outputs) if newState.MemoizationStatus != nil { if newState.Succeeded() { @@ -1101,7 +1102,7 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error { if newState.Phase == wfv1.NodeRunning { podRunningCondition.Status = metav1.ConditionTrue } - woc.wf.Status.Nodes[nodeID] = *newState + woc.wf.Status.Nodes.Set(nodeID, *newState) woc.updated = true // warning! when the node completes, the daemoned flag will be unset, so we must check the old node if !node.IsDaemoned() && !node.Completed() && newState.Completed() { @@ -1303,7 +1304,7 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, old *wfv1.NodeStatus // in this case need to update nodes according to container status for _, c := range pod.Status.ContainerStatuses { ctrNodeName := fmt.Sprintf("%s.%s", old.Name, c.Name) - if woc.wf.GetNodeByName(ctrNodeName) == nil { + if _, err := woc.wf.GetNodeByName(ctrNodeName); err != nil { continue } switch { @@ -1758,10 +1759,15 @@ type executeTemplateOpts struct { func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string, orgTmpl wfv1.TemplateReferenceHolder, tmplCtx *templateresolution.Context, args wfv1.Arguments, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) { woc.log.Debugf("Evaluating node %s: template: %s, boundaryID: %s", nodeName, common.GetTemplateHolderString(orgTmpl), opts.boundaryID) - node := woc.wf.GetNodeByName(nodeName) - // Set templateScope from which the template resolution starts. templateScope := tmplCtx.GetTemplateScope() + + node, err := woc.wf.GetNodeByName(nodeName) + if err != nil { + // Will be initialized via woc.initializeNodeOrMarkError + woc.log.Warnf("Node was nil, will be initialized as type Skipped") + } + newTmplCtx, resolvedTmpl, templateStored, err := tmplCtx.ResolveTemplate(orgTmpl) if err != nil { return woc.initializeNodeOrMarkError(node, nodeName, templateScope, orgTmpl, opts.boundaryID, err), err @@ -1840,7 +1846,7 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string, } else { node = woc.initializeCacheNode(nodeName, processedTmpl, templateScope, orgTmpl, opts.boundaryID, memoizationStatus) } - woc.wf.Status.Nodes[node.ID] = *node + woc.wf.Status.Nodes.Set(node.ID, *node) woc.updated = true } @@ -1865,7 +1871,7 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string, if node.StartedAt.IsZero() { node.StartedAt = metav1.Time{Time: time.Now().UTC()} node.EstimatedDuration = woc.estimateNodeDuration(node.Name) - woc.wf.Status.Nodes[node.ID] = *node + woc.wf.Status.Nodes.Set(node.ID, *node) woc.updated = true } } @@ -1906,11 +1912,15 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string, // unexpected behavior and is a bug. panic("bug: GetLockName should not return an error after a call to TryAcquire") } - return woc.markNodeWaitingForLock(node.Name, lockName.EncodeName()), nil + return woc.markNodeWaitingForLock(node.Name, lockName.EncodeName()) } else { woc.log.Infof("Node %s acquired synchronization lock", nodeName) if node != nil { - node = woc.markNodeWaitingForLock(node.Name, "") + node, err = woc.markNodeWaitingForLock(node.Name, "") + if err != nil { + woc.log.WithField("node.Name", node.Name).WithField("lockName", "").Error("markNodeWaitingForLock returned err") + return nil, err + } } } @@ -1945,7 +1955,7 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string, // Runtime parameters (e.g., `status`, `resourceDuration`) in the output will be used to emit metrics. if lastChildNode != nil { retryParentNode.Outputs = lastChildNode.Outputs.DeepCopy() - woc.wf.Status.Nodes[node.ID] = *retryParentNode + woc.wf.Status.Nodes.Set(node.ID, *retryParentNode) } if processedTmpl.Metrics != nil { // In this check, a completed node may or may not have existed prior to this execution. If it did exist, ensure that it wasn't @@ -1960,6 +1970,11 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string, if processedTmpl.Synchronization != nil { woc.controller.syncManager.Release(woc.wf, node.ID, processedTmpl.Synchronization) } + lastChildNode := getChildNodeIndex(retryParentNode, woc.wf.Status.Nodes, -1) + if lastChildNode != nil { + retryParentNode.Outputs = lastChildNode.Outputs.DeepCopy() + woc.wf.Status.Nodes.Set(node.ID, *retryParentNode) + } return retryParentNode, nil } else if lastChildNode != nil && lastChildNode.Fulfilled() && processedTmpl.Metrics != nil { // If retry node has not completed and last child node has completed, emit metrics for the last child node. @@ -2070,8 +2085,8 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string, } } - retrieveNode := woc.wf.GetNodeByName(node.Name) - if retrieveNode == nil { + retrieveNode, err := woc.wf.GetNodeByName(node.Name) + if err != nil { err := fmt.Errorf("no Node found by the name of %s; wf.Status.Nodes=%+v", node.Name, woc.wf.Status.Nodes) woc.log.Error(err) woc.markWorkflowError(ctx, err) @@ -2081,9 +2096,8 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string, // Swap the node back to retry node if retryNodeName != "" { - retryNode := woc.wf.GetNodeByName(retryNodeName) - - if retryNode == nil { + retryNode, err := woc.wf.GetNodeByName(retryNodeName) + if err != nil { err := fmt.Errorf("no Retry Node found by the name of %s; wf.Status.Nodes=%+v", retryNodeName, woc.wf.Status.Nodes) woc.log.Error(err) woc.markWorkflowError(ctx, err) @@ -2091,6 +2105,7 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string, } if !retryNode.Fulfilled() && node.Fulfilled() { // if the retry child has completed we need to update outself + retryNode, err = woc.executeTemplate(ctx, retryNodeName, orgTmpl, tmplCtx, args, opts) if err != nil { return woc.markNodeError(node.Name, err), err @@ -2163,11 +2178,14 @@ func (woc *wfOperationCtx) markWorkflowPhase(ctx context.Context, phase wfv1.Wor } if phase == wfv1.WorkflowError { - entryNode, ok := woc.wf.Status.Nodes[woc.wf.ObjectMeta.Name] - if ok && entryNode.Phase == wfv1.NodeRunning { + entryNode, err := woc.wf.Status.Nodes.Get(woc.wf.ObjectMeta.Name) + if err != nil { + woc.log.Errorf("was unable to obtain node for %s", woc.wf.ObjectMeta.Name) + } + if (err == nil) && entryNode.Phase == wfv1.NodeRunning { entryNode.Phase = wfv1.NodeError entryNode.Message = "Workflow operation error" - woc.wf.Status.Nodes[woc.wf.ObjectMeta.Name] = entryNode + woc.wf.Status.Nodes.Set(woc.wf.ObjectMeta.Name, *entryNode) woc.updated = true } } @@ -2285,7 +2303,7 @@ func (woc *wfOperationCtx) initializeExecutableNode(nodeName string, nodeType wf } // Update the node - woc.wf.Status.Nodes[node.ID] = *node + woc.wf.Status.Nodes.Set(node.ID, *node) woc.updated = true return node @@ -2296,6 +2314,7 @@ func (woc *wfOperationCtx) initializeNodeOrMarkError(node *wfv1.NodeStatus, node if node != nil { return woc.markNodeError(nodeName, err) } + return woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, templateScope, orgTmpl, boundaryID, wfv1.NodeError, err.Error()) } @@ -2307,6 +2326,7 @@ func (woc *wfOperationCtx) initializeCacheNode(nodeName string, resolvedTmpl *wf panic(err) } woc.log.Debug("Initializing cached node ", nodeName, common.GetTemplateHolderString(orgTmpl), boundaryID) + node := woc.initializeExecutableNode(nodeName, wfutil.GetNodeType(resolvedTmpl), templateScope, resolvedTmpl, orgTmpl, boundaryID, wfv1.NodePending, messages...) node.MemoizationStatus = memStat return node @@ -2325,7 +2345,7 @@ func (woc *wfOperationCtx) initializeNode(nodeName string, nodeType wfv1.NodeTyp woc.log.Debugf("Initializing node %s: template: %s, boundaryID: %s", nodeName, common.GetTemplateHolderString(orgTmpl), boundaryID) nodeID := woc.wf.NodeID(nodeName) - _, ok := woc.wf.Status.Nodes[nodeID] + ok := woc.wf.Status.Nodes.Has(nodeID) if ok { panic(fmt.Sprintf("node %s already initialized", nodeName)) } @@ -2343,12 +2363,13 @@ func (woc *wfOperationCtx) initializeNode(nodeName string, nodeType wfv1.NodeTyp EstimatedDuration: woc.estimateNodeDuration(nodeName), } - if boundaryNode, ok := woc.wf.Status.Nodes[boundaryID]; ok { + if boundaryNode, err := woc.wf.Status.Nodes.Get(boundaryID); err == nil { node.DisplayName = strings.TrimPrefix(node.Name, boundaryNode.Name) if stepsOrDagSeparator.MatchString(node.DisplayName) { node.DisplayName = stepsOrDagSeparator.ReplaceAllString(node.DisplayName, "") } } else { + woc.log.Infof("was unable to obtain node for %s, letting display name to be nodeName", boundaryID) node.DisplayName = nodeName } @@ -2360,7 +2381,7 @@ func (woc *wfOperationCtx) initializeNode(nodeName string, nodeType wfv1.NodeTyp message = fmt.Sprintf(" (message: %s)", messages[0]) node.Message = messages[0] } - woc.wf.Status.Nodes[nodeID] = node + woc.wf.Status.Nodes.Set(nodeID, node) woc.log.Infof("%s node %v initialized %s%s", node.Type, node.ID, node.Phase, message) woc.updated = true return &node @@ -2368,8 +2389,8 @@ func (woc *wfOperationCtx) initializeNode(nodeName string, nodeType wfv1.NodeTyp // markNodePhase marks a node with the given phase, creating the node if necessary and handles timestamps func (woc *wfOperationCtx) markNodePhase(nodeName string, phase wfv1.NodePhase, message ...string) *wfv1.NodeStatus { - node := woc.wf.GetNodeByName(nodeName) - if node == nil { + node, err := woc.wf.GetNodeByName(nodeName) + if err != nil { woc.log.Warningf("workflow '%s' node '%s' uninitialized when marking as %v: %s", woc.wf.Name, nodeName, phase, message) node = &wfv1.NodeStatus{} } @@ -2394,7 +2415,7 @@ func (woc *wfOperationCtx) markNodePhase(nodeName string, phase wfv1.NodePhase, woc.log.Infof("node %s finished: %s", node.ID, node.FinishedAt) woc.updated = true } - woc.wf.Status.Nodes[node.ID] = *node + woc.wf.Status.Nodes.Set(node.ID, *node) return node } @@ -2493,10 +2514,10 @@ func (woc *wfOperationCtx) markNodePending(nodeName string, err error) *wfv1.Nod } // markNodeWaitingForLock is a convenience method to mark that a node is waiting for a lock -func (woc *wfOperationCtx) markNodeWaitingForLock(nodeName string, lockName string) *wfv1.NodeStatus { - node := woc.wf.GetNodeByName(nodeName) - if node == nil { - return node +func (woc *wfOperationCtx) markNodeWaitingForLock(nodeName string, lockName string) (*wfv1.NodeStatus, error) { + node, err := woc.wf.GetNodeByName(nodeName) + if err != nil { + return node, err } if node.SynchronizationStatus == nil { @@ -2511,9 +2532,9 @@ func (woc *wfOperationCtx) markNodeWaitingForLock(nodeName string, lockName stri node.SynchronizationStatus.Waiting = lockName } - woc.wf.Status.Nodes[node.ID] = *node + woc.wf.Status.Nodes.Set(node.ID, *node) woc.updated = true - return node + return node, nil } // checkParallelism checks if the given template is able to be executed, considering the current active pods and workflow/template parallelism @@ -2540,15 +2561,16 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node // if we are about to execute a pod, make sure our parent hasn't reached it's limit if boundaryID != "" && (node == nil || (node.Phase != wfv1.NodePending && node.Phase != wfv1.NodeRunning)) { - boundaryNode, ok := woc.wf.Status.Nodes[boundaryID] - if !ok { + boundaryNode, err := woc.wf.Status.Nodes.Get(boundaryID) + if err != nil { + woc.log.Errorf("was unable to obtain node for %s", boundaryID) return errors.InternalError("boundaryNode not found") } tmplCtx, err := woc.createTemplateContext(boundaryNode.GetTemplateScope()) if err != nil { return err } - _, boundaryTemplate, templateStored, err := tmplCtx.ResolveTemplate(&boundaryNode) + _, boundaryTemplate, templateStored, err := tmplCtx.ResolveTemplate(boundaryNode) if err != nil { return err } @@ -2573,8 +2595,8 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node } func (woc *wfOperationCtx) executeContainer(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) { - node := woc.wf.GetNodeByName(nodeName) - if node == nil { + node, err := woc.wf.GetNodeByName(nodeName) + if err != nil { node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypePod, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodePending) } @@ -2600,7 +2622,11 @@ func (woc *wfOperationCtx) executeContainer(ctx context.Context, nodeName string } func (woc *wfOperationCtx) getOutboundNodes(nodeID string) []string { - node := woc.wf.Status.Nodes[nodeID] + node, err := woc.wf.Status.Nodes.Get(nodeID) + if err != nil { + woc.log.Fatalf("was unable to obtain node for %s", nodeID) + panic(fmt.Sprintf("Expected node for %s", nodeID)) + } switch node.Type { case wfv1.NodeTypeSkipped, wfv1.NodeTypeSuspend, wfv1.NodeTypeHTTP, wfv1.NodeTypePlugin: return []string{node.ID} @@ -2611,7 +2637,7 @@ func (woc *wfOperationCtx) getOutboundNodes(nodeID string) []string { if err != nil { return []string{node.ID} } - _, parentTemplate, _, err := tmplCtx.ResolveTemplate(&node) + _, parentTemplate, _, err := tmplCtx.ResolveTemplate(node) if err != nil { return []string{node.ID} } @@ -2773,8 +2799,8 @@ loop: } func (woc *wfOperationCtx) executeScript(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) { - node := woc.wf.GetNodeByName(nodeName) - if node == nil { + node, err := woc.wf.GetNodeByName(nodeName) + if err != nil { node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypePod, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodePending) } else if !node.Pending() { return node, nil @@ -3026,8 +3052,9 @@ func (woc *wfOperationCtx) addArtifactToGlobalScope(art wfv1.Artifact) { func (woc *wfOperationCtx) addChildNode(parent string, child string) { parentID := woc.wf.NodeID(parent) childID := woc.wf.NodeID(child) - node, ok := woc.wf.Status.Nodes[parentID] - if !ok { + node, err := woc.wf.Status.Nodes.Get(parentID) + if err != nil { + woc.log.Fatalf("was unable to obtain node for %s", parentID) panic(fmt.Sprintf("parent node %s not initialized", parent)) } for _, nodeID := range node.Children { @@ -3037,15 +3064,15 @@ func (woc *wfOperationCtx) addChildNode(parent string, child string) { } } node.Children = append(node.Children, childID) - woc.wf.Status.Nodes[parentID] = node + woc.wf.Status.Nodes.Set(parentID, *node) woc.updated = true } // executeResource is runs a kubectl command against a manifest func (woc *wfOperationCtx) executeResource(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) { - node := woc.wf.GetNodeByName(nodeName) + node, err := woc.wf.GetNodeByName(nodeName) - if node == nil { + if err != nil { node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypePod, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodePending) } else if !node.Pending() { return node, nil @@ -3071,7 +3098,7 @@ func (woc *wfOperationCtx) executeResource(ctx context.Context, nodeName string, mainCtr := woc.newExecContainer(common.MainContainerName, tmpl) mainCtr.Command = []string{"argoexec", "resource", tmpl.Resource.Action} - _, err := woc.createWorkflowPod(ctx, nodeName, []apiv1.Container{*mainCtr}, tmpl, &createWorkflowPodOpts{onExitPod: opts.onExitTemplate, executionDeadline: opts.executionDeadline}) + _, err = woc.createWorkflowPod(ctx, nodeName, []apiv1.Container{*mainCtr}, tmpl, &createWorkflowPodOpts{onExitPod: opts.onExitTemplate, executionDeadline: opts.executionDeadline}) if err != nil { return woc.requeueIfTransientErr(err, node.Name) } @@ -3080,8 +3107,8 @@ func (woc *wfOperationCtx) executeResource(ctx context.Context, nodeName string, } func (woc *wfOperationCtx) executeData(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) { - node := woc.wf.GetNodeByName(nodeName) - if node == nil { + node, err := woc.wf.GetNodeByName(nodeName) + if err != nil { node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypePod, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodePending) } else if !node.Pending() { return node, nil @@ -3103,8 +3130,8 @@ func (woc *wfOperationCtx) executeData(ctx context.Context, nodeName string, tem } func (woc *wfOperationCtx) executeSuspend(nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) { - node := woc.wf.GetNodeByName(nodeName) - if node == nil { + node, err := woc.wf.GetNodeByName(nodeName) + if err != nil { node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypeSuspend, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodePending) woc.resolveInputFieldsForSuspendNode(node) } @@ -3115,7 +3142,10 @@ func (woc *wfOperationCtx) executeSuspend(nodeName string, templateScope string, var requeueTime *time.Time if tmpl.Suspend.Duration != "" { - node := woc.wf.GetNodeByName(nodeName) + node, err := woc.wf.GetNodeByName(nodeName) + if err != nil { + return nil, err + } suspendDuration, err := parseStringToDuration(tmpl.Suspend.Duration) if err != nil { return node, err @@ -3561,12 +3591,12 @@ func (woc *wfOperationCtx) deletePDBResource(ctx context.Context) error { // Check if the output of this node is referenced elsewhere in the Workflow. If so, make sure to include it during // execution. func (woc *wfOperationCtx) includeScriptOutput(nodeName, boundaryID string) (bool, error) { - if boundaryNode, ok := woc.wf.Status.Nodes[boundaryID]; ok { + if boundaryNode, err := woc.wf.Status.Nodes.Get(boundaryID); err == nil { tmplCtx, err := woc.createTemplateContext(boundaryNode.GetTemplateScope()) if err != nil { return false, err } - _, parentTemplate, templateStored, err := tmplCtx.ResolveTemplate(&boundaryNode) + _, parentTemplate, templateStored, err := tmplCtx.ResolveTemplate(boundaryNode) if err != nil { return false, err } @@ -3577,6 +3607,8 @@ func (woc *wfOperationCtx) includeScriptOutput(nodeName, boundaryID string) (boo name := getStepOrDAGTaskName(nodeName) return hasOutputResultRef(name, parentTemplate), nil + } else { + woc.log.Errorf("was unable to obtain node for %s", boundaryID) } return false, nil } diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index 00b309f7b2f4..ed5abc1a6978 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -545,13 +545,14 @@ func TestProcessNodeRetries(t *testing.T) { woc.addChildNode(nodeName, childNode) } - n := woc.wf.GetNodeByName(nodeName) + n, err := woc.wf.GetNodeByName(nodeName) + assert.NoError(t, err) lastChild = getChildNodeIndex(n, woc.wf.Status.Nodes, -1) assert.NotNil(t, lastChild) // Last child is still running. processNodeRetries() should return false since // there should be no retries at this point. - n, _, err := woc.processNodeRetries(n, retries, &executeTemplateOpts{}) + n, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{}) assert.NoError(t, err) assert.Equal(t, n.Phase, wfv1.NodeRunning) @@ -567,14 +568,16 @@ func TestProcessNodeRetries(t *testing.T) { woc.markNodePhase(lastChild.Name, wfv1.NodeFailed) _, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{}) assert.NoError(t, err) - n = woc.wf.GetNodeByName(nodeName) + n, err = woc.wf.GetNodeByName(nodeName) + assert.NoError(t, err) assert.Equal(t, n.Phase, wfv1.NodeRunning) // Add a third node that has failed. childNode := "child-node-3" woc.initializeNode(childNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeFailed) woc.addChildNode(nodeName, childNode) - n = woc.wf.GetNodeByName(nodeName) + n, err = woc.wf.GetNodeByName(nodeName) + assert.NoError(t, err) n, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{}) assert.NoError(t, err) assert.Equal(t, n.Phase, wfv1.NodeFailed) @@ -614,14 +617,15 @@ func TestProcessNodeRetriesOnErrors(t *testing.T) { woc.addChildNode(nodeName, childNode) } - n := woc.wf.GetNodeByName(nodeName) + n, err := woc.wf.GetNodeByName(nodeName) + assert.NoError(t, err) lastChild = getChildNodeIndex(n, woc.wf.Status.Nodes, -1) assert.NotNil(t, lastChild) // Last child is still running. processNodeRetries() should return false since // there should be no retries at this point. - n, _, err := woc.processNodeRetries(n, retries, &executeTemplateOpts{}) - assert.Nil(t, err) + n, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{}) + assert.NoError(t, err) assert.Equal(t, n.Phase, wfv1.NodeRunning) // Mark lastChild as successful. @@ -636,14 +640,16 @@ func TestProcessNodeRetriesOnErrors(t *testing.T) { woc.markNodePhase(lastChild.Name, wfv1.NodeError) _, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{}) assert.NoError(t, err) - n = woc.wf.GetNodeByName(nodeName) + n, err = woc.wf.GetNodeByName(nodeName) + assert.NoError(t, err) assert.Equal(t, n.Phase, wfv1.NodeRunning) // Add a third node that has errored. childNode := "child-node-3" woc.initializeNode(childNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeError) woc.addChildNode(nodeName, childNode) - n = woc.wf.GetNodeByName(nodeName) + n, err = woc.wf.GetNodeByName(nodeName) + assert.NoError(t, err) n, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{}) assert.Nil(t, err) assert.Equal(t, n.Phase, wfv1.NodeError) @@ -683,14 +689,15 @@ func TestProcessNodeRetriesOnTransientErrors(t *testing.T) { woc.addChildNode(nodeName, childNode) } - n := woc.wf.GetNodeByName(nodeName) + n, err := woc.wf.GetNodeByName(nodeName) + assert.NoError(t, err) lastChild = getChildNodeIndex(n, woc.wf.Status.Nodes, -1) assert.NotNil(t, lastChild) // Last child is still running. processNodeRetries() should return false since // there should be no retries at this point. - n, _, err := woc.processNodeRetries(n, retries, &executeTemplateOpts{}) - assert.Nil(t, err) + n, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{}) + assert.NoError(t, err) assert.Equal(t, n.Phase, wfv1.NodeRunning) // Mark lastChild as successful. @@ -709,7 +716,8 @@ func TestProcessNodeRetriesOnTransientErrors(t *testing.T) { _ = os.Setenv(transientEnvVarKey, transientErrMsg) _, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{}) assert.NoError(t, err) - n = woc.wf.GetNodeByName(nodeName) + n, err = woc.wf.GetNodeByName(nodeName) + assert.NoError(t, err) assert.Equal(t, n.Phase, wfv1.NodeRunning) _ = os.Unsetenv(transientEnvVarKey) @@ -717,7 +725,8 @@ func TestProcessNodeRetriesOnTransientErrors(t *testing.T) { childNode := "child-node-3" woc.initializeNode(childNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeError) woc.addChildNode(nodeName, childNode) - n = woc.wf.GetNodeByName(nodeName) + n, err = woc.wf.GetNodeByName(nodeName) + assert.NoError(t, err) n, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{}) assert.Nil(t, err) assert.Equal(t, n.Phase, wfv1.NodeError) @@ -758,14 +767,15 @@ func TestProcessNodeRetriesWithBackoff(t *testing.T) { woc.initializeNode("child-node-1", wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeRunning) woc.addChildNode(nodeName, "child-node-1") - n := woc.wf.GetNodeByName(nodeName) + n, err := woc.wf.GetNodeByName(nodeName) + assert.NoError(t, err) lastChild = getChildNodeIndex(n, woc.wf.Status.Nodes, -1) assert.NotNil(t, lastChild) // Last child is still running. processNodeRetries() should return false since // there should be no retries at this point. - n, _, err := woc.processNodeRetries(n, retries, &executeTemplateOpts{}) - assert.Nil(t, err) + n, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{}) + assert.NoError(t, err) assert.Equal(t, n.Phase, wfv1.NodeRunning) // Mark lastChild as successful. @@ -812,10 +822,10 @@ func TestProcessNodeRetriesWithExponentialBackoff(t *testing.T) { woc.initializeNode("child-node-1", wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeFailed) woc.addChildNode(nodeName, "child-node-1") - n := woc.wf.GetNodeByName(nodeName) + n, err := woc.wf.GetNodeByName(nodeName) + assert.NoError(t, err) - // Last child has failed. processNodeRetries() should return false due to the default backoff. - var err error + // Last child has failed. processNodesWithRetries() should return false due to the default backoff. n, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{}) require.NoError(err) require.Equal(wfv1.NodeRunning, n.Phase) @@ -828,7 +838,8 @@ func TestProcessNodeRetriesWithExponentialBackoff(t *testing.T) { woc.initializeNode("child-node-2", wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeError) woc.addChildNode(nodeName, "child-node-2") - n = woc.wf.GetNodeByName(nodeName) + n, err = woc.wf.GetNodeByName(nodeName) + assert.NoError(t, err) n, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{}) require.NoError(err) @@ -905,14 +916,15 @@ func TestProcessNodesNoRetryWithError(t *testing.T) { woc.addChildNode(nodeName, childNode) } - n := woc.wf.GetNodeByName(nodeName) + n, err := woc.wf.GetNodeByName(nodeName) + assert.NoError(t, err) lastChild = getChildNodeIndex(n, woc.wf.Status.Nodes, -1) assert.NotNil(t, lastChild) // Last child is still running. processNodeRetries() should return false since // there should be no retries at this point. - n, _, err := woc.processNodeRetries(n, retries, &executeTemplateOpts{}) - assert.Nil(t, err) + n, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{}) + assert.NoError(t, err) assert.Equal(t, n.Phase, wfv1.NodeRunning) // Mark lastChild as successful. @@ -928,7 +940,8 @@ func TestProcessNodesNoRetryWithError(t *testing.T) { woc.markNodePhase(lastChild.Name, wfv1.NodeError) _, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{}) assert.NoError(t, err) - n = woc.wf.GetNodeByName(nodeName) + n, err = woc.wf.GetNodeByName(nodeName) + assert.NoError(t, err) assert.Equal(t, wfv1.NodeError, n.Phase) } @@ -1062,7 +1075,8 @@ func TestBackoffMessage(t *testing.T) { assert.NotNil(t, wf) woc := newWorkflowOperationCtx(wf, controller) assert.NotNil(t, woc) - retryNode := woc.wf.GetNodeByName("retry-backoff-s69z6") + retryNode, err := woc.wf.GetNodeByName("retry-backoff-s69z6") + assert.NoError(t, err) // Simulate backoff of 4 secods firstNode := getChildNodeIndex(retryNode, woc.wf.Status.Nodes, 0) @@ -2904,9 +2918,11 @@ func TestResolveIOPathPlaceholders(t *testing.T) { assert.NoError(t, err) assert.True(t, len(pods.Items) > 0, "pod was not created successfully") - assert.Equal(t, []string{"/var/run/argo/argoexec", "emissary", + assert.Equal(t, []string{ + "/var/run/argo/argoexec", "emissary", "--loglevel", getExecutorLogLevel(), "--log-format", woc.controller.cliExecutorLogFormat, - "--", "sh", "-c", "head -n 3 <\"/inputs/text/data\" | tee \"/outputs/text/data\" | wc -l > \"/outputs/actual-lines-count/data\""}, pods.Items[0].Spec.Containers[1].Command) + "--", "sh", "-c", "head -n 3 <\"/inputs/text/data\" | tee \"/outputs/text/data\" | wc -l > \"/outputs/actual-lines-count/data\"", + }, pods.Items[0].Spec.Containers[1].Command) } var outputValuePlaceholders = ` @@ -3729,12 +3745,18 @@ spec: woc.operate(ctx) time.Sleep(time.Second) // Parent dag node has no pod - parentNode := woc.wf.GetNodeByName("dag-events") + parentNode, err := woc.wf.GetNodeByName("dag-events") + if err != nil { + assert.NoError(t, err) + } pod, err := woc.getPodByNode(parentNode) assert.Nil(t, pod) assert.Error(t, err, "Expected node type Pod, got DAG") // Pod node should return a pod - podNode := woc.wf.GetNodeByName("dag-events.a") + podNode, err := woc.wf.GetNodeByName("dag-events.a") + if err != nil { + assert.NoError(t, err) + } pod, err = woc.getPodByNode(podNode) assert.NoError(t, err) assert.NotNil(t, pod) @@ -4035,7 +4057,8 @@ func TestRetryNodeOutputs(t *testing.T) { assert.NoError(t, err) woc := newWorkflowOperationCtx(wf, controller) - retryNode := woc.wf.GetNodeByName("daemon-step-dvbnn[0].influx") + retryNode, err := woc.wf.GetNodeByName("daemon-step-dvbnn[0].influx") + assert.NoError(t, err) assert.NotNil(t, retryNode) fmt.Println(retryNode) scope := &wfScope{ @@ -4121,8 +4144,10 @@ func TestDeletePVCDoesNotDeletePVCOnFailedWorkflow(t *testing.T) { ctx := context.Background() woc.operate(ctx) - node1 := woc.wf.GetNodeByName("wf-with-pvc(0)[0].succeed") - node2 := woc.wf.GetNodeByName("wf-with-pvc(0)[1].failure") + node1, err := woc.wf.GetNodeByName("wf-with-pvc(0)[0].succeed") + assert.NoError(err) + node2, err := woc.wf.GetNodeByName("wf-with-pvc(0)[1].failure") + assert.NoError(err) // Node 1 Succeeded assert.Equal(node1.Phase, wfv1.NodeSucceeded) @@ -4794,7 +4819,8 @@ func TestNoOnExitWhenSkipped(t *testing.T) { ctx := context.Background() woc := newWoc(*wf) woc.operate(ctx) - assert.Nil(t, woc.wf.GetNodeByName("B.onExit")) + _, err := woc.wf.GetNodeByName("B.onExit") + assert.Error(t, err) } func TestGenerateNodeName(t *testing.T) { @@ -5460,7 +5486,8 @@ func TestPropagateMaxDurationProcess(t *testing.T) { woc.addChildNode(nodeName, childNode) var opts executeTemplateOpts - n := woc.wf.GetNodeByName(nodeName) + n, err := woc.wf.GetNodeByName(nodeName) + assert.NoError(t, err) _, _, err = woc.processNodeRetries(n, retries, &opts) if assert.NoError(t, err) { assert.Equal(t, n.StartedAt.Add(20*time.Second).Round(time.Second).String(), opts.executionDeadline.Round(time.Second).String()) @@ -6650,7 +6677,9 @@ func TestPodHasContainerNeedingTermination(t *testing.T) { Name: common.MainContainerName, State: apiv1.ContainerState{Terminated: &apiv1.ContainerStateTerminated{ExitCode: 1}}, }, - }}} + }, + }, + } tmpl := wfv1.Template{} assert.True(t, podHasContainerNeedingTermination(&pod, tmpl)) @@ -6665,7 +6694,9 @@ func TestPodHasContainerNeedingTermination(t *testing.T) { Name: common.MainContainerName, State: apiv1.ContainerState{Terminated: &apiv1.ContainerStateTerminated{ExitCode: 1}}, }, - }}} + }, + }, + } assert.True(t, podHasContainerNeedingTermination(&pod, tmpl)) pod = apiv1.Pod{ @@ -6679,7 +6710,9 @@ func TestPodHasContainerNeedingTermination(t *testing.T) { Name: common.MainContainerName, State: apiv1.ContainerState{Running: &apiv1.ContainerStateRunning{}}, }, - }}} + }, + }, + } assert.False(t, podHasContainerNeedingTermination(&pod, tmpl)) pod = apiv1.Pod{ @@ -6689,7 +6722,9 @@ func TestPodHasContainerNeedingTermination(t *testing.T) { Name: common.MainContainerName, State: apiv1.ContainerState{Running: &apiv1.ContainerStateRunning{}}, }, - }}} + }, + }, + } assert.False(t, podHasContainerNeedingTermination(&pod, tmpl)) pod = apiv1.Pod{ @@ -6699,7 +6734,9 @@ func TestPodHasContainerNeedingTermination(t *testing.T) { Name: common.MainContainerName, State: apiv1.ContainerState{Terminated: &apiv1.ContainerStateTerminated{ExitCode: 1}}, }, - }}} + }, + }, + } assert.True(t, podHasContainerNeedingTermination(&pod, tmpl)) } @@ -6735,21 +6772,24 @@ func TestRetryOnDiffHost(t *testing.T) { woc.initializeNode(childNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeRunning) woc.addChildNode(nodeName, childNode) - n := woc.wf.GetNodeByName(nodeName) + n, err := woc.wf.GetNodeByName(nodeName) + assert.NoError(t, err) lastChild = getChildNodeIndex(n, woc.wf.Status.Nodes, -1) assert.NotNil(t, lastChild) woc.markNodePhase(lastChild.Name, wfv1.NodeFailed) - _, _, err := woc.processNodeRetries(n, retries, &executeTemplateOpts{}) + _, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{}) + assert.NoError(t, err) + n, err = woc.wf.GetNodeByName(nodeName) assert.NoError(t, err) - n = woc.wf.GetNodeByName(nodeName) assert.Equal(t, n.Phase, wfv1.NodeRunning) // Ensure related fields are not set assert.Equal(t, lastChild.HostNodeName, "") // Set host name - n = woc.wf.GetNodeByName(nodeName) + n, err = woc.wf.GetNodeByName(nodeName) + assert.NoError(t, err) lastChild = getChildNodeIndex(n, woc.wf.Status.Nodes, -1) lastChild.HostNodeName = "test-fail-hostname" woc.wf.Status.Nodes[lastChild.ID] = *lastChild @@ -6767,8 +6807,7 @@ func TestRetryOnDiffHost(t *testing.T) { assert.NotNil(t, pod.Spec.Affinity) // Verify if template's Affinity has the right value - targetNodeSelectorRequirement := - pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0] + targetNodeSelectorRequirement := pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0] sourceNodeSelectorRequirement := apiv1.NodeSelectorRequirement{ Key: hostSelector, Operator: apiv1.NodeSelectorOpNotIn, @@ -6914,7 +6953,6 @@ func TestWorkflowInterpolatesNodeNameField(t *testing.T) { } assert.True(t, foundPod) - } func TestWorkflowShutdownStrategy(t *testing.T) { @@ -7364,7 +7402,6 @@ func TestSubstituteGlobalVariables(t *testing.T) { // - Workflow spec.workflowMetadata // - WorkflowTemplate spec.workflowMetadata func TestSubstituteGlobalVariablesLabelsAnnotations(t *testing.T) { - tests := []struct { name string workflow string @@ -7427,7 +7464,6 @@ func TestSubstituteGlobalVariablesLabelsAnnotations(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - wf := wfv1.MustUnmarshalWorkflow(tt.workflow) wftmpl := wfv1.MustUnmarshalWorkflowTemplate(tt.workflowTemplate) cancel, controller := newController(wf, wftmpl) @@ -7500,7 +7536,6 @@ func TestWfPendingWithNoPod(t *testing.T) { pods, err := listPods(woc) assert.NoError(t, err) assert.Equal(t, 1, len(pods.Items)) - } var wfPendingWithSync = `apiVersion: argoproj.io/v1alpha1 @@ -7766,7 +7801,7 @@ func TestDagTwoChildrenWithNonExpectedNodeType(t *testing.T) { sentNode := woc.wf.Status.Nodes.FindByDisplayName("sent") - //Ensure that both child tasks are labeled as children of the "sent" node + // Ensure that both child tasks are labeled as children of the "sent" node assert.Len(t, sentNode.Children, 2) } @@ -7996,7 +8031,8 @@ func TestOperatorRetryExpression(t *testing.T) { woc.operate(ctx) assert.Equal(t, wfv1.WorkflowFailed, woc.wf.Status.Phase) - retryNode := woc.wf.GetNodeByName("retry-script-9z9pv[1].retry") + retryNode, err := woc.wf.GetNodeByName("retry-script-9z9pv[1].retry") + assert.NoError(t, err) assert.Equal(t, wfv1.NodeFailed, retryNode.Phase) assert.Equal(t, 2, len(retryNode.Children)) assert.Equal(t, "retryStrategy.expression evaluated to false", retryNode.Message) @@ -8004,7 +8040,8 @@ func TestOperatorRetryExpression(t *testing.T) { func TestBuildRetryStrategyLocalScope(t *testing.T) { wf := wfv1.MustUnmarshalWorkflow(operatorRetryExpression) - retryNode := wf.GetNodeByName("retry-script-9z9pv[1].retry") + retryNode, err := wf.GetNodeByName("retry-script-9z9pv[1].retry") + assert.NoError(t, err) localScope := buildRetryStrategyLocalScope(retryNode, wf.Status.Nodes) @@ -8181,7 +8218,8 @@ func TestOperatorRetryExpressionError(t *testing.T) { woc.operate(ctx) assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase) - retryNode := woc.wf.GetNodeByName("retry-script-9z9pv[1].retry") + retryNode, err := woc.wf.GetNodeByName("retry-script-9z9pv[1].retry") + assert.NoError(t, err) assert.Equal(t, wfv1.NodeRunning, retryNode.Phase) assert.Equal(t, 3, len(retryNode.Children)) } @@ -8349,7 +8387,9 @@ func TestOperatorRetryExpressionErrorNoExpr(t *testing.T) { woc.operate(ctx) assert.Equal(t, wfv1.WorkflowFailed, woc.wf.Status.Phase) - retryNode := woc.wf.GetNodeByName("retry-script-9z9pv[1].retry") + retryNode, err := woc.wf.GetNodeByName("retry-script-9z9pv[1].retry") + assert.NoError(t, err) + assert.Equal(t, wfv1.NodeError, retryNode.Phase) assert.Equal(t, 2, len(retryNode.Children)) assert.Equal(t, "Error (exit code 1)", retryNode.Message) @@ -8522,10 +8562,12 @@ func TestExitHandlerWithRetryNodeParam(t *testing.T) { woc := newWorkflowOperationCtx(wf, controller) woc.operate(ctx) - retryStepNode := woc.wf.GetNodeByName("exit-handler-with-param-xbh52[0].step-1") + retryStepNode, err := woc.wf.GetNodeByName("exit-handler-with-param-xbh52[0].step-1") + assert.NoError(t, err) assert.Equal(t, 1, len(retryStepNode.Outputs.Parameters)) assert.Equal(t, "hello world", retryStepNode.Outputs.Parameters[0].Value.String()) - onExitNode := woc.wf.GetNodeByName("exit-handler-with-param-xbh52[0].step-1.onExit") + onExitNode, err := woc.wf.GetNodeByName("exit-handler-with-param-xbh52[0].step-1.onExit") + assert.NoError(t, err) assert.Equal(t, "hello world", onExitNode.Inputs.Parameters[0].Value.String()) } diff --git a/workflow/controller/plugin_template.go b/workflow/controller/plugin_template.go index 0124235ab3e8..75d86bacfa2b 100644 --- a/workflow/controller/plugin_template.go +++ b/workflow/controller/plugin_template.go @@ -5,8 +5,11 @@ import ( ) func (woc *wfOperationCtx) executePluginTemplate(nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) *wfv1.NodeStatus { - node := woc.wf.GetNodeByName(nodeName) - if node == nil { + node, err := woc.wf.GetNodeByName(nodeName) + if err != nil { + if opts.boundaryID == "" { + woc.log.Warnf("[DEBUG] boundaryID was nil") + } node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypePlugin, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodePending) } if !node.Fulfilled() { diff --git a/workflow/controller/pod_cleanup.go b/workflow/controller/pod_cleanup.go index 193dab13aa6a..55823c70d654 100644 --- a/workflow/controller/pod_cleanup.go +++ b/workflow/controller/pod_cleanup.go @@ -26,7 +26,12 @@ func (woc *wfOperationCtx) queuePodsForCleanup() { continue } nodeID := woc.nodeID(pod) - if !woc.wf.Status.Nodes[nodeID].Phase.Fulfilled() { + nodePhase, err := woc.wf.Status.Nodes.GetPhase(nodeID) + if err != nil { + woc.log.Errorf("was unable to obtain node for %s", nodeID) + continue + } + if !nodePhase.Fulfilled() { continue } switch determinePodCleanupAction(selector, pod.Labels, strategy, workflowPhase, pod.Status.Phase) { diff --git a/workflow/controller/steps.go b/workflow/controller/steps.go index 38d27f5af0ce..378c07a382ff 100644 --- a/workflow/controller/steps.go +++ b/workflow/controller/steps.go @@ -36,13 +36,18 @@ type stepsContext struct { } func (woc *wfOperationCtx) executeSteps(ctx context.Context, nodeName string, tmplCtx *templateresolution.Context, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) { - node := woc.wf.GetNodeByName(nodeName) - if node == nil { + node, err := woc.wf.GetNodeByName(nodeName) + if err != nil { node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypeSteps, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodeRunning) } defer func() { - if woc.wf.Status.Nodes[node.ID].Fulfilled() { + nodePhase, err := woc.wf.Status.Nodes.GetPhase(node.ID) + if err != nil { + woc.log.Fatalf("was unable to obtain nodePhase for %s", node.ID) + panic(fmt.Sprintf("unable to obtain nodePhase for %s", node.ID)) + } + if nodePhase.Fulfilled() { woc.killDaemonedChildren(node.ID) } }() @@ -61,8 +66,8 @@ func (woc *wfOperationCtx) executeSteps(ctx context.Context, nodeName string, tm for i, stepGroup := range tmpl.Steps { sgNodeName := fmt.Sprintf("%s[%d]", nodeName, i) { - sgNode := woc.wf.GetNodeByName(sgNodeName) - if sgNode == nil { + sgNode, err := woc.wf.GetNodeByName(sgNodeName) + if err != nil { _ = woc.initializeNode(sgNodeName, wfv1.NodeTypeStepGroup, stepTemplateScope, &wfv1.WorkflowStep{}, stepsCtx.boundaryID, wfv1.NodeRunning) } else if !sgNode.Fulfilled() { _ = woc.markNodePhase(sgNodeName, wfv1.NodeRunning) @@ -76,7 +81,10 @@ func (woc *wfOperationCtx) executeSteps(ctx context.Context, nodeName string, tm // Otherwise connect all the outbound nodes of the previous step group as parents to // the current step group node. prevStepGroupName := fmt.Sprintf("%s[%d]", nodeName, i-1) - prevStepGroupNode := woc.wf.GetNodeByName(prevStepGroupName) + prevStepGroupNode, err := woc.wf.GetNodeByName(prevStepGroupName) + if err != nil { + return nil, err + } if len(prevStepGroupNode.Children) == 0 { // corner case which connects an empty StepGroup (e.g. due to empty withParams) to // the previous StepGroup node @@ -86,14 +94,21 @@ func (woc *wfOperationCtx) executeSteps(ctx context.Context, nodeName string, tm outboundNodeIDs := woc.getOutboundNodes(childID) woc.log.Infof("SG Outbound nodes of %s are %s", childID, outboundNodeIDs) for _, outNodeID := range outboundNodeIDs { - woc.addChildNode(woc.wf.Status.Nodes[outNodeID].Name, sgNodeName) + outNodeName, err := woc.wf.Status.Nodes.GetName(outNodeID) + if err != nil { + woc.log.Fatalf("was not able to obtain node name for %s", outNodeID) + panic(fmt.Sprintf("could not obtain the out noden name for %s", outNodeID)) + } + woc.addChildNode(outNodeName, sgNodeName) } } } } - sgNode := woc.executeStepGroup(ctx, stepGroup.Steps, sgNodeName, &stepsCtx) - + sgNode, err := woc.executeStepGroup(ctx, stepGroup.Steps, sgNodeName, &stepsCtx) + if err != nil { + return nil, err + } if !sgNode.Fulfilled() { woc.log.Infof("Workflow step group node %s not yet completed", sgNode.ID) return node, nil @@ -102,16 +117,18 @@ func (woc *wfOperationCtx) executeSteps(ctx context.Context, nodeName string, tm if sgNode.FailedOrError() { failMessage := fmt.Sprintf("step group %s was unsuccessful: %s", sgNode.ID, sgNode.Message) woc.log.Info(failMessage) - woc.updateOutboundNodes(nodeName, tmpl) + if err = woc.updateOutboundNodes(nodeName, tmpl); err != nil { + return nil, err + } return woc.markNodePhase(nodeName, wfv1.NodeFailed, sgNode.Message), nil } // Add all outputs of each step in the group to the scope for _, step := range stepGroup.Steps { childNodeName := fmt.Sprintf("%s.%s", sgNodeName, step.Name) - childNode := woc.wf.GetNodeByName(childNodeName) + childNode, err := woc.wf.GetNodeByName(childNodeName) prefix := fmt.Sprintf("steps.%s", step.Name) - if childNode == nil { + if err != nil { // This happens when there was `withItem/withParam` expansion. // We add the aggregate outputs of our children to the scope as a JSON list var childNodes []wfv1.NodeStatus @@ -144,74 +161,90 @@ func (woc *wfOperationCtx) executeSteps(ctx context.Context, nodeName string, tm } } - woc.updateOutboundNodes(nodeName, tmpl) + err = woc.updateOutboundNodes(nodeName, tmpl) + if err != nil { + return nil, err + } // If this template has outputs from any of its steps, copy them to this node here outputs, err := getTemplateOutputsFromScope(tmpl, stepsCtx.scope) if err != nil { return node, err } if outputs != nil { + node, err := woc.wf.GetNodeByName(nodeName) + if err != nil { + return nil, err + } node.Outputs = outputs woc.addOutputsToGlobalScope(node.Outputs) - woc.wf.Status.Nodes[node.ID] = *node - } - if node.MemoizationStatus != nil { - c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName) - err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, outputs) - if err != nil { - woc.log.WithFields(log.Fields{"nodeID": node.ID}).WithError(err).Error("Failed to save node outputs to cache") - node.Phase = wfv1.NodeError + woc.wf.Status.Nodes.Set(node.ID, *node) + if node.MemoizationStatus != nil { + c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName) + err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, node.Outputs) + if err != nil { + woc.log.WithFields(log.Fields{"nodeID": node.ID}).WithError(err).Error("Failed to save node outputs to cache") + node.Phase = wfv1.NodeError + } } } return woc.markNodePhase(nodeName, wfv1.NodeSucceeded), nil } // updateOutboundNodes set the outbound nodes from the last step group -func (woc *wfOperationCtx) updateOutboundNodes(nodeName string, tmpl *wfv1.Template) { +func (woc *wfOperationCtx) updateOutboundNodes(nodeName string, tmpl *wfv1.Template) error { outbound := make([]string, 0) // Find the last, initialized stepgroup node var lastSGNode *wfv1.NodeStatus + var err error for i := len(tmpl.Steps) - 1; i >= 0; i-- { - sgNode := woc.wf.GetNodeByName(fmt.Sprintf("%s[%d]", nodeName, i)) - if sgNode != nil { + var sgNode *wfv1.NodeStatus + sgNode, err = woc.wf.GetNodeByName(fmt.Sprintf("%s[%d]", nodeName, i)) + if err == nil { lastSGNode = sgNode break } } if lastSGNode == nil { woc.log.Warnf("node '%s' had no initialized StepGroup nodes", nodeName) - return + return err } for _, childID := range lastSGNode.Children { outboundNodeIDs := woc.getOutboundNodes(childID) woc.log.Infof("Outbound nodes of %s is %s", childID, outboundNodeIDs) outbound = append(outbound, outboundNodeIDs...) } - node := woc.wf.GetNodeByName(nodeName) + node, err := woc.wf.GetNodeByName(nodeName) + if err != nil { + return err + } woc.log.Infof("Outbound nodes of %s is %s", node.ID, outbound) node.OutboundNodes = outbound - woc.wf.Status.Nodes[node.ID] = *node + woc.wf.Status.Nodes.Set(node.ID, *node) + return nil } // executeStepGroup examines a list of parallel steps and executes them in parallel. // Handles referencing of variables in scope, expands `withItem` clauses, and evaluates `when` expressions -func (woc *wfOperationCtx) executeStepGroup(ctx context.Context, stepGroup []wfv1.WorkflowStep, sgNodeName string, stepsCtx *stepsContext) *wfv1.NodeStatus { - node := woc.wf.GetNodeByName(sgNodeName) +func (woc *wfOperationCtx) executeStepGroup(ctx context.Context, stepGroup []wfv1.WorkflowStep, sgNodeName string, stepsCtx *stepsContext) (*wfv1.NodeStatus, error) { + node, err := woc.wf.GetNodeByName(sgNodeName) + if err != nil { + return nil, err + } if node.Fulfilled() { woc.log.Debugf("Step group node %v already marked completed", node) - return node + return node, nil } // First, resolve any references to outputs from previous steps, and perform substitution - stepGroup, err := woc.resolveReferences(stepGroup, stepsCtx.scope) + stepGroup, err = woc.resolveReferences(stepGroup, stepsCtx.scope) if err != nil { - return woc.markNodeError(sgNodeName, err) + return woc.markNodeError(sgNodeName, err), nil } // Next, expand the step's withItems (if any) stepGroup, err = woc.expandStepGroup(sgNodeName, stepGroup, stepsCtx) if err != nil { - return woc.markNodeError(sgNodeName, err) + return woc.markNodeError(sgNodeName, err), nil } // Maps nodes to their steps @@ -230,10 +263,10 @@ func (woc *wfOperationCtx) executeStepGroup(ctx context.Context, stepGroup []wfv woc.initializeNode(childNodeName, wfv1.NodeTypeSkipped, stepTemplateScope, &step, stepsCtx.boundaryID, wfv1.NodeError, err.Error()) woc.addChildNode(sgNodeName, childNodeName) woc.markNodeError(childNodeName, err) - return woc.markNodeError(sgNodeName, err) + return woc.markNodeError(sgNodeName, err), nil } if !proceed { - if woc.wf.GetNodeByName(childNodeName) == nil { + if _, err := woc.wf.GetNodeByName(childNodeName); err != nil { skipReason := fmt.Sprintf("when '%s' evaluated false", step.When) woc.log.Infof("Skipping %s: %s", childNodeName, skipReason) woc.initializeNode(childNodeName, wfv1.NodeTypeSkipped, stepTemplateScope, &step, stepsCtx.boundaryID, wfv1.NodeSkipped, skipReason) @@ -242,17 +275,20 @@ func (woc *wfOperationCtx) executeStepGroup(ctx context.Context, stepGroup []wfv continue } + if stepsCtx.boundaryID == "" { + woc.log.Warnf("boundaryID was nil") + } childNode, err := woc.executeTemplate(ctx, childNodeName, &step, stepsCtx.tmplCtx, step.Arguments, &executeTemplateOpts{boundaryID: stepsCtx.boundaryID, onExitTemplate: stepsCtx.onExitTemplate}) if err != nil { switch err { case ErrDeadlineExceeded: - return node + return node, nil case ErrParallelismReached: case ErrTimeout: - return woc.markNodePhase(node.Name, wfv1.NodeFailed, fmt.Sprintf("child '%s' timedout", childNodeName)) + return woc.markNodePhase(node.Name, wfv1.NodeFailed, fmt.Sprintf("child '%s' timedout", childNodeName)), nil default: woc.addChildNode(sgNodeName, childNodeName) - return woc.markNodeError(node.Name, fmt.Errorf("step group deemed errored due to child %s error: %w", childNodeName, err)) + return woc.markNodeError(node.Name, fmt.Errorf("step group deemed errored due to child %s error: %w", childNodeName, err)), nil } } if childNode != nil { @@ -261,26 +297,34 @@ func (woc *wfOperationCtx) executeStepGroup(ctx context.Context, stepGroup []wfv } } - node = woc.wf.GetNodeByName(sgNodeName) + node, err = woc.wf.GetNodeByName(sgNodeName) + if err != nil { + return nil, err + } // Return if not all children completed completed := true for _, childNodeID := range node.Children { - childNode := woc.wf.Status.Nodes[childNodeID] + childNode, err := woc.wf.Status.Nodes.Get(childNodeID) + if err != nil { + errorMsg := fmt.Sprintf("was unable to obtain childNode for %s", childNodeID) + woc.log.Errorf(errorMsg) + return nil, fmt.Errorf(errorMsg) + } step := nodeSteps[childNode.Name] stepsCtx.scope.addParamToScope(fmt.Sprintf("steps.%s.status", childNode.DisplayName), string(childNode.Phase)) - hookCompleted, err := woc.executeTmplLifeCycleHook(ctx, stepsCtx.scope, step.Hooks, &childNode, stepsCtx.boundaryID, stepsCtx.tmplCtx, "steps."+step.Name) + hookCompleted, err := woc.executeTmplLifeCycleHook(ctx, stepsCtx.scope, step.Hooks, childNode, stepsCtx.boundaryID, stepsCtx.tmplCtx, "steps."+step.Name) if err != nil { woc.markNodeError(node.Name, err) } // Check all hooks are completed if !hookCompleted { - return node + return node, nil } if !childNode.Fulfilled() { completed = false } else if childNode.Completed() { - hasOnExitNode, onExitNode, err := woc.runOnExitNode(ctx, step.GetExitHook(woc.execWf.Spec.Arguments), &childNode, stepsCtx.boundaryID, stepsCtx.tmplCtx, "steps."+step.Name, stepsCtx.scope) + hasOnExitNode, onExitNode, err := woc.runOnExitNode(ctx, step.GetExitHook(woc.execWf.Spec.Arguments), childNode, stepsCtx.boundaryID, stepsCtx.tmplCtx, "steps."+step.Name, stepsCtx.scope) if hasOnExitNode && (onExitNode == nil || !onExitNode.Fulfilled() || err != nil) { // The onExit node is either not complete or has errored out, return. completed = false @@ -288,23 +332,27 @@ func (woc *wfOperationCtx) executeStepGroup(ctx context.Context, stepGroup []wfv } } if !completed { - return node + return node, nil } woc.addOutputsToGlobalScope(node.Outputs) // All children completed. Determine step group status as a whole for _, childNodeID := range node.Children { - childNode := woc.wf.Status.Nodes[childNodeID] + childNode, err := woc.wf.Status.Nodes.Get(childNodeID) + if err != nil { + woc.log.Fatalf("was unable to obtain node for %s", childNodeID) + panic(fmt.Sprintf("unable to get childNode for %s", childNodeID)) + } step := nodeSteps[childNode.Name] if childNode.FailedOrError() && !step.ContinuesOn(childNode.Phase) { failMessage := fmt.Sprintf("child '%s' failed", childNodeID) woc.log.Infof("Step group node %s deemed failed: %s", node.ID, failMessage) - return woc.markNodePhase(node.Name, wfv1.NodeFailed, failMessage) + return woc.markNodePhase(node.Name, wfv1.NodeFailed, failMessage), nil } } woc.log.Infof("Step group node %v successful", node.ID) - return woc.markNodePhase(node.Name, wfv1.NodeSucceeded) + return woc.markNodePhase(node.Name, wfv1.NodeSucceeded), nil } // shouldExecute evaluates a already substituted when expression to decide whether or not a step should execute @@ -436,7 +484,7 @@ func (woc *wfOperationCtx) expandStepGroup(sgNodeName string, stepGroup []wfv1.W if len(expandedStep) == 0 { // Empty list childNodeName := fmt.Sprintf("%s.%s", sgNodeName, step.Name) - if woc.wf.GetNodeByName(childNodeName) == nil { + if _, err := woc.wf.GetNodeByName(childNodeName); err != nil { stepTemplateScope := stepsCtx.tmplCtx.GetTemplateScope() skipReason := "Skipped, empty params" woc.log.Infof("Skipping %s: %s", childNodeName, skipReason) diff --git a/workflow/controller/taskresult.go b/workflow/controller/taskresult.go index 114fe9fdc86f..0f7270e38a0e 100644 --- a/workflow/controller/taskresult.go +++ b/workflow/controller/taskresult.go @@ -55,28 +55,28 @@ func (woc *wfOperationCtx) taskResultReconciliation() { for _, obj := range objs { result := obj.(*wfv1.WorkflowTaskResult) nodeID := result.Name - old, exist := woc.wf.Status.Nodes[nodeID] - if !exist { + old, err := woc.wf.Status.Nodes.Get(nodeID) + if err != nil { continue } - new := old.DeepCopy() + newNode := old.DeepCopy() if result.Outputs.HasOutputs() { - if new.Outputs == nil { - new.Outputs = &wfv1.Outputs{} + if newNode.Outputs == nil { + newNode.Outputs = &wfv1.Outputs{} } - result.Outputs.DeepCopyInto(new.Outputs) // preserve any existing values - if old.Outputs != nil && new.Outputs.ExitCode == nil { // prevent overwriting of ExitCode - new.Outputs.ExitCode = old.Outputs.ExitCode + result.Outputs.DeepCopyInto(newNode.Outputs) // preserve any existing values + if old.Outputs != nil && newNode.Outputs.ExitCode == nil { // prevent overwriting of ExitCode + newNode.Outputs.ExitCode = old.Outputs.ExitCode } } if result.Progress.IsValid() { - new.Progress = result.Progress + newNode.Progress = result.Progress } - if !reflect.DeepEqual(&old, new) { + if !reflect.DeepEqual(&old, newNode) { woc.log. WithField("nodeID", nodeID). Info("task-result changed") - woc.wf.Status.Nodes[nodeID] = *new + woc.wf.Status.Nodes.Set(nodeID, *newNode) woc.updated = true } } diff --git a/workflow/controller/taskset.go b/workflow/controller/taskset.go index 68f03b5ba404..a239ceca1c6c 100644 --- a/workflow/controller/taskset.go +++ b/workflow/controller/taskset.go @@ -102,8 +102,8 @@ func (woc *wfOperationCtx) taskSetReconciliation(ctx context.Context) { } func (woc *wfOperationCtx) nodeRequiresTaskSetReconciliation(nodeName string) bool { - node := woc.wf.GetNodeByName(nodeName) - if node == nil { + node, err := woc.wf.GetNodeByName(nodeName) + if err != nil { return false } // If this node is of type HTTP, it will need an HTTP reconciliation @@ -112,7 +112,11 @@ func (woc *wfOperationCtx) nodeRequiresTaskSetReconciliation(nodeName string) bo } for _, child := range node.Children { // If any of the node's children need an HTTP reconciliation, the parent node will also need one - childNodeName := woc.wf.Status.Nodes[child].Name + childNodeName, err := woc.wf.Status.Nodes.GetName(child) + if err != nil { + woc.log.Fatalf("was unable to get child node name for %s", child) + panic("unable to obtain child node name") + } if woc.nodeRequiresTaskSetReconciliation(childNodeName) { return true } @@ -130,14 +134,19 @@ func (woc *wfOperationCtx) reconcileTaskSet(ctx context.Context) error { woc.log.Info("TaskSet Reconciliation") if workflowTaskSet != nil && len(workflowTaskSet.Status.Nodes) > 0 { for nodeID, taskResult := range workflowTaskSet.Status.Nodes { - node := woc.wf.Status.Nodes[nodeID] + node, err := woc.wf.Status.Nodes.Get(nodeID) + if err != nil { + woc.log.Warnf("[SPECIAL][DEBUG] returning but assumed validity before") + woc.log.Errorf("[DEBUG] Was unable to obtain node for %s", nodeID) + return err + } node.Outputs = taskResult.Outputs.DeepCopy() node.Phase = taskResult.Phase node.Message = taskResult.Message node.FinishedAt = metav1.Now() - woc.wf.Status.Nodes[nodeID] = node + woc.wf.Status.Nodes.Set(nodeID, *node) if node.MemoizationStatus != nil && node.Succeeded() { c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName) err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, node.Outputs) diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index 8d34d263db57..8dfcf18a4ad5 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -421,7 +421,10 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin } // Check if the template has exceeded its timeout duration. If it hasn't set the applicable activeDeadlineSeconds - node := woc.wf.GetNodeByName(nodeName) + node, err := woc.wf.GetNodeByName(nodeName) + if err != nil { + woc.log.Warnf("couldn't retrieve node for nodeName %s, will get nil templateDeadline", nodeName) + } templateDeadline, err := woc.checkTemplateTimeout(tmpl, node) if err != nil { return nil, err diff --git a/workflow/progress/updater.go b/workflow/progress/updater.go index 67f48a7b276d..4cfa0526cfa7 100644 --- a/workflow/progress/updater.go +++ b/workflow/progress/updater.go @@ -1,6 +1,8 @@ package progress import ( + log "github.com/sirupsen/logrus" + wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" ) @@ -16,7 +18,7 @@ func UpdateProgress(wf *wfv1.Workflow) { // all executable nodes should have progress defined, if not, we just set it to the default value. if node.Progress == wfv1.ProgressUndefined { node.Progress = wfv1.ProgressDefault - wf.Status.Nodes[nodeID] = node + wf.Status.Nodes.Set(nodeID, node) } // it could be possible for corruption to result in invalid progress, we just ignore invalid progress if !node.Progress.IsValid() { @@ -26,7 +28,7 @@ func UpdateProgress(wf *wfv1.Workflow) { switch node.Phase { case wfv1.NodeSucceeded, wfv1.NodeSkipped, wfv1.NodeOmitted: node.Progress = node.Progress.Complete() - wf.Status.Nodes[nodeID] = node + wf.Status.Nodes.Set(nodeID, node) } // the total should only contain node that are valid wf.Status.Progress = wf.Status.Progress.Add(node.Progress) @@ -41,7 +43,7 @@ func UpdateProgress(wf *wfv1.Workflow) { progress := sumProgress(wf, node, make(map[string]bool)) if progress.IsValid() { node.Progress = progress - wf.Status.Nodes[nodeID] = node + wf.Status.Nodes.Set(nodeID, node) } } // we could check an invariant here, wf.Status.Nodes[wf.Name].Progress == wf.Status.Progress, but I think there's @@ -67,8 +69,12 @@ func sumProgress(wf *wfv1.Workflow, node wfv1.NodeStatus, visited map[string]boo } visited[childNodeID] = true // this will tolerate missing child (will be "") and therefore ignored - child := wf.Status.Nodes[childNodeID] - progress = progress.Add(sumProgress(wf, child, visited)) + child, err := wf.Status.Nodes.Get(childNodeID) + if err != nil { + log.Warnf("Coudn't obtain child for %s, panicking", childNodeID) + continue + } + progress = progress.Add(sumProgress(wf, *child, visited)) if executable(child.Type) { v := child.Progress if v.IsValid() { diff --git a/workflow/sync/sync_manager.go b/workflow/sync/sync_manager.go index 456e10bdf7d0..6f4c92b87aac 100644 --- a/workflow/sync/sync_manager.go +++ b/workflow/sync/sync_manager.go @@ -265,7 +265,7 @@ func (cm *Manager) ReleaseAll(wf *wfv1.Workflow) bool { lock.removeFromQueue(getHolderKey(wf, node.ID)) } node.SynchronizationStatus = nil - wf.Status.Nodes[node.ID] = node + wf.Status.Nodes.Set(node.ID, node) } } diff --git a/workflow/util/util.go b/workflow/util/util.go index 9a477782b85a..f2db02aabcdf 100644 --- a/workflow/util/util.go +++ b/workflow/util/util.go @@ -413,7 +413,7 @@ func ResumeWorkflow(ctx context.Context, wfIf v1alpha1.WorkflowInterface, hydrat } node.Phase = wfv1.NodeSucceeded node.FinishedAt = metav1.Time{Time: time.Now().UTC()} - wf.Status.Nodes[nodeID] = node + wf.Status.Nodes.Set(nodeID, node) workflowUpdated = true } } @@ -559,8 +559,7 @@ func updateSuspendedNode(ctx context.Context, wfIf v1alpha1.WorkflowInterface, h } } } - - wf.Status.Nodes[nodeID] = node + wf.Status.Nodes.Set(nodeID, node) } } } @@ -729,7 +728,7 @@ func FormulateResubmitWorkflow(ctx context.Context, wf *wfv1.Workflow, memoized newNode.Phase = wfv1.NodePending newNode.Message = "" } - newWF.Status.Nodes[newNode.ID] = *newNode + newWF.Status.Nodes.Set(newNode.ID, *newNode) } newWF.Status.StoredTemplates = make(map[string]wfv1.Template) @@ -754,7 +753,12 @@ func getDescendantNodeIDs(wf *wfv1.Workflow, node wfv1.NodeStatus) []string { var descendantNodeIDs []string descendantNodeIDs = append(descendantNodeIDs, node.Children...) for _, child := range node.Children { - descendantNodeIDs = append(descendantNodeIDs, getDescendantNodeIDs(wf, wf.Status.Nodes[child])...) + childStatus, err := wf.Status.Nodes.Get(child) + if err != nil { + log.Fatalf("Couldn't get child, panicking") + panic("Was not able to obtain child") + } + descendantNodeIDs = append(descendantNodeIDs, getDescendantNodeIDs(wf, *childStatus)...) } return descendantNodeIDs } @@ -786,15 +790,23 @@ func isGroupNode(node wfv1.NodeStatus) bool { func resetConnectedParentGroupNodes(oldWF *wfv1.Workflow, newWF *wfv1.Workflow, currentNode wfv1.NodeStatus, resetParentGroupNodes []string) (*wfv1.Workflow, []string) { currentNodeID := currentNode.ID for { - currentNode := oldWF.Status.Nodes[currentNodeID] + currentNode, err := oldWF.Status.Nodes.Get(currentNodeID) + if err != nil { + log.Fatalf("dying due to inability to obtain node for %s", currentNodeID) + panic("was not able to get node, panicking") + } if !containsNode(resetParentGroupNodes, currentNodeID) { - newWF.Status.Nodes[currentNodeID] = resetNode(*currentNode.DeepCopy()) + newWF.Status.Nodes.Set(currentNodeID, resetNode(*currentNode.DeepCopy())) resetParentGroupNodes = append(resetParentGroupNodes, currentNodeID) log.Debugf("Reset connected group node %s", currentNode.Name) } if currentNode.BoundaryID != "" && currentNode.BoundaryID != oldWF.ObjectMeta.Name { - parentNode := oldWF.Status.Nodes[currentNode.BoundaryID] - if isGroupNode(parentNode) { + parentNode, err := oldWF.Status.Nodes.Get(currentNode.BoundaryID) + if err != nil { + log.Fatalf("panicking unable to obtain node for %s", currentNode.BoundaryID) + panic("was not able to get node") + } + if isGroupNode(*parentNode) { currentNodeID = parentNode.ID } else { break @@ -879,7 +891,11 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce var nodeGroupNeedsReset bool // Only reset DAG that's in the same branch as the nodeIDsToReset for _, child := range descendantNodeIDs { - childNode := wf.Status.Nodes[child] + childNode, err := wf.Status.Nodes.Get(child) + if err != nil { + log.Fatalf("was unable to obtain node for %s due to %s", child, err) + return nil, nil, fmt.Errorf("Was unable to obtain node for %s due to %s", child, err) + } if _, present := nodeIDsToReset[child]; present { log.Debugf("Group node %s needs to reset since its child %s is in the force reset path", node.Name, childNode.Name) nodeGroupNeedsReset = true @@ -905,29 +921,33 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce descendantNodeIDs := getDescendantNodeIDs(wf, node) for _, descendantNodeID := range descendantNodeIDs { deletedNodes[descendantNodeID] = true - descendantNode := wf.Status.Nodes[descendantNodeID] + descendantNode, err := wf.Status.Nodes.Get(descendantNodeID) + if err != nil { + log.Fatalf("Was unable to obtain node for %s due to %s", descendantNodeID, err) + return nil, nil, fmt.Errorf("Was unable to obtain node for %s due to %s", descendantNodeID, err) + } if descendantNode.Type == wfv1.NodeTypePod { newWF, resetParentGroupNodes = resetConnectedParentGroupNodes(wf, newWF, node, resetParentGroupNodes) - deletedPods, podsToDelete = deletePodNodeDuringRetryWorkflow(wf, descendantNode, deletedPods, podsToDelete) + deletedPods, podsToDelete = deletePodNodeDuringRetryWorkflow(wf, *descendantNode, deletedPods, podsToDelete) log.Debugf("Deleted pod node %s since it belongs to node %s", descendantNode.Name, node.Name) } } } else { log.Debugf("Reset non-pod/suspend node %s", node.Name) newNode := node.DeepCopy() - newWF.Status.Nodes[newNode.ID] = resetNode(*newNode) + newWF.Status.Nodes.Set(newNode.ID, resetNode(*newNode)) } } } else { if !containsNode(resetParentGroupNodes, node.ID) { log.Debugf("Node %s remains as is", node.Name) - newWF.Status.Nodes[node.ID] = node + newWF.Status.Nodes.Set(node.ID, node) } } case wfv1.NodeError, wfv1.NodeFailed, wfv1.NodeOmitted: if isGroupNode(node) { newNode := node.DeepCopy() - newWF.Status.Nodes[newNode.ID] = resetNode(*newNode) + newWF.Status.Nodes.Set(newNode.ID, resetNode(*newNode)) log.Debugf("Reset %s node %s since it's a group node", node.Name, string(node.Phase)) continue } else { @@ -945,7 +965,7 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce if node.Name == wf.ObjectMeta.Name { log.Debugf("Reset root node: %s", node.Name) newNode := node.DeepCopy() - newWF.Status.Nodes[newNode.ID] = resetNode(*newNode) + newWF.Status.Nodes.Set(newNode.ID, resetNode(*newNode)) continue } } @@ -954,7 +974,7 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce for _, node := range newWF.Status.Nodes { if deletedNodes[node.ID] { log.Debugf("Removed node: %s", node.Name) - delete(newWF.Status.Nodes, node.ID) + newWF.Status.Nodes.Delete(node.ID) continue } @@ -974,7 +994,7 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce } node.OutboundNodes = outboundNodes - newWF.Status.Nodes[node.ID] = node + newWF.Status.Nodes.Set(node.ID, node) } } diff --git a/workflow/util/util_test.go b/workflow/util/util_test.go index 599bebe5b63f..7d02913d3b7b 100644 --- a/workflow/util/util_test.go +++ b/workflow/util/util_test.go @@ -66,10 +66,11 @@ func TestResubmitWorkflowWithOnExit(t *testing.T) { }, } onExitID := wf.NodeID(onExitName) - wf.Status.Nodes[onExitID] = wfv1.NodeStatus{ + onExitNode := wfv1.NodeStatus{ Name: onExitName, Phase: wfv1.NodeSucceeded, } + wf.Status.Nodes.Set(onExitID, onExitNode) newWF, err := FormulateResubmitWorkflow(context.Background(), &wf, true, nil) assert.NoError(t, err) newWFOnExitName := newWF.ObjectMeta.Name + ".onExit"