Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Emit WorkflowNodeRunning Event #5531

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions docs/workflow-events.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ Workflow state change:
* `WorkflowFailed`
* `WorkflowTimedOut`

Node completion
Node state change:

* `WorkflowNodeRunning`
* `WorkflowNodeSucceeded`
* `WorkflowNodeFailed`
* `WorkflowNodeError`


The involved object is the workflow in both cases. Additionally, for node completion events, annotations indicate the name and type of the involved node:
The involved object is the workflow in both cases. Additionally, for node state change events, annotations indicate the name and type of the involved node:

```yaml
metadata:
Expand All @@ -42,4 +43,4 @@ involvedObject:
firstTimestamp: "2020-04-09T16:50:16Z"
lastTimestamp: "2020-04-09T16:50:16Z"
count: 1
```
```
9 changes: 7 additions & 2 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,12 @@ func (s *FunctionalSuite) TestEventOnNodeFail() {
}).
ExpectAuditEvents(
fixtures.HasInvolvedObject(workflow.WorkflowKind, uid),
2,
4,
func(t *testing.T, es []corev1.Event) {
for _, e := range es {
switch e.Reason {
case "WorkflowNodeRunning":
assert.Contains(t, e.Message, "Running node failed-step-event-")
case "WorkflowRunning":
case "WorkflowNodeFailed":
assert.Contains(t, e.Message, "Failed node failed-step-event-")
Expand Down Expand Up @@ -233,10 +235,13 @@ func (s *FunctionalSuite) TestEventOnWorkflowSuccess() {
}).
ExpectAuditEvents(
fixtures.HasInvolvedObject(workflow.WorkflowKind, uid),
3,
4,
func(t *testing.T, es []corev1.Event) {
for _, e := range es {
println(e.Reason, e.Message)
switch e.Reason {
case "WorkflowNodeRunning":
assert.Contains(t, e.Message, "Running node success-event-")
case "WorkflowRunning":
case "WorkflowNodeSucceeded":
assert.Contains(t, e.Message, "Succeeded node success-event-")
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl
wfArchive: sqldb.NullWorkflowArchive,
hydrator: hydratorfake.Noop,
estimatorFactory: estimation.DummyEstimatorFactory,
eventRecorderManager: &testEventRecorderManager{eventRecorder: record.NewFakeRecorder(16)},
eventRecorderManager: &testEventRecorderManager{eventRecorder: record.NewFakeRecorder(64)},
archiveLabelSelector: labels.Everything(),
cacheFactory: controllercache.NewCacheFactory(kube, "default"),
}
Expand Down
56 changes: 44 additions & 12 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,13 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) {
woc.controller.hydrator.HydrateWithNodes(woc.wf, nodes)
}

// The workflow returned from wfClient.Update doesn't have a TypeMeta associated
// with it, so copy from the original workflow.
woc.wf.TypeMeta = woc.orig.TypeMeta
kennytrytek marked this conversation as resolved.
Show resolved Hide resolved

// Create WorkflowNode* events for nodes that have changed phase
woc.recordNodePhaseChangeEvents(woc.orig.Status.Nodes, woc.wf.Status.Nodes)

if !woc.controller.hydrator.IsHydrated(woc.wf) {
panic("workflow should be hydrated")
}
Expand Down Expand Up @@ -906,9 +913,6 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error {
if woc.shouldPrintPodSpec(node) {
printPodSpecLog(pod, woc.wf.Name)
}
if !woc.orig.Status.Nodes[node.ID].Fulfilled() {
woc.onNodeComplete(&node)
}
}
if node.Succeeded() && match {
woc.completedPods[pod.Name] = pod.Status.Phase
Expand Down Expand Up @@ -1983,7 +1987,7 @@ func (woc *wfOperationCtx) initializeNodeOrMarkError(node *wfv1.NodeStatus, node
return woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, templateScope, orgTmpl, boundaryID, wfv1.NodeError, err.Error())
}

// Creates a node status that is or will be chaced
// Creates a node status that is or will be cached
func (woc *wfOperationCtx) initializeCacheNode(nodeName string, resolvedTmpl *wfv1.Template, templateScope string, orgTmpl wfv1.TemplateReferenceHolder, boundaryID string, memStat *wfv1.MemoizationStatus, messages ...string) *wfv1.NodeStatus {
if resolvedTmpl.Memoize == nil {
err := fmt.Errorf("cannot initialize a cached node from a non-memoized template")
Expand Down Expand Up @@ -2077,23 +2081,18 @@ func (woc *wfOperationCtx) markNodePhase(nodeName string, phase wfv1.NodePhase,
woc.log.Infof("node %s finished: %s", node.ID, node.FinishedAt)
woc.updated = true
}
if !woc.orig.Status.Nodes[node.ID].Fulfilled() && node.Fulfilled() {
woc.onNodeComplete(node)
}
woc.wf.Status.Nodes[node.ID] = *node
return node
}

func (woc *wfOperationCtx) onNodeComplete(node *wfv1.NodeStatus) {
if !woc.controller.Config.NodeEvents.IsEnabled() {
return
}
func (woc *wfOperationCtx) recordNodePhaseEvent(node *wfv1.NodeStatus) {
kennytrytek marked this conversation as resolved.
Show resolved Hide resolved
message := fmt.Sprintf("%v node %s", node.Phase, node.Name)
if node.Message != "" {
message = message + ": " + node.Message
}
eventType := apiv1.EventTypeWarning
if node.Phase == wfv1.NodeSucceeded {
switch node.Phase {
case wfv1.NodeSucceeded, wfv1.NodeRunning:
eventType = apiv1.EventTypeNormal
}
woc.eventRecorder.AnnotatedEventf(
Expand All @@ -2108,6 +2107,39 @@ func (woc *wfOperationCtx) onNodeComplete(node *wfv1.NodeStatus) {
)
}

// recordNodePhaseChangeEvents creates WorkflowNode Kubernetes events for each node
// that has changes logged during this execution of the operator loop.
func (woc *wfOperationCtx) recordNodePhaseChangeEvents(old wfv1.Nodes, new wfv1.Nodes) {
if !woc.controller.Config.NodeEvents.IsEnabled() {
return
}

// Check for newly added nodes; send an event for new nodes
for nodeName, newNode := range new {
oldNode, exists := old[nodeName]
if exists {
if oldNode.Phase == newNode.Phase {
continue
}
if oldNode.Phase == wfv1.NodePending && newNode.Completed() {
ephemeralNode := newNode.DeepCopy()
ephemeralNode.Phase = wfv1.NodeRunning
woc.recordNodePhaseEvent(ephemeralNode)
}
woc.recordNodePhaseEvent(&newNode)
} else {
if newNode.Phase == wfv1.NodeRunning {
woc.recordNodePhaseEvent(&newNode)
} else if newNode.Completed() {
ephemeralNode := newNode.DeepCopy()
kennytrytek marked this conversation as resolved.
Show resolved Hide resolved
ephemeralNode.Phase = wfv1.NodeRunning
woc.recordNodePhaseEvent(ephemeralNode)
woc.recordNodePhaseEvent(&newNode)
}
}
}
}

// markNodeError is a convenience method to mark a node with an error and set the message from the error
func (woc *wfOperationCtx) markNodeError(nodeName string, err error) *wfv1.NodeStatus {
woc.log.WithError(err).WithField("nodeName", nodeName).Error("Mark error node")
Expand Down
25 changes: 24 additions & 1 deletion workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3165,6 +3165,8 @@ spec:
image: docker/whalesay:latest
`: {
"Normal WorkflowRunning Workflow Running",
"Normal WorkflowNodeRunning Running node dag-events",
"Normal WorkflowNodeRunning Running node dag-events.a",
"Normal WorkflowNodeSucceeded Succeeded node dag-events.a",
"Normal WorkflowNodeSucceeded Succeeded node dag-events",
"Normal WorkflowSucceeded Workflow completed",
Expand All @@ -3185,11 +3187,32 @@ spec:
image: docker/whalesay:latest
`: {
"Normal WorkflowRunning Workflow Running",
"Normal WorkflowNodeRunning Running node steps-events",
"Normal WorkflowNodeRunning Running node steps-events[0]",
"Normal WorkflowNodeRunning Running node steps-events[0].a",
"Normal WorkflowNodeSucceeded Succeeded node steps-events[0].a",
"Normal WorkflowNodeSucceeded Succeeded node steps-events[0]",
"Normal WorkflowNodeSucceeded Succeeded node steps-events",
"Normal WorkflowSucceeded Workflow completed",
},
// no DAG or steps
`
metadata:
name: no-dag-or-steps
spec:
entrypoint: whalesay
templates:
- name: whalesay
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
`: {
"Normal WorkflowRunning Workflow Running",
"Normal WorkflowNodeRunning Running node no-dag-or-steps",
"Normal WorkflowNodeSucceeded Succeeded node no-dag-or-steps",
"Normal WorkflowSucceeded Workflow completed",
},
} {
wf := unmarshalWF(manifest)
ctx := context.Background()
Expand All @@ -3201,7 +3224,7 @@ spec:
makePodsPhase(ctx, woc, apiv1.PodSucceeded)
woc = newWorkflowOperationCtx(woc.wf, controller)
woc.operate(ctx)
assert.Equal(t, want, getEvents(controller, len(want)))
assert.ElementsMatch(t, want, getEvents(controller, len(want)))
})
}
}
Expand Down