Skip to content

Commit

Permalink
Revert "fix: ArtifactGC Fails for Stopped Workflows. Fixes argoproj#1…
Browse files Browse the repository at this point in the history
…1879 (argoproj#11947)"

This reverts commit c296cf2.
  • Loading branch information
z63d committed Mar 11, 2024
1 parent 2f6cf2c commit 3594f93
Show file tree
Hide file tree
Showing 40 changed files with 777 additions and 1,558 deletions.
7 changes: 0 additions & 7 deletions api/jsonschema/schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 0 additions & 7 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion cmd/argoexec/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ func initExecutor() *executor.WorkflowExecutor {
podName,
types.UID(os.Getenv(common.EnvVarPodUID)),
os.Getenv(common.EnvVarWorkflowName),
types.UID(os.Getenv(common.EnvVarWorkflowUID)),
os.Getenv(common.EnvVarNodeID),
namespace,
cre,
Expand Down
26 changes: 6 additions & 20 deletions cmd/argoexec/commands/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,7 @@ func NewWaitCommand() *cobra.Command {

func waitContainer(ctx context.Context) error {
wfExecutor := initExecutor()

// Don't allow cancellation to impact capture of results, parameters, artifacts, or defers.
bgCtx := context.Background()

defer wfExecutor.HandleError(bgCtx) // Must be placed at the bottom of defers stack.
defer wfExecutor.FinalizeOutput(bgCtx) // Ensures the LabelKeyReportOutputsCompleted is set to true.
defer wfExecutor.HandleError(ctx) // Must be placed at the bottom of defers stack.
defer stats.LogStats()
stats.StartStatsTicker(5 * time.Minute)

Expand All @@ -40,33 +35,24 @@ func waitContainer(ctx context.Context) error {
if err != nil {
wfExecutor.AddError(err)
}

ctx = context.Background() // don't allow cancellation to impact capture of results, parameters,or artifacts
// Capture output script result
err = wfExecutor.CaptureScriptResult(bgCtx)
err = wfExecutor.CaptureScriptResult(ctx)
if err != nil {
wfExecutor.AddError(err)
}

// Saving output parameters
err = wfExecutor.SaveParameters(bgCtx)
err = wfExecutor.SaveParameters(ctx)
if err != nil {
wfExecutor.AddError(err)
}

// Saving output artifacts
err = wfExecutor.SaveArtifacts(bgCtx)
if err != nil {
wfExecutor.AddError(err)
}

// Save log artifacts
logArtifacts := wfExecutor.SaveLogs(bgCtx)

// Try to upsert TaskResult. If it fails, we will try to update the Pod's Annotations
err = wfExecutor.ReportOutputs(bgCtx, logArtifacts)
err = wfExecutor.SaveArtifacts(ctx)
if err != nil {
wfExecutor.AddError(err)
}

wfExecutor.SaveLogs(ctx)
return wfExecutor.HasError()
}
1 change: 0 additions & 1 deletion docs/fields.md
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,6 @@ WorkflowStatus contains overall status information about a workflow
|`storedTemplates`|[`Template`](#template)|StoredTemplates is a mapping between a template ref and the node's status.|
|`storedWorkflowTemplateSpec`|[`WorkflowSpec`](#workflowspec)|StoredWorkflowSpec stores the WorkflowTemplate spec for future execution.|
|`synchronization`|[`SynchronizationStatus`](#synchronizationstatus)|Synchronization stores the status of synchronization locks|
|`taskResultsCompleted`|`Map< boolean , string >`|Have task results been completed? (mapped by Pod name) used to prevent premature garbage collection of artifacts.|

## CronWorkflowSpec

Expand Down
4 changes: 0 additions & 4 deletions manifests/base/crds/full/argoproj.io_workflows.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1,527 changes: 680 additions & 847 deletions pkg/apis/workflow/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

3 changes: 0 additions & 3 deletions pkg/apis/workflow/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 0 additions & 16 deletions pkg/apis/workflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 0 additions & 24 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1939,30 +1939,6 @@ type WorkflowStatus struct {

// ArtifactGCStatus maintains the status of Artifact Garbage Collection
ArtifactGCStatus *ArtGCStatus `json:"artifactGCStatus,omitempty" protobuf:"bytes,19,opt,name=artifactGCStatus"`

// Have task results been completed? (mapped by Pod name) used to prevent premature garbage collection of artifacts.
TaskResultsCompleted map[string]bool `json:"taskResultsCompleted,omitempty" protobuf:"bytes,20,opt,name=taskResultsCompleted"`
}

func (ws *WorkflowStatus) InitializeTaskResultIncomplete(resultName string) {
if ws.TaskResultsCompleted == nil {
ws.TaskResultsCompleted = make(map[string]bool)
}
if _, ok := ws.TaskResultsCompleted[resultName]; !ok {
ws.MarkTaskResultIncomplete(resultName)
}
}
func (ws *WorkflowStatus) MarkTaskResultComplete(name string) {
ws.TaskResultsCompleted[name] = true
}
func (ws *WorkflowStatus) MarkTaskResultIncomplete(name string) {
ws.TaskResultsCompleted[name] = false
}
func (ws *WorkflowStatus) GetTaskResultCompleted(name string) bool {
return ws.TaskResultsCompleted[name]
}
func (ws *WorkflowStatus) GetTaskResultsCompleted() map[string]bool {
return ws.TaskResultsCompleted
}

func (ws *WorkflowStatus) IsOffloadNodeStatus() bool {
Expand Down
7 changes: 0 additions & 7 deletions pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 0 additions & 6 deletions sdks/python/client/docs/WorkflowServiceApi.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion test/e2e/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ spec:
`).
When().
SubmitWorkflow().
WaitForWorkflow(2 * time.Minute).
WaitForWorkflow(time.Minute + 5*time.Second).
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.WorkflowFailed, status.Phase)
Expand Down
95 changes: 2 additions & 93 deletions test/e2e/artifacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/argoproj/argo-workflows/v3/workflow/common"

"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/test/e2e/fixtures"
Expand Down Expand Up @@ -67,96 +66,6 @@ type artifactState struct {
deletedAtWFDeletion bool
}

func (s *ArtifactsSuite) TestStoppedWorkflow() {

for _, tt := range []struct {
workflowFile string
}{
{workflowFile: "@testdata/artifactgc/artgc-dag-wf-stopped.yaml"},
{workflowFile: "@testdata/artifactgc/artgc-dag-wf-stopped-pod-gc-on-pod-completion.yaml"},
} {
// Create the minio client for interacting with the bucket.
c, err := minio.New("localhost:9000", &minio.Options{
Creds: credentials.NewStaticV4("admin", "password", ""),
})
assert.NoError(s.T(), err)

// Ensure the artifacts aren't in the bucket.
_, err = c.StatObject(context.Background(), "my-bucket-3", "on-deletion-wf-stopped-1", minio.StatObjectOptions{})
if err == nil {
err = c.RemoveObject(context.Background(), "my-bucket-3", "on-deletion-wf-stopped-1", minio.RemoveObjectOptions{})
assert.NoError(s.T(), err)
}
_, err = c.StatObject(context.Background(), "my-bucket-3", "on-deletion-wf-stopped-2", minio.StatObjectOptions{})
if err == nil {
err = c.RemoveObject(context.Background(), "my-bucket-3", "on-deletion-wf-stopped-2", minio.RemoveObjectOptions{})
assert.NoError(s.T(), err)
}

then := s.Given().
Workflow(tt.workflowFile).
When().
Then()

// Assert the artifacts don't exist.
then.ExpectArtifactByKey("on-deletion-wf-stopped-1", "my-bucket-3", func(t *testing.T, object minio.ObjectInfo, err error) {
assert.NotNil(t, err)
})
then.ExpectArtifactByKey("on-deletion-wf-stopped-2", "my-bucket-3", func(t *testing.T, object minio.ObjectInfo, err error) {
assert.NotNil(t, err)
})

when := then.When().
SubmitWorkflow().
WaitForWorkflow(
fixtures.WorkflowCompletionOkay(true),
fixtures.Condition(func(wf *wfv1.Workflow) (bool, string) {

condition := "for artifacts to exist"

_, err1 := c.StatObject(context.Background(), "my-bucket-3", "on-deletion-wf-stopped-1", minio.StatObjectOptions{})
_, err2 := c.StatObject(context.Background(), "my-bucket-3", "on-deletion-wf-stopped-2", minio.StatObjectOptions{})

if err1 == nil && err2 == nil {
return true, condition
}

return false, condition
}))

then = when.Then()

// Assert artifact exists
then.ExpectArtifactByKey("on-deletion-wf-stopped-1", "my-bucket-3", func(t *testing.T, object minio.ObjectInfo, err error) {
assert.NoError(t, err)
})
then.ExpectArtifactByKey("on-deletion-wf-stopped-2", "my-bucket-3", func(t *testing.T, object minio.ObjectInfo, err error) {
assert.NoError(t, err)
})

when = then.When()

when.
DeleteWorkflow().
WaitForWorkflowDeletion()

then = when.Then()

// Assert the artifacts don't exist.
then.ExpectArtifactByKey("on-deletion-wf-stopped-1", "my-bucket-3", func(t *testing.T, object minio.ObjectInfo, err error) {
assert.NotNil(t, err)
})
then.ExpectArtifactByKey("on-deletion-wf-stopped-2", "my-bucket-3", func(t *testing.T, object minio.ObjectInfo, err error) {
assert.NotNil(t, err)
})

when = then.When()

// Remove the finalizers so the workflow gets deleted in case the test failed.
when.RemoveFinalizers(false)
}
}

func (s *ArtifactsSuite) TestArtifactGC() {

s.Given().
Expand Down Expand Up @@ -322,7 +231,7 @@ func (s *ArtifactsSuite) TestArtifactGC() {
} else {
fmt.Printf("verifying artifact %s is not deleted\n", expectedArtifact.key)
then.ExpectArtifactByKey(expectedArtifact.key, expectedArtifact.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) {
assert.NoError(t, err)
assert.Nil(t, err)
})
}
}
Expand Down Expand Up @@ -361,7 +270,7 @@ spec:

// create a ServiceAccount which won't be tied to the artifactgc role and attempt to use that service account in the GC Pod
// Want to verify that this causes the ArtifactGCError Condition in the Workflow
func (s *ArtifactsSuite) TestInsufficientRole() {
func (s *ArtifactsSuite) TestArtifactGC_InsufficientRole() {
ctx := context.Background()
_, err := s.KubeClient.CoreV1().ServiceAccounts(fixtures.Namespace).Create(ctx, &corev1.ServiceAccount{ObjectMeta: metav1.ObjectMeta{Name: "artgc-role-test-sa"}}, metav1.CreateOptions{})
assert.NoError(s.T(), err)
Expand Down
Loading

0 comments on commit 3594f93

Please sign in to comment.