Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: don't necessarily include all artifacts from templates in node outputs #13066

Merged
merged 15 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/argoexec/commands/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,17 @@ func waitContainer(ctx context.Context) error {
}

// Saving output artifacts
err = wfExecutor.SaveArtifacts(bgCtx)
artifacts, err := wfExecutor.SaveArtifacts(bgCtx)
if err != nil {
wfExecutor.AddError(err)
}

// Save log artifacts
logArtifacts := wfExecutor.SaveLogs(bgCtx)
artifacts = append(artifacts, logArtifacts...)

// Try to upsert TaskResult. If it fails, we will try to update the Pod's Annotations
err = wfExecutor.ReportOutputs(bgCtx, logArtifacts)
err = wfExecutor.ReportOutputs(bgCtx, artifacts)
if err != nil {
wfExecutor.AddError(err)
}
Expand Down
17 changes: 9 additions & 8 deletions workflow/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,26 +291,27 @@
}

// SaveArtifacts uploads artifacts to the archive location
func (we *WorkflowExecutor) SaveArtifacts(ctx context.Context) error {
func (we *WorkflowExecutor) SaveArtifacts(ctx context.Context) (wfv1.Artifacts, error) {
artifacts := wfv1.Artifacts{}
if len(we.Template.Outputs.Artifacts) == 0 {
log.Infof("No output artifacts")
return nil
return artifacts, nil
}

log.Infof("Saving output artifacts")
err := os.MkdirAll(tempOutArtDir, os.ModePerm)
if err != nil {
return argoerrs.InternalWrapError(err)
return artifacts, argoerrs.InternalWrapError(err)
}

for i, art := range we.Template.Outputs.Artifacts {

Check failure on line 307 in workflow/executor/executor.go

View workflow job for this annotation

GitHub Actions / Unit Tests

i declared and not used
err := we.saveArtifact(ctx, common.MainContainerName, &art)
if err != nil {
return err
return artifacts, err
}
we.Template.Outputs.Artifacts[i] = art
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of updating this directly, create a new list of the artifacts that we actually saved and return that

Copy link
Contributor

@Garett-MacGowan Garett-MacGowan Jun 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. So before we were using the we.Template.Outputs.Artifacts to pass to ReportOutputs but now we are creating a fresh wfv1.Artifacts object that will only include the successfully saved artifacts. This makes sense.

Copy link
Member

@agilgur5 agilgur5 Jun 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm I do see a small issue here, but I think this already existed beforehand:
If the wait container is interrupted during this loop, some artifacts may save to S3 (etc) but not be reported

But since adding artifacts to Outputs.Artifacts wouldn't report them until the very end anyway, I think this doesn't change any existing behavior

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the wait container is interrupted during this loop, some artifacts may save to S3 (etc) but not be reported

How do you mean it wouldn't be reported? Since the background context is passed into both SaveArtifacts() and ReportOutputs() below, I believe all of that logic is still supposed to be executed. Is there something different you're referring to besides the context being cancelled?

Copy link
Member

@agilgur5 agilgur5 Jun 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the loop is long or the container is otherwise non-gracefully terminated, the request context won't matter.

Also realized that saving artifacts can be parallelized here to reduce that chance and improve throughput (related: #12442). Similarly pre-existing logic though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, this is basically if this Pod receives a SIGKILL? I don't think there's anything that can be done in that case, is there? (at least from the Workflow Pod side)

Copy link
Member

@agilgur5 agilgur5 Jun 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More or less yes, but there are things we can do to mitigate that or detect that scenario, such as parallel uploads and writing pieces of the report as each artifact completes uploading (although k8s does not have streaming requests afaik, so that would put more strain on the api server when there are many artifacts).

I also recently remembered we had #12413 merged for 3.6 that does help prevent that too.

(To be clear, this is pre-existing logic / an incidental finding, so not going to block review on that, but there are perhaps actions we should take around this case. Perhaps better discussed in #12993 though; although that's a specific case)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good

artifacts = append(artifacts, art)
}
return nil
return artifacts, nil
}

func (we *WorkflowExecutor) saveArtifact(ctx context.Context, containerName string, art *wfv1.Artifact) error {
Expand Down Expand Up @@ -832,9 +833,9 @@
}

// ReportOutputs updates the WorkflowTaskResult (or falls back to annotate the Pod)
func (we *WorkflowExecutor) ReportOutputs(ctx context.Context, logArtifacts []wfv1.Artifact) error {
func (we *WorkflowExecutor) ReportOutputs(ctx context.Context, artifacts []wfv1.Artifact) error {
outputs := we.Template.Outputs.DeepCopy()
outputs.Artifacts = append(outputs.Artifacts, logArtifacts...)
outputs.Artifacts = artifacts
return we.reportResult(ctx, wfv1.NodeResult{Outputs: outputs})
}

Expand Down
Loading