From 205e5cbce20a6e5e73c977f1e775671a19bf4434 Mon Sep 17 00:00:00 2001 From: Jesse Suen Date: Wed, 29 Nov 2017 13:05:24 -0800 Subject: [PATCH] Introduce 'completed' pod label and label selector so controller can ignore completed pods --- api/workflow/v1/types.go | 8 ++++ workflow/common/common.go | 4 +- workflow/controller/controller.go | 69 +++++++++++++++++++++--------- workflow/controller/workflowpod.go | 4 +- 4 files changed, 61 insertions(+), 24 deletions(-) diff --git a/api/workflow/v1/types.go b/api/workflow/v1/types.go index d7d86d8f9b73..06f257904179 100644 --- a/api/workflow/v1/types.go +++ b/api/workflow/v1/types.go @@ -230,6 +230,14 @@ func (n NodeStatus) Completed() bool { n.Phase == NodeSkipped } +// IsDaemoned returns whether or not the node is deamoned +func (n NodeStatus) IsDaemoned() bool { + if n.Daemoned == nil || !*n.Daemoned { + return false + } + return true +} + // Successful returns whether or not this node completed successfully func (n NodeStatus) Successful() bool { return n.Phase == NodeSucceeded || n.Phase == NodeSkipped diff --git a/workflow/common/common.go b/workflow/common/common.go index d2dc566e8ac2..d286079ad14b 100644 --- a/workflow/common/common.go +++ b/workflow/common/common.go @@ -48,8 +48,8 @@ const ( // AnnotationKeyOutputs is the pod metadata annotation key containing the container outputs AnnotationKeyOutputs = wfv1.CRDFullName + "/outputs" - // LabelKeyArgoWorkflow is the pod metadata label to indidcate this pod is part of a workflow - LabelKeyArgoWorkflow = wfv1.CRDFullName + "/argo-workflow" + // LabelKeyCompleted is the pod metadata label on workflow pods which indicates if a pod is completed + LabelKeyCompleted = wfv1.CRDFullName + "/completed" // LabelKeyWorkflow is the pod metadata label to indidcate the associated workflow name LabelKeyWorkflow = wfv1.CRDFullName + "/workflow" diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index aebee3fb75f7..3266277e97e4 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -224,7 +224,7 @@ func (wfc *WorkflowController) newWorkflowPodWatch() *cache.ListWatch { req := c.Get(). Namespace(namespace). Resource(resource). - Param("labelSelector", fmt.Sprintf("%s=true", common.LabelKeyArgoWorkflow)). + Param("labelSelector", fmt.Sprintf("%s=false", common.LabelKeyCompleted)). VersionedParams(&options, metav1.ParameterCodec) req = wfc.addLabelSelectors(req) return req.Do().Get() @@ -235,7 +235,7 @@ func (wfc *WorkflowController) newWorkflowPodWatch() *cache.ListWatch { req := c.Get(). Namespace(namespace). Resource(resource). - Param("labelSelector", fmt.Sprintf("%s=true", common.LabelKeyArgoWorkflow)). + Param("labelSelector", fmt.Sprintf("%s=false", common.LabelKeyCompleted)). VersionedParams(&options, metav1.ParameterCodec) req = wfc.addLabelSelectors(req) return req.Watch() @@ -282,6 +282,7 @@ func (wfc *WorkflowController) watchWorkflowPods(ctx context.Context) (cache.Con } // handlePodUpdate receives an update from a pod, and updates the status of the node in the workflow object accordingly +// It is also responsible for unsetting the deamoned flag from a node status when it notices that a daemoned pod terminated. func (wfc *WorkflowController) handlePodUpdate(pod *apiv1.Pod) { workflowName, ok := pod.Labels[common.LabelKeyWorkflow] if !ok { @@ -352,20 +353,42 @@ func (wfc *WorkflowController) handlePodUpdate(pod *apiv1.Pod) { updateNeeded := applyUpdates(pod, &node, newPhase, newDaemonStatus, message) if !updateNeeded { log.Infof("No workflow updated needed for node %s", node) - return - } - wf.Status.Nodes[pod.Name] = node - _, err = wfClient.UpdateWorkflow(wf) - if err != nil { - log.Errorf("Failed to update %s status: %+v", pod.Name, err) - // if we fail to update the CRD state, we will need to rely on resync to catch up - return + } else { + wf.Status.Nodes[pod.Name] = node + _, err = wfClient.UpdateWorkflow(wf) + if err != nil { + log.Errorf("Failed to update %s status: %+v", pod.Name, err) + // if we fail to update the CRD state, we will need to rely on resync to catch up + return + } + log.Infof("Updated %s", node) } - log.Infof("Updated %s", node) - // TODO: if we successfully updated the workflow, and the pod is completed, then we - // have extracted everything we need from the pod metadata and can ignore this pod. - // Apply a label to the pod which will filter it from the watch. + if node.Completed() { + // If we get here, we need to decide whether or not to set the 'completed=true' label on the pod, + // which prevents the controller from seeing any pod updates for the rest of its existance. + // We only add the label if the pod is *not* daemoned, because we still rely on this pod watch + // for daemoned pods, in order to properly remove the daemoned status from the node when the pod + // terminates. + if !node.IsDaemoned() { + podIf := wfc.clientset.CoreV1().Pods(pod.ObjectMeta.Namespace) + // TODO: use patch instead of get/update + pod, err = podIf.Get(pod.ObjectMeta.Name, metav1.GetOptions{}) + if err != nil { + log.Errorf("Failed to get pod %s for labeleing: %+v", node, err) + return + } + pod.Labels[common.LabelKeyCompleted] = "true" + _, err = podIf.Update(pod) + if err != nil { + log.Errorf("Failed to label completed pod %s: %+v", node, err) + return + } + log.Infof("Set completed=true label to pod: %s", node) + } else { + log.Infof("Skipping completed labeling for daemoned pod: %s", node) + } + } } // applyUpdates applies any new state information about a pod, to the current status of the workflow node @@ -374,9 +397,15 @@ func applyUpdates(pod *apiv1.Pod, node *wfv1.NodeStatus, newPhase wfv1.NodePhase // Check various fields of the pods to see if we need to update the workflow updateNeeded := false if node.Phase != newPhase { - log.Infof("Updating node %s status %s -> %s", node, node.Phase, newPhase) - updateNeeded = true - node.Phase = newPhase + if node.Completed() { + // Don't modify the phase if this node was already considered completed. + // This might happen with daemoned steps which fail after they were daemoned + log.Infof("Ignoring node %s status update %s -> %s", node, node.Phase, newPhase) + } else { + log.Infof("Updating node %s status %s -> %s", node, node.Phase, newPhase) + updateNeeded = true + node.Phase = newPhase + } } if pod.Status.PodIP != node.PodIP { log.Infof("Updating node %s IP %s -> %s", node, node.PodIP, pod.Status.PodIP) @@ -389,7 +418,7 @@ func applyUpdates(pod *apiv1.Pod, node *wfv1.NodeStatus, newPhase wfv1.NodePhase // (as opposed to setting it to false) newDaemonStatus = nil } - if newDaemonStatus != nil && node.Daemoned == nil || newDaemonStatus == nil && node.Daemoned != nil { + if (newDaemonStatus != nil && node.Daemoned == nil) || (newDaemonStatus == nil && node.Daemoned != nil) { log.Infof("Setting node %v daemoned: %v -> %v", node, node.Daemoned, newDaemonStatus) node.Daemoned = newDaemonStatus updateNeeded = true @@ -413,8 +442,8 @@ func applyUpdates(pod *apiv1.Pod, node *wfv1.NodeStatus, newPhase wfv1.NodePhase node.Message = message } if node.Completed() && node.FinishedAt.IsZero() { - // TODO: rather than using time.Now(), we should use the pod termination timestamp - // to get a more accurate finished timestamp, in the event the controller is + // TODO: rather than using time.Now(), we should use the latest container finishedAt + // timestamp to get a more accurate finished timestamp, in the event the controller is // down or backlogged. But this would not work for daemoned containers. node.FinishedAt = metav1.Time{Time: time.Now().UTC()} updateNeeded = true diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index d1a06bb5dbad..4dbfc370855d 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -132,8 +132,8 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, tmpl *wfv1.Templat ObjectMeta: metav1.ObjectMeta{ Name: woc.wf.NodeID(nodeName), Labels: map[string]string{ - common.LabelKeyWorkflow: woc.wf.ObjectMeta.Name, // Allow filtering by pods related to specific workflow - common.LabelKeyArgoWorkflow: "true", // Allow filtering by only argo workflow related pods + common.LabelKeyWorkflow: woc.wf.ObjectMeta.Name, // Allows filtering by pods related to specific workflow + common.LabelKeyCompleted: "false", // Allows filtering by incomplete workflow pods }, Annotations: map[string]string{ common.AnnotationKeyNodeName: nodeName,