-
Notifications
You must be signed in to change notification settings - Fork 3.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
perf: Add workflow template informer to server #13672
base: main
Are you sure you want to change the base?
Changes from all commits
d84ac0c
3906015
cf08531
a0633d1
37c710f
e3d99cd
0606c37
41e5cc2
5344a4c
dd878a9
31e05d8
21892f1
12b9b94
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,8 +11,6 @@ import ( | |
restclient "k8s.io/client-go/rest" | ||
"k8s.io/client-go/tools/clientcmd" | ||
|
||
"github.com/argoproj/argo-workflows/v3/server/workflow/store" | ||
|
||
"github.com/argoproj/argo-workflows/v3" | ||
"github.com/argoproj/argo-workflows/v3/persist/sqldb" | ||
"github.com/argoproj/argo-workflows/v3/pkg/apiclient/clusterworkflowtemplate" | ||
|
@@ -27,6 +25,8 @@ import ( | |
cronworkflowserver "github.com/argoproj/argo-workflows/v3/server/cronworkflow" | ||
"github.com/argoproj/argo-workflows/v3/server/types" | ||
workflowserver "github.com/argoproj/argo-workflows/v3/server/workflow" | ||
"github.com/argoproj/argo-workflows/v3/server/workflow/store" | ||
workflowstore "github.com/argoproj/argo-workflows/v3/server/workflow/store" | ||
workflowtemplateserver "github.com/argoproj/argo-workflows/v3/server/workflowtemplate" | ||
"github.com/argoproj/argo-workflows/v3/util/help" | ||
"github.com/argoproj/argo-workflows/v3/util/instanceid" | ||
|
@@ -37,14 +37,34 @@ var ( | |
NoArgoServerErr = fmt.Errorf("this is impossible if you are not using the Argo Server, see %s", help.CLI()) | ||
) | ||
|
||
type ArgoKubeOpts struct { | ||
// Closing caching channel will stop caching informers | ||
CachingCloseCh chan struct{} | ||
|
||
// Whether to cache WorkflowTemplates, ClusterWorkflowTemplates and Workflows | ||
// This improves performance of reading | ||
// It is especially visible during validating templates, | ||
// | ||
// Note that templates caching currently uses informers, so not all template | ||
// get/list can use it, since informer has limited capabilities (such as filtering) | ||
// | ||
// Workflow caching uses in-memory SQLite DB and it provides full capabilities | ||
UseCaching bool | ||
} | ||
|
||
type argoKubeClient struct { | ||
opts ArgoKubeOpts | ||
instanceIDService instanceid.Service | ||
wfClient workflow.Interface | ||
wfTmplStore types.WorkflowTemplateStore | ||
cwfTmplStore types.ClusterWorkflowTemplateStore | ||
wfLister workflowstore.WorkflowLister | ||
wfStore workflowstore.WorkflowStore | ||
} | ||
|
||
var _ Client = &argoKubeClient{} | ||
|
||
func newArgoKubeClient(ctx context.Context, clientConfig clientcmd.ClientConfig, instanceIDService instanceid.Service) (context.Context, Client, error) { | ||
func newArgoKubeClient(ctx context.Context, opts ArgoKubeOpts, clientConfig clientcmd.ClientConfig, instanceIDService instanceid.Service) (context.Context, Client, error) { | ||
restConfig, err := clientConfig.ClientConfig() | ||
if err != nil { | ||
return nil, nil, err | ||
|
@@ -59,6 +79,10 @@ func newArgoKubeClient(ctx context.Context, clientConfig clientcmd.ClientConfig, | |
if err != nil { | ||
return nil, nil, err | ||
} | ||
namespace, _, err := clientConfig.Namespace() | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
eventSourceInterface, err := eventsource.NewForConfig(restConfig) | ||
if err != nil { | ||
return nil, nil, err | ||
|
@@ -86,21 +110,59 @@ func newArgoKubeClient(ctx context.Context, clientConfig clientcmd.ClientConfig, | |
if err != nil { | ||
return nil, nil, err | ||
} | ||
return ctx, &argoKubeClient{instanceIDService, wfClient}, nil | ||
|
||
client := &argoKubeClient{ | ||
opts: opts, | ||
instanceIDService: instanceIDService, | ||
wfClient: wfClient, | ||
} | ||
err = client.startStores(restConfig, namespace) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
|
||
return ctx, client, nil | ||
} | ||
|
||
func (a *argoKubeClient) startStores(restConfig *restclient.Config, namespace string) error { | ||
if a.opts.UseCaching { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was the intention - not to introduce breaking change. In the same time my team is using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure why you consider the caching version a breaking change? What does it break? This PR is marked as a performance improvement, but doesn't improve the performance of the product, only of your usage of it as a go-client? Why wouldn't everyone want this enabled? It uses more memory... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The problem I'm facing is that there is little testing happening in |
||
wftmplInformer, err := workflowtemplateserver.NewInformer(restConfig, namespace) | ||
if err != nil { | ||
return err | ||
} | ||
cwftmplInformer, err := clusterworkflowtmplserver.NewInformer(restConfig) | ||
if err != nil { | ||
return err | ||
} | ||
wfStore, err := store.NewSQLiteStore(a.instanceIDService) | ||
if err != nil { | ||
return err | ||
} | ||
wftmplInformer.Run(a.opts.CachingCloseCh) | ||
cwftmplInformer.Run(a.opts.CachingCloseCh) | ||
a.wfStore = wfStore | ||
a.wfLister = wfStore | ||
a.wfTmplStore = wftmplInformer | ||
a.cwfTmplStore = cwftmplInformer | ||
} else { | ||
a.wfLister = store.NewKubeLister(a.wfClient) | ||
a.wfTmplStore = workflowtemplateserver.NewWorkflowTemplateClientStore() | ||
a.cwfTmplStore = clusterworkflowtmplserver.NewClusterWorkflowTemplateClientStore() | ||
} | ||
return nil | ||
} | ||
|
||
func (a *argoKubeClient) NewWorkflowServiceClient() workflowpkg.WorkflowServiceClient { | ||
wfArchive := sqldb.NullWorkflowArchive | ||
wfLister := store.NewKubeLister(a.wfClient) | ||
return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfArchive, a.wfClient, wfLister, nil, nil)}} | ||
return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfArchive, a.wfClient, a.wfLister, a.wfStore, a.wfTmplStore, a.cwfTmplStore, nil)}} | ||
} | ||
|
||
func (a *argoKubeClient) NewCronWorkflowServiceClient() (cronworkflow.CronWorkflowServiceClient, error) { | ||
return &errorTranslatingCronWorkflowServiceClient{&argoKubeCronWorkflowServiceClient{cronworkflowserver.NewCronWorkflowServer(a.instanceIDService)}}, nil | ||
return &errorTranslatingCronWorkflowServiceClient{&argoKubeCronWorkflowServiceClient{cronworkflowserver.NewCronWorkflowServer(a.instanceIDService, a.wfTmplStore, a.cwfTmplStore)}}, nil | ||
} | ||
|
||
func (a *argoKubeClient) NewWorkflowTemplateServiceClient() (workflowtemplate.WorkflowTemplateServiceClient, error) { | ||
return &errorTranslatingWorkflowTemplateServiceClient{&argoKubeWorkflowTemplateServiceClient{workflowtemplateserver.NewWorkflowTemplateServer(a.instanceIDService)}}, nil | ||
return &errorTranslatingWorkflowTemplateServiceClient{&argoKubeWorkflowTemplateServiceClient{workflowtemplateserver.NewWorkflowTemplateServer(a.instanceIDService, a.wfTmplStore, a.cwfTmplStore)}}, nil | ||
} | ||
|
||
func (a *argoKubeClient) NewArchivedWorkflowServiceClient() (workflowarchivepkg.ArchivedWorkflowServiceClient, error) { | ||
|
@@ -112,5 +174,5 @@ func (a *argoKubeClient) NewInfoServiceClient() (infopkg.InfoServiceClient, erro | |
} | ||
|
||
func (a *argoKubeClient) NewClusterWorkflowTemplateServiceClient() (clusterworkflowtemplate.ClusterWorkflowTemplateServiceClient, error) { | ||
return &errorTranslatingWorkflowClusterTemplateServiceClient{&argoKubeWorkflowClusterTemplateServiceClient{clusterworkflowtmplserver.NewClusterWorkflowTemplateServer(a.instanceIDService)}}, nil | ||
return &errorTranslatingWorkflowClusterTemplateServiceClient{&argoKubeWorkflowClusterTemplateServiceClient{clusterworkflowtmplserver.NewClusterWorkflowTemplateServer(a.instanceIDService, a.cwfTmplStore)}}, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
package clusterworkflowtemplate | ||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
log "github.com/sirupsen/logrus" | ||
|
||
"k8s.io/client-go/dynamic" | ||
"k8s.io/client-go/rest" | ||
"k8s.io/client-go/tools/cache" | ||
|
||
wfextvv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions/workflow/v1alpha1" | ||
"github.com/argoproj/argo-workflows/v3/server/types" | ||
"github.com/argoproj/argo-workflows/v3/workflow/controller/informer" | ||
"github.com/argoproj/argo-workflows/v3/workflow/templateresolution" | ||
) | ||
|
||
const ( | ||
workflowTemplateResyncPeriod = 20 * time.Minute | ||
) | ||
|
||
var _ types.ClusterWorkflowTemplateStore = &Informer{} | ||
|
||
type Informer struct { | ||
informer wfextvv1alpha1.ClusterWorkflowTemplateInformer | ||
} | ||
|
||
func NewInformer(restConfig *rest.Config) (*Informer, error) { | ||
dynamicInterface, err := dynamic.NewForConfig(restConfig) | ||
if err != nil { | ||
return nil, err | ||
} | ||
informer := informer.NewTolerantClusterWorkflowTemplateInformer( | ||
dynamicInterface, | ||
workflowTemplateResyncPeriod, | ||
) | ||
return &Informer{ | ||
informer: informer, | ||
}, nil | ||
} | ||
|
||
// Start informer in separate go-routine and block until cache sync | ||
func (cwti *Informer) Run(stopCh <-chan struct{}) { | ||
|
||
go cwti.informer.Informer().Run(stopCh) | ||
|
||
if !cache.WaitForCacheSync( | ||
stopCh, | ||
cwti.informer.Informer().HasSynced, | ||
) { | ||
log.Fatal("Timed out waiting for caches to sync") | ||
} | ||
} | ||
|
||
// if namespace contains empty string Lister will use the namespace provided during initialization | ||
func (cwti *Informer) Getter(_ context.Context) templateresolution.ClusterWorkflowTemplateGetter { | ||
if cwti.informer == nil { | ||
log.Fatal("Template informer not started") | ||
} | ||
return cwti.informer.Lister() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
package clusterworkflowtemplate | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/argoproj/argo-workflows/v3/server/auth" | ||
"github.com/argoproj/argo-workflows/v3/workflow/templateresolution" | ||
) | ||
|
||
// Store is a wrapper around informer | ||
type ClusterWorkflowTemplateClientStore struct { | ||
} | ||
|
||
func NewClusterWorkflowTemplateClientStore() *ClusterWorkflowTemplateClientStore { | ||
return &ClusterWorkflowTemplateClientStore{} | ||
} | ||
|
||
func (wcs *ClusterWorkflowTemplateClientStore) Getter(ctx context.Context) templateresolution.ClusterWorkflowTemplateGetter { | ||
wfClient := auth.GetWfClient(ctx) | ||
return templateresolution.WrapClusterWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This struct is never used as initialised in this code, nor are there any tests for
UseCaching = true
.I believe this might be "for the future" but please could it not be included in this PR and saved for a future one until it's tested and used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's for the code that uses argo api client in go code, so we would be able to turn caching on - it will make a huge difference for us.