Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix missing artifacts for stopped workflows. Fixes #12401 #12402

Merged
merged 16 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
abcd87e
fix: fix missing artifacts for stopped workflows. Fixes #12401
Garett-MacGowan Dec 23, 2023
36d98b4
fix: Bug fix for checkTaskResultsComplete function. Presence of of ou…
Garett-MacGowan Dec 23, 2023
3f5b70d
fix: Fix tests. Fixes #12401
Garett-MacGowan Dec 23, 2023
0f22f3c
Merge branch 'main' into fix/stopped_workflow_artifacts
Garett-MacGowan Dec 23, 2023
785b463
fix: Empty commit for re-running transient test failure.
Garett-MacGowan Dec 23, 2023
b090a88
fix: Simplify logic for checking task result completion status. Fixes…
Garett-MacGowan Dec 28, 2023
a019a3e
fix: Fix lint error. Fixes #12401
Garett-MacGowan Dec 28, 2023
1f66956
fix: Make task result reconciliation check more robust. Fixes #12401
Garett-MacGowan Dec 30, 2023
fb79398
fix: Introduce InitializeOutput(bgCtx) for wait container to improve …
Garett-MacGowan Dec 31, 2023
6d6be52
fix: Address PR comments. Bug fix for legacy/insecure pod patch. Fixe…
Garett-MacGowan Jan 3, 2024
5cdeffe
Merge branch 'main' into fix/stopped_workflow_artifacts
Garett-MacGowan Jan 3, 2024
840a878
fix: Empty commit for CI resubmit. Fixes #12401
Garett-MacGowan Jan 3, 2024
88b84f0
Merge branch 'main' into fix/stopped_workflow_artifacts
Garett-MacGowan Jan 3, 2024
25180a9
Merge branch 'main' into fix/stopped_workflow_artifacts
Garett-MacGowan Jan 3, 2024
4782f40
fix: Remove extraneous apostrophe in TaskResultsCompletionStatus comm…
Garett-MacGowan Jan 3, 2024
05e01d3
fix: empty commit for ci resubmit. Fixes #12401
Garett-MacGowan Jan 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api/jsonschema/schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions cmd/argoexec/commands/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ func waitContainer(ctx context.Context) error {
defer stats.LogStats()
stats.StartStatsTicker(5 * time.Minute)

// Create a new empty (placeholder) task result with LabelKeyReportOutputsCompleted set to false.
wfExecutor.InitializeOutput(bgCtx)
juliev0 marked this conversation as resolved.
Show resolved Hide resolved

// Wait for main container to complete
err := wfExecutor.Wait(ctx)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion docs/fields.md
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ WorkflowStatus contains overall status information about a workflow
|`storedTemplates`|[`Template`](#template)|StoredTemplates is a mapping between a template ref and the node's status.|
|`storedWorkflowTemplateSpec`|[`WorkflowSpec`](#workflowspec)|StoredWorkflowSpec stores the WorkflowTemplate spec for future execution.|
|`synchronization`|[`SynchronizationStatus`](#synchronizationstatus)|Synchronization stores the status of synchronization locks|
|`taskResultsCompleted`|`Map< boolean , string >`|Have task results been completed? (mapped by Pod name) used to prevent premature garbage collection of artifacts.|
|`taskResultsCompletionStatus`|`Map< boolean , string >`|TaskResultsCompletionStatus tracks task result completion status (mapped by pod name). Used to prevent premature archiving and garbage collection.|

## CronWorkflowSpec

Expand Down
2 changes: 1 addition & 1 deletion manifests/base/crds/full/argoproj.io_workflows.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1,416 changes: 708 additions & 708 deletions pkg/apis/workflow/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pkg/apis/workflow/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/apis/workflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 19 additions & 17 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1944,29 +1944,31 @@ type WorkflowStatus struct {
// ArtifactGCStatus maintains the status of Artifact Garbage Collection
ArtifactGCStatus *ArtGCStatus `json:"artifactGCStatus,omitempty" protobuf:"bytes,19,opt,name=artifactGCStatus"`

// Have task results been completed? (mapped by Pod name) used to prevent premature garbage collection of artifacts.
TaskResultsCompleted map[string]bool `json:"taskResultsCompleted,omitempty" protobuf:"bytes,20,opt,name=taskResultsCompleted"`
// TaskResultsCompletionStatus tracks task result completion status (mapped by pod name). Used to prevent premature archiving and garbage collection.
TaskResultsCompletionStatus map[string]bool `json:"taskResultsCompletionStatus,omitempty" protobuf:"bytes,20,opt,name=taskResultsCompletionStatus"`
}

func (ws *WorkflowStatus) InitializeTaskResultIncomplete(resultName string) {
if ws.TaskResultsCompleted == nil {
ws.TaskResultsCompleted = make(map[string]bool)
}
if _, ok := ws.TaskResultsCompleted[resultName]; !ok {
ws.MarkTaskResultIncomplete(resultName)
func (ws *WorkflowStatus) MarkTaskResultIncomplete(name string) {
if ws.TaskResultsCompletionStatus == nil {
ws.TaskResultsCompletionStatus = make(map[string]bool)
}
ws.TaskResultsCompletionStatus[name] = false
}

func (ws *WorkflowStatus) MarkTaskResultComplete(name string) {
ws.TaskResultsCompleted[name] = true
}
func (ws *WorkflowStatus) MarkTaskResultIncomplete(name string) {
ws.TaskResultsCompleted[name] = false
}
func (ws *WorkflowStatus) GetTaskResultCompleted(name string) bool {
return ws.TaskResultsCompleted[name]
if ws.TaskResultsCompletionStatus == nil {
ws.TaskResultsCompletionStatus = make(map[string]bool)
}
ws.TaskResultsCompletionStatus[name] = true
}
func (ws *WorkflowStatus) GetTaskResultsCompleted() map[string]bool {
return ws.TaskResultsCompleted

func (ws *WorkflowStatus) TaskResultsInProgress() bool {
for _, value := range ws.TaskResultsCompletionStatus {
if !value {
return true
}
}
return false
}

func (ws *WorkflowStatus) IsOffloadNodeStatus() bool {
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions sdks/python/client/docs/WorkflowServiceApi.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions test/e2e/argo_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,29 @@ spec:
})
}

func (s *ArgoServerSuite) TestArtifactServerArchivedStoppedWorkflow() {
var uid types.UID
var nodeID string
s.Given().
Workflow(`@testdata/artifact-workflow-stopped.yaml`).
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeArchived).
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
uid = metadata.UID
nodeID = status.Nodes.FindByDisplayName("create-artifact").ID
})

s.Run("GetArtifactByNodeID", func() {
s.e().GET("/artifact-files/argo/archived-workflows/{uid}/{nodeID}/outputs/artifact-creator", uid, nodeID).
Expect().
Status(200).
Body().
Contains("testing")
})
}

// make sure we can download an artifact
func (s *ArgoServerSuite) TestArtifactServer() {
var uid types.UID
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/daemon_pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ spec:
`).
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeSucceeded).
WaitForWorkflow(fixtures.ToBeCompleted).
Then().
ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) {
assert.False(t, status.FinishedAt.IsZero())
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -898,7 +898,7 @@ func (s *FunctionalSuite) TestDataTransformation() {
Workflow("@testdata/data-transformation.yaml").
When().
SubmitWorkflow().
WaitForWorkflow(90 * time.Second).
WaitForWorkflow(fixtures.ToBeSucceeded).
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
Expand Down
99 changes: 99 additions & 0 deletions test/e2e/testdata/artifact-workflow-stopped.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: wf-stopped-
spec:
workflowMetadata:
labels:
workflows.argoproj.io/test: "true"
workflows.argoproj.io/workflow: "wf-stopped"
entrypoint: wf-stopped-main
serviceAccountName: argo
executor:
serviceAccountName: default
volumeClaimTemplates:
- metadata:
name: artifacts
spec:
accessModes: [ "ReadWriteOnce" ]
resources:
requests:
storage: 1Gi
templates:
- name: wf-stopped-main
dag:
tasks:
- name: create-artifact
template: artifact-creator
- name: delay-stop-workflow
template: delay-stop
- name: stop-workflow
template: workflow-stopper
dependencies: [delay-stop-workflow]
- name: delay-stop
container:
image: alpine:latest
volumeMounts:
- name: artifacts
mountPath: /mnt/vol
command: [sh, -c]
args:
- |
echo "Delaying workflow stop"
ls /mnt
x=0
while [ $x -le 60 ]
do
sleep 1
if [ -f "/mnt/vol/test.txt" ]; then
echo "Artifact found in shared volume"
break
fi
x=$(( $x + 1 ))
done
- name: workflow-stopper
container:
image: argoproj/argocli:latest
args:
- stop
- -l
- workflows.argoproj.io/workflow=wf-stopped
- --namespace=argo
- --loglevel=debug
- name: artifact-creator
metadata:
labels:
template: "artifact-creator"
container:
image: alpine:latest
volumeMounts:
- name: artifacts
mountPath: /mnt/vol
command: [sh, -c]
args:
- |
echo 'testing' > /mnt/vol/test.txt
echo "Artifact saved to /mnt/vol/test.txt"
echo "Pretending to continue to do work."
ls /mnt
while :
do
sleep 1
done
outputs:
artifacts:
- name: artifact-creator
path: /mnt/vol/test.txt
s3:
key: artifact-creator
bucket: my-bucket-3
endpoint: minio:9000
insecure: true
accessKeySecret:
name: my-minio-cred
key: accesskey
secretKeySecret:
name: my-minio-cred
key: secretkey
archive:
none: {}
Loading