Skip to content

Commit

Permalink
fix(backend): skip reporting native Argo workflows which do not have …
Browse files Browse the repository at this point in the history
…Run ID label. Fixes #3584 (#4438)

Fixes 3584.

For clusters with existing native Argo Workflows, ml-pipeline logs were cluttered
with unnecessary stack traces caused by workflows missing the Run ID label.

Made persistenceagent skip a workflow if it is missing the Run ID label, and
added the workflow name to the existing error message on the apiserver side.
  • Loading branch information
ekesken committed Sep 2, 2020
1 parent 0856a4d commit 1f2d417
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func TestPersistenceWorker_Success(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Namespace: "MY_NAMESPACE",
Name: "MY_NAME",
Labels: map[string]string{util.LabelKeyWorkflowRunId: "MY_UUID"},
},
})
workflowClient := client.NewWorkflowClientFake()
Expand Down Expand Up @@ -137,6 +138,7 @@ func TestPersistenceWorker_ReportWorkflowRetryableError(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Namespace: "MY_NAMESPACE",
Name: "MY_NAME",
Labels: map[string]string{util.LabelKeyWorkflowRunId: "MY_UUID"},
},
})
workflowClient := client.NewWorkflowClientFake()
Expand Down
4 changes: 4 additions & 0 deletions backend/src/agent/persistence/worker/workflow_saver.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func (s *WorkflowSaver) Save(key string, namespace string, name string, nowEpoch
"Workflow (%s): transient failure: %v", key, err)

}
if _, ok := wf.ObjectMeta.Labels[util.LabelKeyWorkflowRunId]; !ok {
log.Infof("Skip syncing Workflow (%v): workflow does not have a Run ID label.", name)
return nil
}
if wf.PersistedFinalState() && time.Now().Unix()-wf.FinishedAt() < s.ttlSecondsAfterWorkflowFinish {
// Skip persisting the workflow if the workflow is finished
// and the workflow hasn't being passing the TTL
Expand Down
33 changes: 32 additions & 1 deletion backend/src/agent/persistence/worker/workflow_saver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func TestWorkflow_Save_Success(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Namespace: "MY_NAMESPACE",
Name: "MY_NAME",
Labels: map[string]string{util.LabelKeyWorkflowRunId: "MY_UUID"},
},
})

Expand Down Expand Up @@ -87,6 +88,7 @@ func TestWorkflow_Save_PermanentFailureWhileReporting(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Namespace: "MY_NAMESPACE",
Name: "MY_NAME",
Labels: map[string]string{util.LabelKeyWorkflowRunId: "MY_UUID"},
},
})

Expand All @@ -112,6 +114,7 @@ func TestWorkflow_Save_TransientFailureWhileReporting(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Namespace: "MY_NAMESPACE",
Name: "MY_NAME",
Labels: map[string]string{util.LabelKeyWorkflowRunId: "MY_UUID"},
},
})

Expand Down Expand Up @@ -167,7 +170,10 @@ func TestWorkflow_Save_FinalStatueNotSkippedDueToExceedTTL(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Namespace: "MY_NAMESPACE",
Name: "MY_NAME",
Labels: map[string]string{util.LabelKeyWorkflowPersistedFinalState: "true"},
Labels: map[string]string{
util.LabelKeyWorkflowRunId: "MY_UUID",
util.LabelKeyWorkflowPersistedFinalState: "true",
},
},
Status: workflowapi.WorkflowStatus{
FinishedAt: metav1.Now(),
Expand All @@ -187,3 +193,28 @@ func TestWorkflow_Save_FinalStatueNotSkippedDueToExceedTTL(t *testing.T) {
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "permanent failure")
}

func TestWorkflow_Save_SkippedDDueToMissingRunID(t *testing.T) {
workflowFake := client.NewWorkflowClientFake()
pipelineFake := client.NewPipelineClientFake()

// Add this will result in failure unless reporting is skipped
pipelineFake.SetError(util.NewCustomError(fmt.Errorf("Error"), util.CUSTOM_CODE_PERMANENT,
"My Permanent Error"))

workflow := util.NewWorkflow(&workflowapi.Workflow{
ObjectMeta: metav1.ObjectMeta{
Namespace: "MY_NAMESPACE",
Name: "MY_NAME",
},
})

workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow)

saver := NewWorkflowSaver(workflowFake, pipelineFake, 100)

err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20)

assert.Equal(t, false, util.HasCustomCode(err, util.CUSTOM_CODE_TRANSIENT))
assert.Equal(t, nil, err)
}
2 changes: 1 addition & 1 deletion backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ func (r *ResourceManager) DeleteJob(jobID string) error {
func (r *ResourceManager) ReportWorkflowResource(workflow *util.Workflow) error {
if _, ok := workflow.ObjectMeta.Labels[util.LabelKeyWorkflowRunId]; !ok {
// Skip reporting if the workflow doesn't have the run id label
return util.NewInvalidInputError("Workflow missing the Run ID label")
return util.NewInvalidInputError("Workflow[%s] missing the Run ID label", workflow.Name)
}
runId := workflow.ObjectMeta.Labels[util.LabelKeyWorkflowRunId]
jobId := workflow.ScheduledWorkflowUUIDAsStringOrEmpty()
Expand Down
13 changes: 13 additions & 0 deletions backend/src/apiserver/resource/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1484,6 +1484,19 @@ func TestReportWorkflowResource_ScheduledWorkflowIDNotEmpty_NoExperiment_Success
assert.Equal(t, expectedRunDetail, runDetail)
}

// TestReportWorkflowResource_WorkflowMissingRunID verifies that reporting a
// workflow without the Run ID label returns an error whose message includes
// the workflow name.
func TestReportWorkflowResource_WorkflowMissingRunID(t *testing.T) {
	fakeStore, resourceManager, oneTimeRun := initWithOneTimeRun(t)
	defer fakeStore.Close()

	// Only the name is populated; the workflow has no labels.
	meta := v1.ObjectMeta{Name: oneTimeRun.Name}
	unlabeled := util.NewWorkflow(&v1alpha1.Workflow{ObjectMeta: meta})

	reportErr := resourceManager.ReportWorkflowResource(unlabeled)

	assert.NotNil(t, reportErr)
	assert.Contains(t, reportErr.Error(), "Workflow[workflow-name] missing the Run ID label")
}

func TestReportWorkflowResource_WorkflowCompleted(t *testing.T) {
store, manager, run := initWithOneTimeRun(t)
namespace := "kubeflow"
Expand Down

0 comments on commit 1f2d417

Please sign in to comment.