Skip to content

Commit

Permalink
Introduce 'completed' pod label and label selector so controller can …
Browse files Browse the repository at this point in the history
…ignore completed pods
  • Loading branch information
jessesuen committed Nov 29, 2017
1 parent 199dbcb commit 205e5cb
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 24 deletions.
8 changes: 8 additions & 0 deletions api/workflow/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,14 @@ func (n NodeStatus) Completed() bool {
n.Phase == NodeSkipped
}

// IsDaemoned returns whether or not the node is daemoned.
// The Daemoned field is an optional pointer: a nil pointer and a pointer to
// false both mean "not daemoned", so only an explicit true yields true.
func (n NodeStatus) IsDaemoned() bool {
	return n.Daemoned != nil && *n.Daemoned
}

// Successful returns whether or not this node completed successfully
func (n NodeStatus) Successful() bool {
return n.Phase == NodeSucceeded || n.Phase == NodeSkipped
Expand Down
4 changes: 2 additions & 2 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ const (
// AnnotationKeyOutputs is the pod metadata annotation key containing the container outputs
AnnotationKeyOutputs = wfv1.CRDFullName + "/outputs"

// LabelKeyArgoWorkflow is the pod metadata label to indicate this pod is part of a workflow
LabelKeyArgoWorkflow = wfv1.CRDFullName + "/argo-workflow"
// LabelKeyCompleted is the pod metadata label on workflow pods which indicates if a pod is completed
LabelKeyCompleted = wfv1.CRDFullName + "/completed"
// LabelKeyWorkflow is the pod metadata label to indicate the associated workflow name
LabelKeyWorkflow = wfv1.CRDFullName + "/workflow"

Expand Down
69 changes: 49 additions & 20 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (wfc *WorkflowController) newWorkflowPodWatch() *cache.ListWatch {
req := c.Get().
Namespace(namespace).
Resource(resource).
Param("labelSelector", fmt.Sprintf("%s=true", common.LabelKeyArgoWorkflow)).
Param("labelSelector", fmt.Sprintf("%s=false", common.LabelKeyCompleted)).
VersionedParams(&options, metav1.ParameterCodec)
req = wfc.addLabelSelectors(req)
return req.Do().Get()
Expand All @@ -235,7 +235,7 @@ func (wfc *WorkflowController) newWorkflowPodWatch() *cache.ListWatch {
req := c.Get().
Namespace(namespace).
Resource(resource).
Param("labelSelector", fmt.Sprintf("%s=true", common.LabelKeyArgoWorkflow)).
Param("labelSelector", fmt.Sprintf("%s=false", common.LabelKeyCompleted)).
VersionedParams(&options, metav1.ParameterCodec)
req = wfc.addLabelSelectors(req)
return req.Watch()
Expand Down Expand Up @@ -282,6 +282,7 @@ func (wfc *WorkflowController) watchWorkflowPods(ctx context.Context) (cache.Con
}

// handlePodUpdate receives an update from a pod, and updates the status of the node in the workflow object accordingly
// It is also responsible for unsetting the daemoned flag from a node status when it notices that a daemoned pod terminated.
func (wfc *WorkflowController) handlePodUpdate(pod *apiv1.Pod) {
workflowName, ok := pod.Labels[common.LabelKeyWorkflow]
if !ok {
Expand Down Expand Up @@ -352,20 +353,42 @@ func (wfc *WorkflowController) handlePodUpdate(pod *apiv1.Pod) {
updateNeeded := applyUpdates(pod, &node, newPhase, newDaemonStatus, message)
if !updateNeeded {
log.Infof("No workflow updated needed for node %s", node)
return
}
wf.Status.Nodes[pod.Name] = node
_, err = wfClient.UpdateWorkflow(wf)
if err != nil {
log.Errorf("Failed to update %s status: %+v", pod.Name, err)
// if we fail to update the CRD state, we will need to rely on resync to catch up
return
} else {
wf.Status.Nodes[pod.Name] = node
_, err = wfClient.UpdateWorkflow(wf)
if err != nil {
log.Errorf("Failed to update %s status: %+v", pod.Name, err)
// if we fail to update the CRD state, we will need to rely on resync to catch up
return
}
log.Infof("Updated %s", node)
}
log.Infof("Updated %s", node)

// TODO: if we successfully updated the workflow, and the pod is completed, then we
// have extracted everything we need from the pod metadata and can ignore this pod.
// Apply a label to the pod which will filter it from the watch.
if node.Completed() {
// If we get here, we need to decide whether or not to set the 'completed=true' label on the pod,
// which prevents the controller from seeing any pod updates for the rest of its existence.
// We only add the label if the pod is *not* daemoned, because we still rely on this pod watch
// for daemoned pods, in order to properly remove the daemoned status from the node when the pod
// terminates.
if !node.IsDaemoned() {
podIf := wfc.clientset.CoreV1().Pods(pod.ObjectMeta.Namespace)
// TODO: use patch instead of get/update
pod, err = podIf.Get(pod.ObjectMeta.Name, metav1.GetOptions{})
if err != nil {
log.Errorf("Failed to get pod %s for labeleing: %+v", node, err)
return
}
pod.Labels[common.LabelKeyCompleted] = "true"
_, err = podIf.Update(pod)
if err != nil {
log.Errorf("Failed to label completed pod %s: %+v", node, err)
return
}
log.Infof("Set completed=true label to pod: %s", node)
} else {
log.Infof("Skipping completed labeling for daemoned pod: %s", node)
}
}
}

// applyUpdates applies any new state information about a pod, to the current status of the workflow node
Expand All @@ -374,9 +397,15 @@ func applyUpdates(pod *apiv1.Pod, node *wfv1.NodeStatus, newPhase wfv1.NodePhase
// Check various fields of the pods to see if we need to update the workflow
updateNeeded := false
if node.Phase != newPhase {
log.Infof("Updating node %s status %s -> %s", node, node.Phase, newPhase)
updateNeeded = true
node.Phase = newPhase
if node.Completed() {
// Don't modify the phase if this node was already considered completed.
// This might happen with daemoned steps which fail after they were daemoned
log.Infof("Ignoring node %s status update %s -> %s", node, node.Phase, newPhase)
} else {
log.Infof("Updating node %s status %s -> %s", node, node.Phase, newPhase)
updateNeeded = true
node.Phase = newPhase
}
}
if pod.Status.PodIP != node.PodIP {
log.Infof("Updating node %s IP %s -> %s", node, node.PodIP, pod.Status.PodIP)
Expand All @@ -389,7 +418,7 @@ func applyUpdates(pod *apiv1.Pod, node *wfv1.NodeStatus, newPhase wfv1.NodePhase
// (as opposed to setting it to false)
newDaemonStatus = nil
}
if newDaemonStatus != nil && node.Daemoned == nil || newDaemonStatus == nil && node.Daemoned != nil {
if (newDaemonStatus != nil && node.Daemoned == nil) || (newDaemonStatus == nil && node.Daemoned != nil) {
log.Infof("Setting node %v daemoned: %v -> %v", node, node.Daemoned, newDaemonStatus)
node.Daemoned = newDaemonStatus
updateNeeded = true
Expand All @@ -413,8 +442,8 @@ func applyUpdates(pod *apiv1.Pod, node *wfv1.NodeStatus, newPhase wfv1.NodePhase
node.Message = message
}
if node.Completed() && node.FinishedAt.IsZero() {
// TODO: rather than using time.Now(), we should use the pod termination timestamp
// to get a more accurate finished timestamp, in the event the controller is
// TODO: rather than using time.Now(), we should use the latest container finishedAt
// timestamp to get a more accurate finished timestamp, in the event the controller is
// down or backlogged. But this would not work for daemoned containers.
node.FinishedAt = metav1.Time{Time: time.Now().UTC()}
updateNeeded = true
Expand Down
4 changes: 2 additions & 2 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, tmpl *wfv1.Templat
ObjectMeta: metav1.ObjectMeta{
Name: woc.wf.NodeID(nodeName),
Labels: map[string]string{
common.LabelKeyWorkflow: woc.wf.ObjectMeta.Name, // Allow filtering by pods related to specific workflow
common.LabelKeyArgoWorkflow: "true", // Allow filtering by only argo workflow related pods
common.LabelKeyWorkflow: woc.wf.ObjectMeta.Name, // Allows filtering by pods related to specific workflow
common.LabelKeyCompleted: "false", // Allows filtering by incomplete workflow pods
},
Annotations: map[string]string{
common.AnnotationKeyNodeName: nodeName,
Expand Down

0 comments on commit 205e5cb

Please sign in to comment.