Skip to content

Commit

Permalink
Issue #1136 - Fix metadata for DAG with loops (#1149)
Browse files Browse the repository at this point in the history
* Issue #1136 - Fix metadata for DAG with loops
  • Loading branch information
alexmt authored Jan 3, 2019
1 parent c7fec9d commit 3561bff
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 32 deletions.
57 changes: 43 additions & 14 deletions test/e2e/ui/ui-dag-with-params.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,53 @@ kind: Workflow
metadata:
generateName: ui-dag-with-params-
spec:
entrypoint: diamond
entrypoint: pipeline

templates:
- name: diamond
dag:
tasks:
- name: A
template: nested-diamond
arguments:
parameters: [{name: message, value: A}]
- name: nested-diamond
- name: echo
inputs:
parameters:
- name: message
container:
image: alpine:latest
command: [echo, "{{inputs.parameters.message}}"]

- name: subpipeline-a
dag:
tasks:
- name: A
- name: A1
template: echo
- name: echo
container:
image: alpine:3.7
command: [echo, "hello"]
arguments:
parameters: [{name: message, value: "Hello World!"}]
- name: A2
template: echo
arguments:
parameters: [{name: message, value: "Hello World!"}]

- name: subpipeline-b
dag:
tasks:
- name: B1
template: echo
arguments:
parameters: [{name: message, value: "Hello World!"}]
- name: B2
template: echo
dependencies: [B1]
arguments:
parameters: [{name: message, value: "Hello World!"}]
withItems:
- 0
- 1

- name: pipeline
dag:
tasks:
- name: A
template: subpipeline-a
withItems:
- 0
- 1
- name: B
dependencies: [A]
template: subpipeline-b
42 changes: 25 additions & 17 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,21 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) {

// All our dependencies were satisfied and successful. It's our turn to run

taskGroupNode := woc.getNodeByName(nodeName)
if taskGroupNode != nil && taskGroupNode.Type != wfv1.NodeTypeTaskGroup {
taskGroupNode = nil
}
// connectDependencies is a helper to connect our dependencies to current task as children
connectDependencies := func(taskNodeName string) {
if len(task.Dependencies) == 0 {
if len(task.Dependencies) == 0 || taskGroupNode != nil {
// if we had no dependencies, then we are a root task, and we should connect the
// boundary node as our parent
woc.addChildNode(dagCtx.boundaryName, taskNodeName)
if taskGroupNode == nil {
woc.addChildNode(dagCtx.boundaryName, taskNodeName)
} else {
woc.addChildNode(taskGroupNode.Name, taskNodeName)
}

} else {
// Otherwise, add all outbound nodes of our dependencies as parents to this node
for _, depName := range task.Dependencies {
Expand Down Expand Up @@ -287,6 +296,16 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) {
return
}

// If DAG task has withParam of with withSequence then we need to create virtual node of type TaskGroup.
// For example, if we had task A with withItems of ['foo', 'bar'] which expanded to ['A(0:foo)', 'A(1:bar)'], we still
// need to create a node for A.
if len(task.WithItems) > 0 || task.WithParam != "" || task.WithSequence != nil {
if taskGroupNode == nil {
connectDependencies(nodeName)
taskGroupNode = woc.initializeNode(nodeName, wfv1.NodeTypeTaskGroup, task.Template, dagCtx.boundaryID, wfv1.NodeRunning, "")
}
}

for _, t := range expandedTasks {
node = dagCtx.getTaskNode(t.Name)
taskNodeName := dagCtx.taskNodeName(t.Name)
Expand All @@ -311,30 +330,19 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) {
_, _ = woc.executeTemplate(t.Template, t.Arguments, taskNodeName, dagCtx.boundaryID)
}

// If we expanded the task, we still need to create the task entry for the non-expanded node,
// since dependant tasks will look to it, when deciding when to execute. For example, if we had
// task A with withItems of ['foo', 'bar'] which expanded to ['A(0:foo)', 'A(1:bar)'], we still
// need to create a node for A, after the withItems have completed.
if len(task.WithItems) > 0 || task.WithParam != "" || task.WithSequence != nil {
nodeStatus := wfv1.NodeSucceeded
if taskGroupNode != nil {
groupPhase := wfv1.NodeSucceeded
for _, t := range expandedTasks {
// Add the child relationship from our dependency's outbound nodes to this node.
node := dagCtx.getTaskNode(t.Name)
if node == nil || !node.Completed() {
return
}
if !node.Successful() {
nodeStatus = node.Phase
}
}
woc.initializeNode(nodeName, wfv1.NodeTypeTaskGroup, task.Template, dagCtx.boundaryID, nodeStatus, "")
if len(expandedTasks) > 0 {
for _, t := range expandedTasks {
woc.addChildNode(dagCtx.taskNodeName(t.Name), nodeName)
groupPhase = node.Phase
}
} else {
connectDependencies(nodeName)
}
woc.markNodePhase(taskGroupNode.Name, groupPhase)
}
}

Expand Down
11 changes: 10 additions & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1180,8 +1180,17 @@ func (woc *wfOperationCtx) executeContainer(nodeName string, tmpl *wfv1.Template
func (woc *wfOperationCtx) getOutboundNodes(nodeID string) []string {
node := woc.wf.Status.Nodes[nodeID]
switch node.Type {
case wfv1.NodeTypePod, wfv1.NodeTypeSkipped, wfv1.NodeTypeSuspend, wfv1.NodeTypeTaskGroup:
case wfv1.NodeTypePod, wfv1.NodeTypeSkipped, wfv1.NodeTypeSuspend:
return []string{node.ID}
case wfv1.NodeTypeTaskGroup:
if len(node.Children) == 0 {
return []string{node.ID}
}
outboundNodes := make([]string, 0)
for _, child := range node.Children {
outboundNodes = append(outboundNodes, woc.getOutboundNodes(child)...)
}
return outboundNodes
case wfv1.NodeTypeRetry:
numChildren := len(node.Children)
if numChildren > 0 {
Expand Down

0 comments on commit 3561bff

Please sign in to comment.