Skip to content

Commit

Permalink
fix: add guard against NodeStatus. Fixes #11102 (#11451)
Browse files Browse the repository at this point in the history
Signed-off-by: Isitha Subasinghe <isitha@pipekit.io>
Signed-off-by: isubasinghe <isitha@pipekit.io>
  • Loading branch information
isubasinghe authored Aug 1, 2023
1 parent ce9e50c commit 1f6b19f
Show file tree
Hide file tree
Showing 24 changed files with 603 additions and 287 deletions.
68 changes: 62 additions & 6 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"

log "github.com/sirupsen/logrus"

argoerrs "github.com/argoproj/argo-workflows/v3/errors"
"github.com/argoproj/argo-workflows/v3/util/slice"
)
Expand Down Expand Up @@ -1729,6 +1731,64 @@ func (n Nodes) Find(f func(NodeStatus) bool) *NodeStatus {
return nil
}

// Get a NodeStatus from the hashmap of Nodes.
// Return a nil along with an error if non existent.
func (n Nodes) Get(key string) (*NodeStatus, error) {
val, ok := n[key]
if !ok {
return nil, fmt.Errorf("key was not found for %s", key)
}
return &val, nil
}

// Check if the Nodes map has a key entry
func (n Nodes) Has(key string) bool {
_, err := n.Get(key)
return err == nil
}

// Get the Phase of a Node
func (n Nodes) GetPhase(key string) (*NodePhase, error) {
val, err := n.Get(key)
if err != nil {
return nil, err
}
return &val.Phase, nil
}

// Set the status of a node by key
func (n Nodes) Set(key string, status NodeStatus) {
if status.Name == "" {
log.Warnf("Name was not set for key %s", key)
}
if status.ID == "" {
log.Warnf("ID was not set for key %s", key)
}
_, ok := n[key]
if ok {
log.Tracef("Changing NodeStatus for %s to %+v", key, status)
}
n[key] = status
}

// Delete a node from the Nodes by key
func (n Nodes) Delete(key string) {
has := n.Has(key)
if !has {
log.Warnf("Trying to delete non existent key %s", key)
return
}
delete(n, key)
}

// Get the name of a node by key
func (n Nodes) GetName(key string) (string, error) {
val, err := n.Get(key)
if err != nil {
return "", err
}
return val.Name, nil
}
func NodeWithName(name string) func(n NodeStatus) bool {
return func(n NodeStatus) bool { return n.Name == name }
}
Expand Down Expand Up @@ -3251,13 +3311,9 @@ func (wf *Workflow) GetTemplateByName(name string) *Template {
return nil
}

func (wf *Workflow) GetNodeByName(nodeName string) *NodeStatus {
func (wf *Workflow) GetNodeByName(nodeName string) (*NodeStatus, error) {
nodeID := wf.NodeID(nodeName)
node, ok := wf.Status.Nodes[nodeID]
if !ok {
return nil
}
return &node
return wf.Status.Nodes.Get(nodeID)
}

// GetResourceScope returns the template scope of workflow.
Expand Down
19 changes: 15 additions & 4 deletions server/artifacts/artifact_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,10 +378,16 @@ func (a *ArtifactServer) getArtifactAndDriver(ctx context.Context, nodeId, artif
kubeClient := auth.GetKubeClient(ctx)

var art *wfv1.Artifact

nodeStatus, err := wf.Status.Nodes.Get(nodeId)
if err != nil {
log.Errorf("Was unable to retrieve node for %s", nodeId)
return nil, nil, fmt.Errorf("was not able to retrieve node")
}
if isInput {
art = wf.Status.Nodes[nodeId].Inputs.GetArtifactByName(artifactName)
art = nodeStatus.Inputs.GetArtifactByName(artifactName)
} else {
art = wf.Status.Nodes[nodeId].Outputs.GetArtifactByName(artifactName)
art = nodeStatus.Outputs.GetArtifactByName(artifactName)
}
if art == nil {
return nil, nil, fmt.Errorf("artifact not found: %s, isInput=%t, Workflow Status=%+v", artifactName, isInput, wf.Status)
Expand All @@ -395,7 +401,12 @@ func (a *ArtifactServer) getArtifactAndDriver(ctx context.Context, nodeId, artif
// 5. Inline Template

var archiveLocation *wfv1.ArtifactLocation
templateName := util.GetTemplateFromNode(wf.Status.Nodes[nodeId])
templateNode, err := wf.Status.Nodes.Get(nodeId)
if err != nil {
log.Errorf("was unable to retrieve node for %s", nodeId)
return nil, nil, fmt.Errorf("Unable to get artifact and driver due to inability to get node due for %s, err=%s", nodeId, err)
}
templateName := util.GetTemplateFromNode(*templateNode)
if templateName != "" {
template := wf.GetTemplateByName(templateName)
if template == nil {
Expand All @@ -412,7 +423,7 @@ func (a *ArtifactServer) getArtifactAndDriver(ctx context.Context, nodeId, artif
archiveLocation = ar.ToArtifactLocation()
}

err := art.Relocate(archiveLocation) // if the Artifact defines the location (case 1), it will be used; otherwise whatever archiveLocation is set to
err = art.Relocate(archiveLocation) // if the Artifact defines the location (case 1), it will be used; otherwise whatever archiveLocation is set to
if err != nil {
return art, nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion test/e2e/fixtures/then.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,10 @@ func (t *Then) ExpectArtifact(nodeName string, artifactName string, bucketName s
nodeName = t.wf.Name
}

n := t.wf.GetNodeByName(nodeName)
n, err := t.wf.GetNodeByName(nodeName)
if err != nil {
t.t.Error("was unable to get node by name")
}
a := n.GetOutputs().GetArtifactByName(artifactName)
key, _ := a.GetKey()

Expand Down
12 changes: 9 additions & 3 deletions util/resource/updater.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package resource

import (
log "github.com/sirupsen/logrus"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

Expand All @@ -14,7 +16,7 @@ func UpdateResourceDurations(wf *wfv1.Workflow) {
} else if node.Fulfilled() {
// compute the sum of all children
node.ResourcesDuration = resourceDuration(wf, node, make(map[string]bool))
wf.Status.Nodes[nodeID] = node
wf.Status.Nodes.Set(nodeID, node)
}
}
}
Expand All @@ -27,11 +29,15 @@ func resourceDuration(wf *wfv1.Workflow, node wfv1.NodeStatus, visited map[strin
continue
}
visited[childID] = true
child := wf.Status.Nodes[childID]
child, err := wf.Status.Nodes.Get(childID)
if err != nil {
log.Warnf("was unable to obtain node for %s", childID)
continue
}
if child.Type == wfv1.NodeTypePod {
v = v.Add(child.ResourcesDuration)
}
v = v.Add(resourceDuration(wf, child, visited))
v = v.Add(resourceDuration(wf, *child, visited))
}
return v
}
14 changes: 9 additions & 5 deletions workflow/controller/artifact_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,9 @@ func (woc *wfOperationCtx) processArtifactGCStrategy(ctx context.Context, strate
groupedByPod[podName] = make(templatesToArtifacts)
}
// get the Template for the Artifact
node, found := woc.wf.Status.Nodes[artifactSearchResult.NodeID]
if !found {
node, err := woc.wf.Status.Nodes.Get(artifactSearchResult.NodeID)
if err != nil {
woc.log.Errorf("Was unable to obtain node for %s", artifactSearchResult.NodeID)
return fmt.Errorf("can't process Artifact GC Strategy %s: node ID %q not found in Status??", strategy, artifactSearchResult.NodeID)
}
templateName := node.TemplateName
Expand Down Expand Up @@ -635,8 +636,9 @@ func (woc *wfOperationCtx) processCompletedWorkflowArtifactGCTask(artifactGCTask
foundGCFailure := false
for nodeName, nodeResult := range artifactGCTask.Status.ArtifactResultsByNode {
// find this node result in the Workflow Status
wfNode, found := woc.wf.Status.Nodes[nodeName]
if !found {
wfNode, err := woc.wf.Status.Nodes.Get(nodeName)
if err != nil {
woc.log.Errorf("Was unable to obtain node for %s", nodeName)
return false, fmt.Errorf("node named %q returned by WorkflowArtifactGCTask %q wasn't found in Workflow %q Status", nodeName, artifactGCTask.Name, woc.wf.Name)
}
if wfNode.Outputs == nil {
Expand All @@ -649,7 +651,9 @@ func (woc *wfOperationCtx) processCompletedWorkflowArtifactGCTask(artifactGCTask
// could be in a different WorkflowArtifactGCTask
continue
}
woc.wf.Status.Nodes[nodeName].Outputs.Artifacts[i].Deleted = artifactResult.Success

wfNode.Outputs.Artifacts[i].Deleted = artifactResult.Success
woc.wf.Status.Nodes.Set(nodeName, *wfNode)

if artifactResult.Error != nil {
woc.addArtGCCondition(fmt.Sprintf("%s (artifactGCTask: %s)", *artifactResult.Error, artifactGCTask.Name))
Expand Down
8 changes: 4 additions & 4 deletions workflow/controller/container_set_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
)

func (woc *wfOperationCtx) executeContainerSet(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) {
node := woc.wf.GetNodeByName(nodeName)
if node == nil {
node, err := woc.wf.GetNodeByName(nodeName)
if err != nil {
node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypePod, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodePending)
}
includeScriptOutput, err := woc.includeScriptOutput(nodeName, opts.boundaryID)
Expand All @@ -30,8 +30,8 @@ func (woc *wfOperationCtx) executeContainerSet(ctx context.Context, nodeName str
// which prevents creating many pending nodes that could never be scheduled
for _, c := range tmpl.ContainerSet.GetContainers() {
ctxNodeName := fmt.Sprintf("%s.%s", nodeName, c.Name)
ctrNode := woc.wf.GetNodeByName(ctxNodeName)
if ctrNode == nil {
_, err := woc.wf.GetNodeByName(ctxNodeName)
if err != nil {
_ = woc.initializeNode(ctxNodeName, wfv1.NodeTypeContainer, templateScope, orgTmpl, node.ID, wfv1.NodePending)
}
}
Expand Down
Loading

0 comments on commit 1f6b19f

Please sign in to comment.