From 0b5e9afbbb64a460c523458e01946ad91e1dab92 Mon Sep 17 00:00:00 2001 From: Garett MacGowan Date: Mon, 16 Oct 2023 15:50:38 -0400 Subject: [PATCH] fix: Add new WorkflowTaskResult label to prevent race condition in ArtifactGC. Fixes #11879 Signed-off-by: Garett MacGowan --- cmd/argoexec/commands/wait.go | 7 ++++++- tasks.yaml | 2 +- workflow/common/common.go | 2 ++ workflow/controller/artifact_gc.go | 8 +++++++- workflow/controller/operator.go | 9 ++++----- workflow/controller/taskresult.go | 10 +++++++++- workflow/executor/executor.go | 25 ++++++++++++++++--------- workflow/executor/taskresult.go | 23 ++++++++++++++++------- 8 files changed, 61 insertions(+), 25 deletions(-) diff --git a/cmd/argoexec/commands/wait.go b/cmd/argoexec/commands/wait.go index f0c883a56e42..725414ffe068 100644 --- a/cmd/argoexec/commands/wait.go +++ b/cmd/argoexec/commands/wait.go @@ -53,6 +53,11 @@ func waitContainer(ctx context.Context) error { wfExecutor.AddError(err) } - wfExecutor.SaveLogs(ctx) + // Save log artifacts + logArtifacts := wfExecutor.SaveLogs(ctx) + + // Try to upsert TaskResult. 
If it fails, we will try to update the Pod's Annotations. + wfExecutor.ReportOutputs(ctx, logArtifacts) + return wfExecutor.HasError() } diff --git a/tasks.yaml b/tasks.yaml index 5df2a78c74a8..8020d455af49 100644 --- a/tasks.yaml +++ b/tasks.yaml @@ -36,7 +36,7 @@ spec: ports: 9000 dependencies: install - name: controller - command: ./dist/workflow-controller + command: ./dist/workflow-controller --loglevel debug dependencies: install build-controller port-forward env: - ARGO_EXECUTOR_PLUGINS=false diff --git a/workflow/common/common.go b/workflow/common/common.go index 0f6a1afb1542..a458e81a4f1d 100644 --- a/workflow/common/common.go +++ b/workflow/common/common.go @@ -94,6 +94,8 @@ const ( LabelKeyOnExit = workflow.WorkflowFullName + "/on-exit" // LabelKeyArtifactGCPodHash is a label applied to WorkflowTaskSets used by the Artifact Garbage Collection Pod LabelKeyArtifactGCPodHash = workflow.WorkflowFullName + "/artifact-gc-pod" + // LabelKeyWritingArtifact is a label applied to WorkflowTaskResults: "true" while the executor is still writing artifacts, "false" once outputs have been reported. + LabelKeyWritingArtifact = workflow.WorkflowFullName + "/writing-artifact" // ExecutorArtifactBaseDir is the base directory in the init container in which artifacts will be copied to. 
// Each artifact will be named according to its input name (e.g: /argo/inputs/artifacts/CODE) diff --git a/workflow/controller/artifact_gc.go b/workflow/controller/artifact_gc.go index 348150ed19ef..f267e4a21230 100644 --- a/workflow/controller/artifact_gc.go +++ b/workflow/controller/artifact_gc.go @@ -27,7 +27,7 @@ const artifactGCComponent = "artifact-gc" // artifactGCEnabled is a feature flag to globally disabled artifact GC in case of emergency var artifactGCEnabled, _ = env.GetBool("ARGO_ARTIFACT_GC_ENABLED", true) -func (woc *wfOperationCtx) garbageCollectArtifacts(ctx context.Context) error { +func (woc *wfOperationCtx) garbageCollectArtifacts(ctx context.Context, artifactsWriting bool) error { if !artifactGCEnabled { return nil @@ -64,6 +64,12 @@ func (woc *wfOperationCtx) garbageCollectArtifacts(ctx context.Context) error { } } + if artifactsWriting { + // If we don't do this, a race condition can occur where only some artifacts are garbage collected and finalizers are prematurely removed. + woc.log.Debug("Skipping garbage collection completion due to in-progress artifact reporting.") + return nil + } + err := woc.processArtifactGCCompletion(ctx) if err != nil { return err diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 031c20275942..8ad9751b33b7 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -266,11 +266,10 @@ func (woc *wfOperationCtx) operate(ctx context.Context) { woc.computeMetrics(woc.execWf.Spec.Metrics.Prometheus, localScope, realTimeScope, true) } - // Reconciliation of Outputs (Artifacts). See reportOutputs() of executor.go - woc.taskResultReconciliation() - - // check to see if we can do garbage collection of Artifacts; - if err := woc.garbageCollectArtifacts(ctx); err != nil { + // Reconciliation of Outputs (Artifacts). See reportOutputs() of executor.go. 
+ artifactsWriting := woc.taskResultReconciliation() + // Check to see if we can do garbage collection of artifacts. + if err := woc.garbageCollectArtifacts(ctx, artifactsWriting); err != nil { woc.log.WithError(err).Error("failed to GC artifacts") return } diff --git a/workflow/controller/taskresult.go b/workflow/controller/taskresult.go index ac53e009b525..28f0e769ea7e 100644 --- a/workflow/controller/taskresult.go +++ b/workflow/controller/taskresult.go @@ -52,11 +52,18 @@ func (wfc *WorkflowController) newWorkflowTaskResultInformer() cache.SharedIndex return informer } -func (woc *wfOperationCtx) taskResultReconciliation() { +func (woc *wfOperationCtx) taskResultReconciliation() bool { + artifactsWriting := false + objs, _ := woc.controller.taskResultInformer.GetIndexer().ByIndex(indexes.WorkflowIndex, woc.wf.Namespace+"/"+woc.wf.Name) woc.log.WithField("numObjs", len(objs)).Info("Task-result reconciliation") for _, obj := range objs { result := obj.(*wfv1.WorkflowTaskResult) + + if result.Labels[common.LabelKeyWritingArtifact] == "true" { + artifactsWriting = true + } + nodeID := result.Name old, err := woc.wf.Status.Nodes.Get(nodeID) if err != nil { @@ -83,4 +90,5 @@ func (woc *wfOperationCtx) taskResultReconciliation() { woc.updated = true } } + return artifactsWriting } diff --git a/workflow/executor/executor.go b/workflow/executor/executor.go index 4e15938ceda8..664dabc9cd61 100644 --- a/workflow/executor/executor.go +++ b/workflow/executor/executor.go @@ -293,6 +293,12 @@ func (we *WorkflowExecutor) SaveArtifacts(ctx context.Context) error { return argoerrs.InternalWrapError(err) } + // Create an empty task result with a label to indicate that artifacts are being written. Prevents garbage collection race condition. 
+ err = we.reportResult(ctx, wfv1.NodeResult{}, map[string]string{common.LabelKeyWritingArtifact: "true"}) + if err != nil { + return err + } + for i, art := range we.Template.Outputs.Artifacts { err := we.saveArtifact(ctx, common.MainContainerName, &art) if err != nil { @@ -584,7 +590,7 @@ func (we *WorkflowExecutor) SaveParameters(ctx context.Context) error { return nil } -func (we *WorkflowExecutor) SaveLogs(ctx context.Context) { +func (we *WorkflowExecutor) SaveLogs(ctx context.Context) []wfv1.Artifact { var logArtifacts []wfv1.Artifact tempLogsDir := "/tmp/argo/outputs/logs" @@ -608,7 +614,10 @@ func (we *WorkflowExecutor) SaveLogs(ctx context.Context) { } } - // try to upsert TaskResult, if it fails, we will try to update the Pod's Annotations + return logArtifacts +} + +func (we *WorkflowExecutor) ReportOutputs(ctx context.Context, logArtifacts []wfv1.Artifact) { err := we.reportOutputs(ctx, logArtifacts) if err != nil { we.AddError(err) @@ -784,13 +793,11 @@ func (we *WorkflowExecutor) CaptureScriptResult(ctx context.Context) error { func (we *WorkflowExecutor) reportOutputs(ctx context.Context, logArtifacts []wfv1.Artifact) error { outputs := we.Template.Outputs.DeepCopy() outputs.Artifacts = append(outputs.Artifacts, logArtifacts...) - return we.reportResult(ctx, wfv1.NodeResult{Outputs: outputs}) + // Task result label to indicate that artifacts are done being written. 
+ return we.reportResult(ctx, wfv1.NodeResult{Outputs: outputs}, map[string]string{common.LabelKeyWritingArtifact: "false"}) } -func (we *WorkflowExecutor) reportResult(ctx context.Context, result wfv1.NodeResult) error { - if !result.Outputs.HasOutputs() && !result.Progress.IsValid() { - return nil - } +func (we *WorkflowExecutor) reportResult(ctx context.Context, result wfv1.NodeResult, labels map[string]string) error { return retryutil.OnError(wait.Backoff{ Duration: time.Second, Factor: 2, @@ -798,7 +805,7 @@ func (we *WorkflowExecutor) reportResult(ctx context.Context, result wfv1.NodeRe Steps: 5, Cap: 30 * time.Second, }, errorsutil.IsTransientErr, func() error { - err := we.upsertTaskResult(ctx, result) + err := we.upsertTaskResult(ctx, result, labels) if apierr.IsForbidden(err) { log.WithError(err).Warn("failed to patch task set, falling back to legacy/insecure pod patch, see https://argoproj.github.io/argo-workflows/workflow-rbac/") if result.Outputs.HasOutputs() { @@ -1107,7 +1114,7 @@ func (we *WorkflowExecutor) monitorProgress(ctx context.Context, progressFile st log.WithError(ctx.Err()).Info("stopping progress monitor (context done)") return case <-annotationPatchTicker.C: - if err := we.reportResult(ctx, wfv1.NodeResult{Progress: we.progress}); err != nil { + if err := we.reportResult(ctx, wfv1.NodeResult{Progress: we.progress}, map[string]string{}); err != nil { log.WithError(err).Info("failed to report progress") } else { we.progress = "" diff --git a/workflow/executor/taskresult.go b/workflow/executor/taskresult.go index e071dfa12dc5..9c1671b3bd9e 100644 --- a/workflow/executor/taskresult.go +++ b/workflow/executor/taskresult.go @@ -5,6 +5,7 @@ import ( "encoding/json" "os" + "golang.org/x/exp/maps" apierr "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -14,16 +15,21 @@ import ( "github.com/argoproj/argo-workflows/v3/workflow/common" ) -func (we *WorkflowExecutor) 
upsertTaskResult(ctx context.Context, result wfv1.NodeResult) error { - err := we.createTaskResult(ctx, result) +func (we *WorkflowExecutor) upsertTaskResult(ctx context.Context, result wfv1.NodeResult, labels map[string]string) error { + err := we.createTaskResult(ctx, result, labels) if apierr.IsAlreadyExists(err) { - return we.patchTaskResult(ctx, result) + return we.patchTaskResult(ctx, result, labels) } return err } -func (we *WorkflowExecutor) patchTaskResult(ctx context.Context, result wfv1.NodeResult) error { - data, err := json.Marshal(&wfv1.WorkflowTaskResult{NodeResult: result}) +func (we *WorkflowExecutor) patchTaskResult(ctx context.Context, result wfv1.NodeResult, labels map[string]string) error { + data, err := json.Marshal(&wfv1.WorkflowTaskResult{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + NodeResult: result, + }) if err != nil { return err } @@ -36,7 +42,10 @@ func (we *WorkflowExecutor) patchTaskResult(ctx context.Context, result wfv1.Nod return err } -func (we *WorkflowExecutor) createTaskResult(ctx context.Context, result wfv1.NodeResult) error { +func (we *WorkflowExecutor) createTaskResult(ctx context.Context, result wfv1.NodeResult, labels map[string]string) error { + defaultLabels := map[string]string{common.LabelKeyWorkflow: we.workflow} + maps.Copy(labels, defaultLabels) + taskResult := &wfv1.WorkflowTaskResult{ TypeMeta: metav1.TypeMeta{ APIVersion: workflow.APIVersion, @@ -44,7 +53,7 @@ func (we *WorkflowExecutor) createTaskResult(ctx context.Context, result wfv1.No }, ObjectMeta: metav1.ObjectMeta{ Name: we.nodeId, - Labels: map[string]string{common.LabelKeyWorkflow: we.workflow}, + Labels: labels, }, NodeResult: result, }