Skip to content

Commit

Permalink
fix: Fixed parent level memoization broken. Fixes #11612 (#11623)
Browse files Browse the repository at this point in the history
Signed-off-by: shmruin <meme_hm@naver.com>
  • Loading branch information
shmruin authored Aug 23, 2023
1 parent 9317360 commit 9693c02
Show file tree
Hide file tree
Showing 2 changed files with 268 additions and 20 deletions.
59 changes: 39 additions & 20 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1812,30 +1812,15 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
return woc.initializeNodeOrMarkError(node, nodeName, templateScope, orgTmpl, opts.boundaryID, err), err
}

// Check if this is a fulfilled node for synchronization.
// If so, release synchronization and return this node. No more logic will be executed.
if node != nil {
if node.Fulfilled() {
fulfilledNode := woc.handleNodeFulfilled(nodeName, node, processedTmpl)
if fulfilledNode != nil {
woc.controller.syncManager.Release(woc.wf, node.ID, processedTmpl.Synchronization)

woc.log.Debugf("Node %s already completed", nodeName)
if processedTmpl.Metrics != nil {
// Check if this node completed between executions. If it did, emit metrics. If a node completes within
// the same execution, its metrics are emitted below.
// We can infer that this node completed during the current operation, emit metrics
if prevNodeStatus, ok := woc.preExecutionNodePhases[node.ID]; ok && !prevNodeStatus.Fulfilled() {
localScope, realTimeScope := woc.prepareMetricScope(node)
woc.computeMetrics(processedTmpl.Metrics.Prometheus, localScope, realTimeScope, false)
}
}
return node, nil
return fulfilledNode, nil
}
woc.log.Debugf("Executing node %s of %s is %s", nodeName, node.Type, node.Phase)
// Memoized nodes don't have StartedAt.
if node.StartedAt.IsZero() {
node.StartedAt = metav1.Time{Time: time.Now().UTC()}
node.EstimatedDuration = woc.estimateNodeDuration(node.Name)
woc.wf.Status.Nodes.Set(node.ID, *node)
woc.updated = true
}
}

// Check if we took too long operating on this workflow and immediately return if we did
Expand Down Expand Up @@ -1952,6 +1937,22 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
}
}

// Check if this is a fulfilled node for memoization.
// If so, just return this node. No more logic will be executed.
if node != nil {
fulfilledNode := woc.handleNodeFulfilled(nodeName, node, processedTmpl)
if fulfilledNode != nil {
return fulfilledNode, nil
}
// Memoized nodes don't have StartedAt.
if node.StartedAt.IsZero() {
node.StartedAt = metav1.Time{Time: time.Now().UTC()}
node.EstimatedDuration = woc.estimateNodeDuration(node.Name)
woc.wf.Status.Nodes.Set(node.ID, *node)
woc.updated = true
}
}

// If the user has specified retries, node becomes a special retry node.
// This node acts as a parent of all retries that will be done for
// the container. The status of this node should be "Success" if any
Expand Down Expand Up @@ -2143,6 +2144,24 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
return node, nil
}

func (woc *wfOperationCtx) handleNodeFulfilled(nodeName string, node *wfv1.NodeStatus, processedTmpl *wfv1.Template) *wfv1.NodeStatus {
if node == nil || !node.Fulfilled() {
return nil
}

woc.log.Debugf("Node %s already completed", nodeName)

if processedTmpl.Metrics != nil {
// Check if this node completed between executions. If it did, emit metrics.
// We can infer that this node completed during the current operation, emit metrics
if prevNodeStatus, ok := woc.preExecutionNodePhases[node.ID]; ok && !prevNodeStatus.Fulfilled() {
localScope, realTimeScope := woc.prepareMetricScope(node)
woc.computeMetrics(processedTmpl.Metrics.Prometheus, localScope, realTimeScope, false)
}
}
return node
}

// Checks if the template has exceeded its deadline
func (woc *wfOperationCtx) checkTemplateTimeout(tmpl *wfv1.Template, node *wfv1.NodeStatus) (*time.Time, error) {
if node == nil {
Expand Down
229 changes: 229 additions & 0 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9288,3 +9288,232 @@ spec:
assert.Equal(t, woc.wf.Status.Phase, wfv1.WorkflowFailed)
assert.Contains(t, woc.wf.Status.Message, "invalid spec")
}

var workflowWithTemplateLevelMemoizationAndChildStep = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
namespace: default
generateName: memoized-entrypoint-
spec:
entrypoint: entrypoint
templates:
- name: entrypoint
memoize:
key: "entrypoint-key-1"
cache:
configMap:
name: cache-top-entrypoint
outputs:
parameters:
- name: url
valueFrom:
expression: |
'https://argo-workflows.company.com/workflows/namepace/' + '{{workflow.name}}' + '?tab=workflow'
steps:
- - name: whalesay
template: whalesay
- name: whalesay
container:
image: docker/whalesay:latest
command: [sh, -c]
args: ["cowsay hello_world $(date) > /tmp/hello_world.txt"]
outputs:
parameters:
- name: hello
valueFrom:
path: /tmp/hello_world.txt
`

func TestMemoizationTemplateLevelCacheWithStepWithoutCache(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(workflowWithTemplateLevelMemoizationAndChildStep)

cancel, controller := newController(wf)
defer cancel()

ctx := context.Background()

woc := newWorkflowOperationCtx(wf, controller)

woc.operate(ctx)
makePodsPhase(ctx, woc, apiv1.PodSucceeded)
woc.operate(ctx)

// Expect both workflowTemplate and the step to be executed
for _, node := range woc.wf.Status.Nodes {
if node.TemplateName == "entrypoint" {
assert.True(t, true, "Entrypoint node does not exist")
assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
assert.False(t, node.MemoizationStatus.Hit)
}
if node.Name == "whalesay" {
assert.True(t, true, "Whalesay step does not exist")
assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
}
}
}

func TestMemoizationTemplateLevelCacheWithStepWithCache(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(workflowWithTemplateLevelMemoizationAndChildStep)

// Assume cache is already set
sampleConfigMapCacheEntry := apiv1.ConfigMap{
Data: map[string]string{
"entrypoint-key-1": `{"ExpiresAt":"2020-06-18T17:11:05Z","NodeID":"memoize-abx4124-123129321123","Outputs":{}}`,
},
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMap",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "cache-top-entrypoint",
ResourceVersion: "1630732",
Labels: map[string]string{
common.LabelKeyConfigMapType: common.LabelValueTypeConfigMapCache,
},
},
}

cancel, controller := newController(wf)
defer cancel()

ctx := context.Background()

_, err := controller.kubeclientset.CoreV1().ConfigMaps("default").Create(ctx, &sampleConfigMapCacheEntry, metav1.CreateOptions{})
assert.NoError(t, err)

woc := newWorkflowOperationCtx(wf, controller)

woc.operate(ctx)
makePodsPhase(ctx, woc, apiv1.PodSucceeded)
woc.operate(ctx)

// Only parent node should exist and it should be a memoization cache hit
for _, node := range woc.wf.Status.Nodes {
t.Log(node)
if node.TemplateName == "entrypoint" {
assert.True(t, true, "Entrypoint node does not exist")
assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
assert.True(t, node.MemoizationStatus.Hit)
}
if node.Name == "whalesay" {
assert.False(t, true, "Whalesay step should not have been executed")
}
}
}

var workflowWithTemplateLevelMemoizationAndChildDag = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
namespace: default
generateName: memoized-entrypoint-
spec:
entrypoint: entrypoint
templates:
- name: entrypoint
dag:
tasks:
- name: whalesay-task
template: whalesay
memoize:
key: "entrypoint-key-1"
cache:
configMap:
name: cache-top-entrypoint
outputs:
parameters:
- name: url
valueFrom:
expression: |
'https://argo-workflows.company.com/workflows/namepace/' + '{{workflow.name}}' + '?tab=workflow'
- name: whalesay
container:
image: docker/whalesay:latest
command: [sh, -c]
args: ["cowsay hello_world $(date) > /tmp/hello_world.txt"]
outputs:
parameters:
- name: hello
valueFrom:
path: /tmp/hello_world.txt
`

func TestMemoizationTemplateLevelCacheWithDagWithoutCache(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(workflowWithTemplateLevelMemoizationAndChildDag)

cancel, controller := newController(wf)
defer cancel()

ctx := context.Background()

woc := newWorkflowOperationCtx(wf, controller)

woc.operate(ctx)
makePodsPhase(ctx, woc, apiv1.PodSucceeded)
woc.operate(ctx)

// Expect both workflowTemplate and the dag to be executed
for _, node := range woc.wf.Status.Nodes {
if node.TemplateName == "entrypoint" {
assert.True(t, true, "Entrypoint node does not exist")
assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
assert.False(t, node.MemoizationStatus.Hit)
}
if node.Name == "whalesay" {
assert.True(t, true, "Whalesay dag does not exist")
assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
}
}
}

func TestMemoizationTemplateLevelCacheWithDagWithCache(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(workflowWithTemplateLevelMemoizationAndChildDag)

// Assume cache is already set
sampleConfigMapCacheEntry := apiv1.ConfigMap{
Data: map[string]string{
"entrypoint-key-1": `{"ExpiresAt":"2020-06-18T17:11:05Z","NodeID":"memoize-abx4124-123129321123","Outputs":{}}`,
},
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMap",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "cache-top-entrypoint",
ResourceVersion: "1630732",
Labels: map[string]string{
common.LabelKeyConfigMapType: common.LabelValueTypeConfigMapCache,
},
},
}

cancel, controller := newController(wf)
defer cancel()

ctx := context.Background()

_, err := controller.kubeclientset.CoreV1().ConfigMaps("default").Create(ctx, &sampleConfigMapCacheEntry, metav1.CreateOptions{})
assert.NoError(t, err)

woc := newWorkflowOperationCtx(wf, controller)

woc.operate(ctx)
makePodsPhase(ctx, woc, apiv1.PodSucceeded)
woc.operate(ctx)

// Only parent node should exist and it should be a memoization cache hit
for _, node := range woc.wf.Status.Nodes {
t.Log(node)
if node.TemplateName == "entrypoint" {
assert.True(t, true, "Entrypoint node does not exist")
assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
assert.True(t, node.MemoizationStatus.Hit)
}
if node.Name == "whalesay" {
assert.False(t, true, "Whalesay dag should not have been executed")
}
}
}

0 comments on commit 9693c02

Please sign in to comment.