Skip to content

Commit

Permalink
fix: Add new WorkflowTaskResult label to prevent race condition in Ar…
Browse files Browse the repository at this point in the history
…tifactGC. Fixes argoproj#11879

Signed-off-by: Garett MacGowan <garettsoftware@gmail.com>
  • Loading branch information
Garett-MacGowan committed Oct 16, 2023
1 parent 50cadb6 commit 0b5e9af
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 25 deletions.
7 changes: 6 additions & 1 deletion cmd/argoexec/commands/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ func waitContainer(ctx context.Context) error {
wfExecutor.AddError(err)
}

wfExecutor.SaveLogs(ctx)
// Save log artifacts
logArtifacts := wfExecutor.SaveLogs(ctx)

// Try to upsert TaskResult. If it fails, we will try to update the Pod's Annotations
wfExecutor.ReportOutputs(ctx, logArtifacts)

return wfExecutor.HasError()
}
2 changes: 1 addition & 1 deletion tasks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ spec:
ports: 9000
dependencies: install
- name: controller
command: ./dist/workflow-controller
command: ./dist/workflow-controller --loglevel debug
dependencies: install build-controller port-forward
env:
- ARGO_EXECUTOR_PLUGINS=false
Expand Down
2 changes: 2 additions & 0 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ const (
LabelKeyOnExit = workflow.WorkflowFullName + "/on-exit"
// LabelKeyArtifactGCPodHash is a label applied to WorkflowTaskSets used by the Artifact Garbage Collection Pod
LabelKeyArtifactGCPodHash = workflow.WorkflowFullName + "/artifact-gc-pod"
// LabelKeyWritingArtifact is a label applied to WorkflowTaskResults while artifacts are being written.
LabelKeyWritingArtifact = workflow.WorkflowFullName + "/writing-artifact"

// ExecutorArtifactBaseDir is the base directory in the init container in which artifacts will be copied to.
// Each artifact will be named according to its input name (e.g: /argo/inputs/artifacts/CODE)
Expand Down
8 changes: 7 additions & 1 deletion workflow/controller/artifact_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const artifactGCComponent = "artifact-gc"
// artifactGCEnabled is a feature flag to globally disable artifact GC in case of emergency
var artifactGCEnabled, _ = env.GetBool("ARGO_ARTIFACT_GC_ENABLED", true)

func (woc *wfOperationCtx) garbageCollectArtifacts(ctx context.Context) error {
func (woc *wfOperationCtx) garbageCollectArtifacts(ctx context.Context, artifactsWriting bool) error {

if !artifactGCEnabled {
return nil
Expand Down Expand Up @@ -64,6 +64,12 @@ func (woc *wfOperationCtx) garbageCollectArtifacts(ctx context.Context) error {
}
}

if artifactsWriting {
// If we don't do this, a race condition can occur where only some artifacts are garbage collected and finalizers are prematurely removed.
woc.log.Debug("Skipping garbage collection completion due to in-progress artifact reporting.")
return nil
}

err := woc.processArtifactGCCompletion(ctx)
if err != nil {
return err
Expand Down
9 changes: 4 additions & 5 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,10 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
woc.computeMetrics(woc.execWf.Spec.Metrics.Prometheus, localScope, realTimeScope, true)
}

// Reconciliation of Outputs (Artifacts). See reportOutputs() of executor.go
woc.taskResultReconciliation()

// check to see if we can do garbage collection of Artifacts;
if err := woc.garbageCollectArtifacts(ctx); err != nil {
// Reconciliation of Outputs (Artifacts). See reportOutputs() of executor.go.
artifactsWriting := woc.taskResultReconciliation()
// Check to see if we can do garbage collection of artifacts.
if err := woc.garbageCollectArtifacts(ctx, artifactsWriting); err != nil {
woc.log.WithError(err).Error("failed to GC artifacts")
return
}
Expand Down
10 changes: 9 additions & 1 deletion workflow/controller/taskresult.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,18 @@ func (wfc *WorkflowController) newWorkflowTaskResultInformer() cache.SharedIndex
return informer
}

func (woc *wfOperationCtx) taskResultReconciliation() {
func (woc *wfOperationCtx) taskResultReconciliation() bool {
artifactsWriting := false

objs, _ := woc.controller.taskResultInformer.GetIndexer().ByIndex(indexes.WorkflowIndex, woc.wf.Namespace+"/"+woc.wf.Name)
woc.log.WithField("numObjs", len(objs)).Info("Task-result reconciliation")
for _, obj := range objs {
result := obj.(*wfv1.WorkflowTaskResult)

if result.Labels[common.LabelKeyWritingArtifact] == "true" {
artifactsWriting = true
}

nodeID := result.Name
old, err := woc.wf.Status.Nodes.Get(nodeID)
if err != nil {
Expand All @@ -83,4 +90,5 @@ func (woc *wfOperationCtx) taskResultReconciliation() {
woc.updated = true
}
}
return artifactsWriting
}
25 changes: 16 additions & 9 deletions workflow/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,12 @@ func (we *WorkflowExecutor) SaveArtifacts(ctx context.Context) error {
return argoerrs.InternalWrapError(err)
}

// Create an empty task result with a label to indicate that artifacts are being written. Prevents garbage collection race condition.
err = we.reportResult(ctx, wfv1.NodeResult{}, map[string]string{common.LabelKeyWritingArtifact: "true"})
if err != nil {
return err
}

for i, art := range we.Template.Outputs.Artifacts {
err := we.saveArtifact(ctx, common.MainContainerName, &art)
if err != nil {
Expand Down Expand Up @@ -584,7 +590,7 @@ func (we *WorkflowExecutor) SaveParameters(ctx context.Context) error {
return nil
}

func (we *WorkflowExecutor) SaveLogs(ctx context.Context) {
func (we *WorkflowExecutor) SaveLogs(ctx context.Context) []wfv1.Artifact {
var logArtifacts []wfv1.Artifact
tempLogsDir := "/tmp/argo/outputs/logs"

Expand All @@ -608,7 +614,10 @@ func (we *WorkflowExecutor) SaveLogs(ctx context.Context) {
}
}

// try to upsert TaskResult, if it fails, we will try to update the Pod's Annotations
return logArtifacts
}

func (we *WorkflowExecutor) ReportOutputs(ctx context.Context, logArtifacts []wfv1.Artifact) {
err := we.reportOutputs(ctx, logArtifacts)
if err != nil {
we.AddError(err)
Expand Down Expand Up @@ -784,21 +793,19 @@ func (we *WorkflowExecutor) CaptureScriptResult(ctx context.Context) error {
func (we *WorkflowExecutor) reportOutputs(ctx context.Context, logArtifacts []wfv1.Artifact) error {
outputs := we.Template.Outputs.DeepCopy()
outputs.Artifacts = append(outputs.Artifacts, logArtifacts...)
return we.reportResult(ctx, wfv1.NodeResult{Outputs: outputs})
// Task result label to indicate that artifacts are done being written.
return we.reportResult(ctx, wfv1.NodeResult{Outputs: outputs}, map[string]string{common.LabelKeyWritingArtifact: "false"})
}

func (we *WorkflowExecutor) reportResult(ctx context.Context, result wfv1.NodeResult) error {
if !result.Outputs.HasOutputs() && !result.Progress.IsValid() {
return nil
}
func (we *WorkflowExecutor) reportResult(ctx context.Context, result wfv1.NodeResult, labels map[string]string) error {
return retryutil.OnError(wait.Backoff{
Duration: time.Second,
Factor: 2,
Jitter: 0.1,
Steps: 5,
Cap: 30 * time.Second,
}, errorsutil.IsTransientErr, func() error {
err := we.upsertTaskResult(ctx, result)
err := we.upsertTaskResult(ctx, result, labels)
if apierr.IsForbidden(err) {
log.WithError(err).Warn("failed to patch task set, falling back to legacy/insecure pod patch, see https://argoproj.github.io/argo-workflows/workflow-rbac/")
if result.Outputs.HasOutputs() {
Expand Down Expand Up @@ -1107,7 +1114,7 @@ func (we *WorkflowExecutor) monitorProgress(ctx context.Context, progressFile st
log.WithError(ctx.Err()).Info("stopping progress monitor (context done)")
return
case <-annotationPatchTicker.C:
if err := we.reportResult(ctx, wfv1.NodeResult{Progress: we.progress}); err != nil {
if err := we.reportResult(ctx, wfv1.NodeResult{Progress: we.progress}, map[string]string{}); err != nil {
log.WithError(err).Info("failed to report progress")
} else {
we.progress = ""
Expand Down
23 changes: 16 additions & 7 deletions workflow/executor/taskresult.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"os"

"golang.org/x/exp/maps"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -14,16 +15,21 @@ import (
"github.com/argoproj/argo-workflows/v3/workflow/common"
)

func (we *WorkflowExecutor) upsertTaskResult(ctx context.Context, result wfv1.NodeResult) error {
err := we.createTaskResult(ctx, result)
func (we *WorkflowExecutor) upsertTaskResult(ctx context.Context, result wfv1.NodeResult, labels map[string]string) error {
err := we.createTaskResult(ctx, result, labels)
if apierr.IsAlreadyExists(err) {
return we.patchTaskResult(ctx, result)
return we.patchTaskResult(ctx, result, labels)
}
return err
}

func (we *WorkflowExecutor) patchTaskResult(ctx context.Context, result wfv1.NodeResult) error {
data, err := json.Marshal(&wfv1.WorkflowTaskResult{NodeResult: result})
func (we *WorkflowExecutor) patchTaskResult(ctx context.Context, result wfv1.NodeResult, labels map[string]string) error {
data, err := json.Marshal(&wfv1.WorkflowTaskResult{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
NodeResult: result,
})
if err != nil {
return err
}
Expand All @@ -36,15 +42,18 @@ func (we *WorkflowExecutor) patchTaskResult(ctx context.Context, result wfv1.Nod
return err
}

func (we *WorkflowExecutor) createTaskResult(ctx context.Context, result wfv1.NodeResult) error {
func (we *WorkflowExecutor) createTaskResult(ctx context.Context, result wfv1.NodeResult, labels map[string]string) error {
defaultLabels := map[string]string{common.LabelKeyWorkflow: we.workflow}
maps.Copy(labels, defaultLabels)

taskResult := &wfv1.WorkflowTaskResult{
TypeMeta: metav1.TypeMeta{
APIVersion: workflow.APIVersion,
Kind: workflow.WorkflowTaskResultKind,
},
ObjectMeta: metav1.ObjectMeta{
Name: we.nodeId,
Labels: map[string]string{common.LabelKeyWorkflow: we.workflow},
Labels: labels,
},
NodeResult: result,
}
Expand Down

0 comments on commit 0b5e9af

Please sign in to comment.