Skip to content

Commit

Permalink
fix: inline template loops should receive more than the first item. F…
Browse files Browse the repository at this point in the history
…ixes: #12594 (#12628)

Signed-off-by: shuangkun <tsk2013uestc@163.com>
  • Loading branch information
shuangkun authored Mar 15, 2024
1 parent a678294 commit d5a4f7e
Show file tree
Hide file tree
Showing 3 changed files with 250 additions and 0 deletions.
28 changes: 28 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3419,6 +3419,34 @@ func (wf *Workflow) SetStoredTemplate(scope ResourceScope, resourceName string,
return false, nil
}

// SetStoredInlineTemplate stores a inline template in stored templates of the workflow.
func (wf *Workflow) SetStoredInlineTemplate(scope ResourceScope, resourceName string, tmpl *Template) error {
// Store inline templates in steps.
for _, steps := range tmpl.Steps {
for _, step := range steps.Steps {
if step.GetTemplate() != nil {
_, err := wf.SetStoredTemplate(scope, resourceName, &step, step.GetTemplate())
if err != nil {
return err
}
}
}
}
// Store inline templates in DAG tasks.
if tmpl.DAG != nil {
for _, task := range tmpl.DAG.Tasks {
if task.GetTemplate() != nil {
_, err := wf.SetStoredTemplate(scope, resourceName, &task, task.GetTemplate())
if err != nil {
return err
}
}
}
}

return nil
}

// resolveTemplateReference resolves the stored template name of a given template holder on the template scope and determines
// if it should be stored
func resolveTemplateReference(callerScope ResourceScope, resourceName string, caller TemplateReferenceHolder) (string, bool) {
Expand Down
218 changes: 218 additions & 0 deletions workflow/controller/inline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controller

import (
"context"
"strings"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -74,3 +75,220 @@ spec:
assert.Equal(t, "message", node.Inputs.Parameters[0].Name)
assert.Equal(t, "foo", node.Inputs.Parameters[0].Value.String())
}

var workflowCallTemplateWithInline = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: test-call-inline-iterated
namespace: argo
spec:
entrypoint: main
templates:
- name: main
dag:
tasks:
- name: process
templateRef:
name: test-inline-iterated
template: main`

var workflowTemplateWithInlineSteps = `
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: test-inline-iterated
namespace: argo
spec:
entrypoint: main
templates:
- name: main
steps:
- - name: iterated
template: steps-inline
arguments:
parameters:
- name: arg
value: "{{ item }}"
withItems:
- foo
- bar
- name: steps-inline
inputs:
parameters:
- name: arg
steps:
- - name: inline-a
arguments:
parameters:
- name: arg
value: "{{ inputs.parameters.arg }}"
inline:
inputs:
parameters:
- name: arg
container:
image: docker/whalesay
command: [echo]
args:
- "{{ inputs.parameters.arg }} a"
outputs:
parameters:
- name: arg-out
value: "{{ inputs.parameters.arg }}"
- name: inline-b
arguments:
parameters:
- name: arg
value: "{{ inputs.parameters.arg }}"
inline:
inputs:
parameters:
- name: arg
container:
image: docker/whalesay
command: [echo]
args:
- "{{ inputs.parameters.arg }} b"
outputs:
parameters:
- name: arg-out
value: "{{ inputs.parameters.arg }}"
`

func TestCallTemplateWithInlineSteps(t *testing.T) {
wftmpl := wfv1.MustUnmarshalWorkflowTemplate(workflowTemplateWithInlineSteps)
wf := wfv1.MustUnmarshalWorkflow(workflowCallTemplateWithInline)
cancel, controller := newController(wf, wftmpl)
defer cancel()

ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)
pods, err := listPods(woc)
assert.Nil(t, err)
assert.Equal(t, 4, len(pods.Items))
count := 0
for _, pod := range pods.Items {
nodeName := pod.Annotations["workflows.argoproj.io/node-name"]
if strings.Contains(nodeName, "foo") {
count++
assert.Contains(t, pod.Spec.Containers[1].Args[0], "foo")
}
if strings.Contains(nodeName, "bar") {
assert.Contains(t, pod.Spec.Containers[1].Args[0], "bar")
}
}
assert.Equal(t, 2, count)
for name, storedTemplate := range woc.wf.Status.StoredTemplates {
if strings.Contains(name, "inline-a") {
assert.Equal(t, storedTemplate.Container.Args[0], "{{ inputs.parameters.arg }} a")
}
if strings.Contains(name, "inline-b") {
assert.Equal(t, storedTemplate.Container.Args[0], "{{ inputs.parameters.arg }} b")
}
}
}

var workflowTemplateWithInlineDAG = `
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: test-inline-iterated
namespace: argo
spec:
entrypoint: main
templates:
- name: main
steps:
- - name: iterated
template: dag-inline
arguments:
parameters:
- name: arg
value: "{{ item }}"
withItems:
- foo
- bar
- name: dag-inline
inputs:
parameters:
- name: arg
dag:
tasks:
- name: inline-a
arguments:
parameters:
- name: arg
value: '{{ inputs.parameters.arg }}'
inline:
container:
args:
- '{{ inputs.parameters.arg }} a'
command:
- echo
image: docker/whalesay
inputs:
parameters:
- name: arg
outputs:
parameters:
- name: arg-out
value: '{{ inputs.parameters.arg }}'
- name: inline-b
arguments:
parameters:
- name: arg
value: '{{ inputs.parameters.arg }}'
inline:
container:
args:
- '{{ inputs.parameters.arg }} b'
command:
- echo
image: docker/whalesay
inputs:
parameters:
- name: arg
outputs:
parameters:
- name: arg-out
value: '{{ inputs.parameters.arg }}'
`

func TestCallTemplateWithInlineDAG(t *testing.T) {
wftmpl := wfv1.MustUnmarshalWorkflowTemplate(workflowTemplateWithInlineDAG)
wf := wfv1.MustUnmarshalWorkflow(workflowCallTemplateWithInline)
cancel, controller := newController(wf, wftmpl)
defer cancel()

ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)
pods, err := listPods(woc)
assert.Nil(t, err)
assert.Equal(t, 4, len(pods.Items))
count := 0
for _, pod := range pods.Items {
nodeName := pod.Annotations["workflows.argoproj.io/node-name"]
if strings.Contains(nodeName, "foo") {
count++
assert.Contains(t, pod.Spec.Containers[1].Args[0], "foo")
}
if strings.Contains(nodeName, "bar") {
assert.Contains(t, pod.Spec.Containers[1].Args[0], "bar")
}
}
assert.Equal(t, 2, count)
for name, storedTemplate := range woc.wf.Status.StoredTemplates {
if strings.Contains(name, "inline-a") {
assert.Equal(t, storedTemplate.Container.Args[0], "{{ inputs.parameters.arg }} a")
}
if strings.Contains(name, "inline-b") {
assert.Equal(t, storedTemplate.Container.Args[0], "{{ inputs.parameters.arg }} b")
}
}
}
4 changes: 4 additions & 0 deletions workflow/templateresolution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ func (ctx *Context) resolveTemplateImpl(tmplHolder wfv1.TemplateReferenceHolder)
ctx.log.Debug("Stored the template")
templateStored = true
}
err = ctx.workflow.SetStoredInlineTemplate(scope, resourceName, newTmpl)
if err != nil {
ctx.log.Errorf("Failed to store the inline template: %v", err)
}
}
tmpl = newTmpl
}
Expand Down

0 comments on commit d5a4f7e

Please sign in to comment.