Skip to content

Commit 7cc20bb

Browse files
authored
fix: improve get archived workflow query performance during controller estimation. Fixes argoproj#13382 (argoproj#13394)
Signed-off-by: linzhengen <linzhengen@yahoo.co.jp>
1 parent 1239fd0 commit 7cc20bb

File tree

5 files changed

+93
-23
lines changed

5 files changed

+93
-23
lines changed

persist/sqldb/mocks/WorkflowArchive.go

Lines changed: 31 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

persist/sqldb/null_workflow_archive.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"fmt"
55
"time"
66

7+
"k8s.io/apimachinery/pkg/labels"
8+
79
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
810
sutils "github.com/argoproj/argo-workflows/v3/server/utils"
911
)
@@ -32,6 +34,10 @@ func (r *nullWorkflowArchive) GetWorkflow(string, string, string) (*wfv1.Workflo
3234
return nil, fmt.Errorf("getting archived workflows not supported")
3335
}
3436

37+
func (r *nullWorkflowArchive) GetWorkflowForEstimator(namespace string, requirements []labels.Requirement) (*wfv1.Workflow, error) {
38+
return nil, fmt.Errorf("getting archived workflow for estimator not supported")
39+
}
40+
3541
func (r *nullWorkflowArchive) DeleteWorkflow(string) error {
3642
return fmt.Errorf("deleting archived workflows not supported")
3743
}

persist/sqldb/workflow_archive.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"google.golang.org/grpc/codes"
1111
corev1 "k8s.io/api/core/v1"
1212
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13+
"k8s.io/apimachinery/pkg/labels"
1314
"k8s.io/apimachinery/pkg/types"
1415

1516
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
@@ -69,6 +70,7 @@ type WorkflowArchive interface {
6970
ListWorkflows(options sutils.ListOptions) (wfv1.Workflows, error)
7071
CountWorkflows(options sutils.ListOptions) (int64, error)
7172
GetWorkflow(uid string, namespace string, name string) (*wfv1.Workflow, error)
73+
GetWorkflowForEstimator(namespace string, requirements []labels.Requirement) (*wfv1.Workflow, error)
7274
DeleteWorkflow(uid string) error
7375
DeleteExpiredWorkflows(ttl time.Duration) error
7476
IsEnabled() bool
@@ -289,6 +291,13 @@ func namePrefixClause(namePrefix string) db.Cond {
289291
return db.Cond{}
290292
}
291293

294+
func phaseEqual(phase string) db.Cond {
295+
if phase != "" {
296+
return db.Cond{"phase": phase}
297+
}
298+
return db.Cond{}
299+
}
300+
292301
func (r *workflowArchive) GetWorkflow(uid string, namespace string, name string) (*wfv1.Workflow, error) {
293302
var err error
294303
archivedWf := &archivedWorkflowRecord{}
@@ -343,6 +352,46 @@ func (r *workflowArchive) GetWorkflow(uid string, namespace string, name string)
343352
return wf, nil
344353
}
345354

355+
func (r *workflowArchive) GetWorkflowForEstimator(namespace string, requirements []labels.Requirement) (*wfv1.Workflow, error) {
356+
selector := r.session.SQL().
357+
Select("name", "namespace", "uid", "startedat", "finishedat").
358+
From(archiveTableName).
359+
Where(r.clusterManagedNamespaceAndInstanceID()).
360+
And(phaseEqual(string(wfv1.NodeSucceeded)))
361+
362+
selector, err := BuildArchivedWorkflowSelector(selector, archiveTableName, archiveLabelsTableName, r.dbType, sutils.ListOptions{
363+
Namespace: namespace,
364+
LabelRequirements: requirements,
365+
Limit: 1,
366+
Offset: 0,
367+
}, false)
368+
if err != nil {
369+
return nil, err
370+
}
371+
372+
var awf archivedWorkflowMetadata
373+
err = selector.One(&awf)
374+
if err != nil {
375+
return nil, err
376+
}
377+
378+
return &wfv1.Workflow{
379+
ObjectMeta: v1.ObjectMeta{
380+
Name: awf.Name,
381+
Namespace: awf.Namespace,
382+
UID: types.UID(awf.UID),
383+
Labels: map[string]string{
384+
common.LabelKeyWorkflowArchivingStatus: "Persisted",
385+
},
386+
},
387+
Status: wfv1.WorkflowStatus{
388+
StartedAt: v1.Time{Time: awf.StartedAt},
389+
FinishedAt: v1.Time{Time: awf.FinishedAt},
390+
},
391+
}, nil
392+
393+
}
394+
346395
func (r *workflowArchive) DeleteWorkflow(uid string) error {
347396
rs, err := r.session.SQL().
348397
DeleteFrom(archiveTableName).

workflow/controller/estimation/estimator_factory.go

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99

1010
"github.com/argoproj/argo-workflows/v3/persist/sqldb"
1111
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
12-
"github.com/argoproj/argo-workflows/v3/server/utils"
1312
"github.com/argoproj/argo-workflows/v3/workflow/common"
1413
"github.com/argoproj/argo-workflows/v3/workflow/controller/indexes"
1514
"github.com/argoproj/argo-workflows/v3/workflow/hydrator"
@@ -72,23 +71,15 @@ func (f *estimatorFactory) NewEstimator(wf *wfv1.Workflow) (Estimator, error) {
7271
return &estimator{wf, newestWf}, nil
7372
}
7473
// we failed to find a base-line in the live set, so we now look in the archive
75-
requirements, err := labels.ParseToRequirements(common.LabelKeyPhase + "=" + string(wfv1.NodeSucceeded) + "," + labelName + "=" + labelValue)
74+
requirements, err := labels.ParseToRequirements(labelName + "=" + labelValue)
7675
if err != nil {
7776
return defaultEstimator, fmt.Errorf("failed to parse selector to requirements: %v", err)
7877
}
79-
workflows, err := f.wfArchive.ListWorkflows(
80-
utils.ListOptions{
81-
Namespace: wf.Namespace,
82-
LabelRequirements: requirements,
83-
Limit: 1,
84-
Offset: 0,
85-
})
78+
baselineWF, err := f.wfArchive.GetWorkflowForEstimator(wf.Namespace, requirements)
8679
if err != nil {
87-
return defaultEstimator, fmt.Errorf("failed to list archived workflows: %v", err)
88-
}
89-
if len(workflows) > 0 {
90-
return &estimator{wf, &workflows[0]}, nil
80+
return defaultEstimator, fmt.Errorf("failed to get archived workflow for estimator: %v", err)
9181
}
82+
return &estimator{wf, baselineWF}, nil
9283
}
9384
}
9485
return defaultEstimator, nil

workflow/controller/estimation/estimator_factory_test.go

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99

1010
sqldbmocks "github.com/argoproj/argo-workflows/v3/persist/sqldb/mocks"
1111
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
12-
"github.com/argoproj/argo-workflows/v3/server/utils"
1312
testutil "github.com/argoproj/argo-workflows/v3/test/util"
1413
"github.com/argoproj/argo-workflows/v3/workflow/common"
1514
"github.com/argoproj/argo-workflows/v3/workflow/controller/indexes"
@@ -51,17 +50,11 @@ metadata:
5150
workflows.argoproj.io/phase: Succeeded
5251
`), wfFailed)
5352
wfArchive := &sqldbmocks.WorkflowArchive{}
54-
r, err := labels.ParseToRequirements("workflows.argoproj.io/phase=Succeeded,workflows.argoproj.io/workflow-template=my-archived-wftmpl")
53+
r, err := labels.ParseToRequirements("workflows.argoproj.io/workflow-template=my-archived-wftmpl")
5554
assert.NoError(t, err)
56-
wfArchive.On("ListWorkflows", utils.ListOptions{
57-
Namespace: "my-ns",
58-
LabelRequirements: r,
59-
Limit: 1,
60-
}).Return(wfv1.Workflows{
61-
*testutil.MustUnmarshalWorkflow(`
55+
wfArchive.On("GetWorkflowForEstimator", "my-ns", r).Return(testutil.MustUnmarshalWorkflow(`
6256
metadata:
63-
name: my-archived-wftmpl-baseline`),
64-
}, nil)
57+
name: my-archived-wftmpl-baseline`), nil)
6558
f := NewEstimatorFactory(informer, hydratorfake.Always, wfArchive)
6659
t.Run("None", func(t *testing.T) {
6760
p, err := f.NewEstimator(&wfv1.Workflow{})

0 commit comments

Comments
 (0)