Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Add workflow template informer to server #13672

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/apiclient/apiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Client interface {

type Opts struct {
ArgoServerOpts ArgoServerOpts
ArgoKubeOpts ArgoKubeOpts
InstanceID string
AuthSupplier func() string
// DEPRECATED: use `ClientConfigSupplier`
Expand Down Expand Up @@ -84,7 +85,7 @@ func NewClientFromOpts(opts Opts) (context.Context, Client, error) {
opts.ClientConfig = opts.ClientConfigSupplier()
}

ctx, client, err := newArgoKubeClient(opts.GetContext(), opts.ClientConfig, instanceid.NewService(opts.InstanceID))
ctx, client, err := newArgoKubeClient(opts.GetContext(), opts.ArgoKubeOpts, opts.ClientConfig, instanceid.NewService(opts.InstanceID))
return ctx, client, err
}
}
80 changes: 71 additions & 9 deletions pkg/apiclient/argo-kube-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

"github.com/argoproj/argo-workflows/v3/server/workflow/store"

"github.com/argoproj/argo-workflows/v3"
"github.com/argoproj/argo-workflows/v3/persist/sqldb"
"github.com/argoproj/argo-workflows/v3/pkg/apiclient/clusterworkflowtemplate"
Expand All @@ -27,6 +25,8 @@ import (
cronworkflowserver "github.com/argoproj/argo-workflows/v3/server/cronworkflow"
"github.com/argoproj/argo-workflows/v3/server/types"
workflowserver "github.com/argoproj/argo-workflows/v3/server/workflow"
"github.com/argoproj/argo-workflows/v3/server/workflow/store"
workflowstore "github.com/argoproj/argo-workflows/v3/server/workflow/store"
workflowtemplateserver "github.com/argoproj/argo-workflows/v3/server/workflowtemplate"
"github.com/argoproj/argo-workflows/v3/util/help"
"github.com/argoproj/argo-workflows/v3/util/instanceid"
Expand All @@ -37,14 +37,34 @@ var (
NoArgoServerErr = fmt.Errorf("this is impossible if you are not using the Argo Server, see %s", help.CLI())
)

type ArgoKubeOpts struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This struct is never used as initialised in this code, nor are there any tests for UseCaching = true.

I believe this might be "for the future" but please could it not be included in this PR and saved for a future one until it's tested and used.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's for the code that uses argo api client in go code, so we would be able to turn caching on - it will make a huge difference for us.

// Closing caching channel will stop caching informers
CachingCloseCh chan struct{}

// Whether to cache WorkflowTemplates, ClusterWorkflowTemplates and Workflows
// This improves performance of reading
// It is especially visible during validating templates,
//
// Note that templates caching currently uses informers, so not all template
// get/list can use it, since informer has limited capabilities (such as filtering)
//
// Workflow caching uses in-memory SQLite DB and it provides full capabilities
UseCaching bool
}

type argoKubeClient struct {
opts ArgoKubeOpts
instanceIDService instanceid.Service
wfClient workflow.Interface
wfTmplStore types.WorkflowTemplateStore
cwfTmplStore types.ClusterWorkflowTemplateStore
wfLister workflowstore.WorkflowLister
wfStore workflowstore.WorkflowStore
}

var _ Client = &argoKubeClient{}

func newArgoKubeClient(ctx context.Context, clientConfig clientcmd.ClientConfig, instanceIDService instanceid.Service) (context.Context, Client, error) {
func newArgoKubeClient(ctx context.Context, opts ArgoKubeOpts, clientConfig clientcmd.ClientConfig, instanceIDService instanceid.Service) (context.Context, Client, error) {
restConfig, err := clientConfig.ClientConfig()
if err != nil {
return nil, nil, err
Expand All @@ -59,6 +79,10 @@ func newArgoKubeClient(ctx context.Context, clientConfig clientcmd.ClientConfig,
if err != nil {
return nil, nil, err
}
namespace, _, err := clientConfig.Namespace()
if err != nil {
return nil, nil, err
}
eventSourceInterface, err := eventsource.NewForConfig(restConfig)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -86,21 +110,59 @@ func newArgoKubeClient(ctx context.Context, clientConfig clientcmd.ClientConfig,
if err != nil {
return nil, nil, err
}
return ctx, &argoKubeClient{instanceIDService, wfClient}, nil

client := &argoKubeClient{
opts: opts,
instanceIDService: instanceIDService,
wfClient: wfClient,
}
err = client.startStores(restConfig, namespace)
if err != nil {
return nil, nil, err
}

return ctx, client, nil
}

func (a *argoKubeClient) startStores(restConfig *restclient.Config, namespace string) error {
if a.opts.UseCaching {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UseCaching appears to be always false

Copy link
Author

@jakkubu jakkubu Oct 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the intention - not to introduce breaking change. In the same time my team is using argoKubeClient in code and we would like to enable caching here. The code that depends on this is tested - it's basically server code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why you consider the caching version a breaking change? What does it break?

This PR is marked as a performance improvement, but doesn't improve the performance of the product, only of your usage of it as a go-client? Why wouldn't everyone want this enabled? It uses more memory...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem I'm facing is that there is little testing happening in pkg/apiclient.
I could expose this option in CLI to run e2e, to make it more testable. However I don't think this option make sense in CLI. Informer would simply make startup time longer - in very specific conditions this could make some difference. Even in such case you could simply connect to server that has caching enabled by default, instead of using k8s connection.

wftmplInformer, err := workflowtemplateserver.NewInformer(restConfig, namespace)
if err != nil {
return err
}
cwftmplInformer, err := clusterworkflowtmplserver.NewInformer(restConfig)
if err != nil {
return err
}
wfStore, err := store.NewSQLiteStore(a.instanceIDService)
if err != nil {
return err
}
wftmplInformer.Run(a.opts.CachingCloseCh)
cwftmplInformer.Run(a.opts.CachingCloseCh)
a.wfStore = wfStore
a.wfLister = wfStore
a.wfTmplStore = wftmplInformer
a.cwfTmplStore = cwftmplInformer
} else {
a.wfLister = store.NewKubeLister(a.wfClient)
a.wfTmplStore = workflowtemplateserver.NewWorkflowTemplateClientStore()
a.cwfTmplStore = clusterworkflowtmplserver.NewClusterWorkflowTemplateClientStore()
}
return nil
}

func (a *argoKubeClient) NewWorkflowServiceClient() workflowpkg.WorkflowServiceClient {
wfArchive := sqldb.NullWorkflowArchive
wfLister := store.NewKubeLister(a.wfClient)
return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfArchive, a.wfClient, wfLister, nil, nil)}}
return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfArchive, a.wfClient, a.wfLister, a.wfStore, a.wfTmplStore, a.cwfTmplStore, nil)}}
}

func (a *argoKubeClient) NewCronWorkflowServiceClient() (cronworkflow.CronWorkflowServiceClient, error) {
return &errorTranslatingCronWorkflowServiceClient{&argoKubeCronWorkflowServiceClient{cronworkflowserver.NewCronWorkflowServer(a.instanceIDService)}}, nil
return &errorTranslatingCronWorkflowServiceClient{&argoKubeCronWorkflowServiceClient{cronworkflowserver.NewCronWorkflowServer(a.instanceIDService, a.wfTmplStore, a.cwfTmplStore)}}, nil
}

func (a *argoKubeClient) NewWorkflowTemplateServiceClient() (workflowtemplate.WorkflowTemplateServiceClient, error) {
return &errorTranslatingWorkflowTemplateServiceClient{&argoKubeWorkflowTemplateServiceClient{workflowtemplateserver.NewWorkflowTemplateServer(a.instanceIDService)}}, nil
return &errorTranslatingWorkflowTemplateServiceClient{&argoKubeWorkflowTemplateServiceClient{workflowtemplateserver.NewWorkflowTemplateServer(a.instanceIDService, a.wfTmplStore, a.cwfTmplStore)}}, nil
}

func (a *argoKubeClient) NewArchivedWorkflowServiceClient() (workflowarchivepkg.ArchivedWorkflowServiceClient, error) {
Expand All @@ -112,5 +174,5 @@ func (a *argoKubeClient) NewInfoServiceClient() (infopkg.InfoServiceClient, erro
}

func (a *argoKubeClient) NewClusterWorkflowTemplateServiceClient() (clusterworkflowtemplate.ClusterWorkflowTemplateServiceClient, error) {
return &errorTranslatingWorkflowClusterTemplateServiceClient{&argoKubeWorkflowClusterTemplateServiceClient{clusterworkflowtmplserver.NewClusterWorkflowTemplateServer(a.instanceIDService)}}, nil
return &errorTranslatingWorkflowClusterTemplateServiceClient{&argoKubeWorkflowClusterTemplateServiceClient{clusterworkflowtmplserver.NewClusterWorkflowTemplateServer(a.instanceIDService, a.cwfTmplStore)}}, nil
}
26 changes: 19 additions & 7 deletions server/apiserver/argoserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type argoServer struct {
apiRateLimiter limiter.Store
allowedLinkProtocol []string
cache *cache.ResourceCache
restConfig *rest.Config
}

type ArgoServerOpts struct {
Expand Down Expand Up @@ -184,6 +185,7 @@ func NewArgoServer(ctx context.Context, opts ArgoServerOpts) (*argoServer, error
apiRateLimiter: store,
allowedLinkProtocol: opts.AllowedLinkProtocol,
cache: resourceCache,
restConfig: opts.RestConfig,
}, nil
}

Expand Down Expand Up @@ -227,6 +229,15 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
// disable the archiving - and still read old records
wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), as.managedNamespace, instanceIDService)
}
resourceCacheNamespace := getResourceCacheNamespace(as.managedNamespace)
wftmplStore, err := workflowtemplate.NewInformer(as.restConfig, resourceCacheNamespace)
if err != nil {
log.Fatal(err)
}
cwftmplInformer, err := clusterworkflowtemplate.NewInformer(as.restConfig)
if err != nil {
log.Fatal(err)
}
eventRecorderManager := events.NewEventRecorderManager(as.clients.Kubernetes)
artifactRepositories := artifactrepositories.New(as.clients.Kubernetes, as.managedNamespace, &config.ArtifactRepository)
artifactServer := artifacts.NewArtifactServer(as.gatekeeper, hydrator.New(offloadRepo), wfArchive, instanceIDService, artifactRepositories)
Expand All @@ -236,9 +247,8 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
if err != nil {
log.Fatal(err)
}
resourceCacheNamespace := getResourceCacheNamespace(as.managedNamespace)
workflowServer := workflow.NewWorkflowServer(instanceIDService, offloadRepo, wfArchive, as.clients.Workflow, wfStore, wfStore, &resourceCacheNamespace)
grpcServer := as.newGRPCServer(instanceIDService, workflowServer, wfArchiveServer, eventServer, config.Links, config.Columns, config.NavColor)
workflowServer := workflow.NewWorkflowServer(instanceIDService, offloadRepo, wfArchive, as.clients.Workflow, wfStore, wfStore, wftmplStore, cwftmplInformer, &resourceCacheNamespace)
grpcServer := as.newGRPCServer(instanceIDService, workflowServer, wftmplStore, cwftmplInformer, wfArchiveServer, eventServer, config.Links, config.Columns, config.NavColor)
httpServer := as.newHTTPServer(ctx, port, artifactServer)

// Start listener
Expand Down Expand Up @@ -267,6 +277,8 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
httpL := tcpm.Match(cmux.HTTP1Fast())
grpcL := tcpm.Match(cmux.Any())

wftmplStore.Run(as.stopCh)
cwftmplInformer.Run(as.stopCh)
go eventServer.Run(as.stopCh)
go workflowServer.Run(as.stopCh)
go func() { as.checkServeErr("grpcServer", grpcServer.Serve(grpcL)) }()
Expand All @@ -285,7 +297,7 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
<-as.stopCh
}

func (as *argoServer) newGRPCServer(instanceIDService instanceid.Service, workflowServer workflowpkg.WorkflowServiceServer, wfArchiveServer workflowarchivepkg.ArchivedWorkflowServiceServer, eventServer *event.Controller, links []*v1alpha1.Link, columns []*v1alpha1.Column, navColor string) *grpc.Server {
func (as *argoServer) newGRPCServer(instanceIDService instanceid.Service, workflowServer workflowpkg.WorkflowServiceServer, wftmplStore types.WorkflowTemplateStore, cwftmplStore types.ClusterWorkflowTemplateStore, wfArchiveServer workflowarchivepkg.ArchivedWorkflowServiceServer, eventServer *event.Controller, links []*v1alpha1.Link, columns []*v1alpha1.Column, navColor string) *grpc.Server {
serverLog := log.NewEntry(log.StandardLogger())

// "Prometheus histograms are a great way to measure latency distributions of your RPCs. However, since it is bad practice to have metrics of high cardinality the latency monitoring metrics are disabled by default. To enable them please call the following in your server initialization code:"
Expand Down Expand Up @@ -324,10 +336,10 @@ func (as *argoServer) newGRPCServer(instanceIDService instanceid.Service, workfl
eventsourcepkg.RegisterEventSourceServiceServer(grpcServer, eventsource.NewEventSourceServer())
sensorpkg.RegisterSensorServiceServer(grpcServer, sensor.NewSensorServer())
workflowpkg.RegisterWorkflowServiceServer(grpcServer, workflowServer)
workflowtemplatepkg.RegisterWorkflowTemplateServiceServer(grpcServer, workflowtemplate.NewWorkflowTemplateServer(instanceIDService))
cronworkflowpkg.RegisterCronWorkflowServiceServer(grpcServer, cronworkflow.NewCronWorkflowServer(instanceIDService))
workflowtemplatepkg.RegisterWorkflowTemplateServiceServer(grpcServer, workflowtemplate.NewWorkflowTemplateServer(instanceIDService, wftmplStore, cwftmplStore))
cronworkflowpkg.RegisterCronWorkflowServiceServer(grpcServer, cronworkflow.NewCronWorkflowServer(instanceIDService, wftmplStore, cwftmplStore))
workflowarchivepkg.RegisterArchivedWorkflowServiceServer(grpcServer, wfArchiveServer)
clusterwftemplatepkg.RegisterClusterWorkflowTemplateServiceServer(grpcServer, clusterworkflowtemplate.NewClusterWorkflowTemplateServer(instanceIDService))
clusterwftemplatepkg.RegisterClusterWorkflowTemplateServiceServer(grpcServer, clusterworkflowtemplate.NewClusterWorkflowTemplateServer(instanceIDService, cwftmplStore))
grpc_prometheus.Register(grpcServer)
return grpcServer
}
Expand Down
20 changes: 11 additions & 9 deletions server/clusterworkflowtemplate/cluster_workflow_template_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,24 @@ import (
clusterwftmplpkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/clusterworkflowtemplate"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/server/auth"
servertypes "github.com/argoproj/argo-workflows/v3/server/types"
"github.com/argoproj/argo-workflows/v3/util/instanceid"
"github.com/argoproj/argo-workflows/v3/workflow/creator"
"github.com/argoproj/argo-workflows/v3/workflow/templateresolution"
"github.com/argoproj/argo-workflows/v3/workflow/validate"

serverutils "github.com/argoproj/argo-workflows/v3/server/utils"
)

type ClusterWorkflowTemplateServer struct {
instanceIDService instanceid.Service
cwftmplStore servertypes.ClusterWorkflowTemplateStore
}

func NewClusterWorkflowTemplateServer(instanceID instanceid.Service) clusterwftmplpkg.ClusterWorkflowTemplateServiceServer {
return &ClusterWorkflowTemplateServer{instanceID}
func NewClusterWorkflowTemplateServer(instanceID instanceid.Service, cwftmplStore servertypes.ClusterWorkflowTemplateStore) clusterwftmplpkg.ClusterWorkflowTemplateServiceServer {
if cwftmplStore == nil {
cwftmplStore = NewClusterWorkflowTemplateClientStore()
}
return &ClusterWorkflowTemplateServer{instanceID, cwftmplStore}
}

func (cwts *ClusterWorkflowTemplateServer) CreateClusterWorkflowTemplate(ctx context.Context, req *clusterwftmplpkg.ClusterWorkflowTemplateCreateRequest) (*v1alpha1.ClusterWorkflowTemplate, error) {
Expand All @@ -34,7 +38,7 @@ func (cwts *ClusterWorkflowTemplateServer) CreateClusterWorkflowTemplate(ctx con
}
cwts.instanceIDService.Label(req.Template)
creator.Label(ctx, req.Template)
cwftmplGetter := templateresolution.WrapClusterWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates())
cwftmplGetter := cwts.cwftmplStore.Getter(ctx)
err := validate.ValidateClusterWorkflowTemplate(nil, cwftmplGetter, req.Template, validate.ValidateOpts{})
if err != nil {
return nil, serverutils.ToStatusError(err, codes.InvalidArgument)
Expand All @@ -55,8 +59,7 @@ func (cwts *ClusterWorkflowTemplateServer) GetClusterWorkflowTemplate(ctx contex
}

func (cwts *ClusterWorkflowTemplateServer) getTemplateAndValidate(ctx context.Context, name string) (*v1alpha1.ClusterWorkflowTemplate, error) {
wfClient := auth.GetWfClient(ctx)
wfTmpl, err := wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates().Get(ctx, name, v1.GetOptions{})
wfTmpl, err := cwts.cwftmplStore.Getter(ctx).Get(name)
if err != nil {
return nil, serverutils.ToStatusError(err, codes.Internal)
}
Expand Down Expand Up @@ -101,8 +104,7 @@ func (cwts *ClusterWorkflowTemplateServer) DeleteClusterWorkflowTemplate(ctx con
func (cwts *ClusterWorkflowTemplateServer) LintClusterWorkflowTemplate(ctx context.Context, req *clusterwftmplpkg.ClusterWorkflowTemplateLintRequest) (*v1alpha1.ClusterWorkflowTemplate, error) {
cwts.instanceIDService.Label(req.Template)
creator.Label(ctx, req.Template)
wfClient := auth.GetWfClient(ctx)
cwftmplGetter := templateresolution.WrapClusterWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates())
cwftmplGetter := cwts.cwftmplStore.Getter(ctx)

err := validate.ValidateClusterWorkflowTemplate(nil, cwftmplGetter, req.Template, validate.ValidateOpts{Lint: true})
if err != nil {
Expand All @@ -121,7 +123,7 @@ func (cwts *ClusterWorkflowTemplateServer) UpdateClusterWorkflowTemplate(ctx con
return nil, serverutils.ToStatusError(err, codes.InvalidArgument)
}
wfClient := auth.GetWfClient(ctx)
cwftmplGetter := templateresolution.WrapClusterWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates())
cwftmplGetter := cwts.cwftmplStore.Getter(ctx)

err = validate.ValidateClusterWorkflowTemplate(nil, cwftmplGetter, req.Template, validate.ValidateOpts{})
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func getClusterWorkflowTemplateServer() (clusterwftmplpkg.ClusterWorkflowTemplat
kubeClientSet := fake.NewSimpleClientset()
wfClientset := wftFake.NewSimpleClientset(&unlabelled, &cwftObj2, &cwftObj3)
ctx := context.WithValue(context.WithValue(context.WithValue(context.TODO(), auth.WfKey, wfClientset), auth.KubeKey, kubeClientSet), auth.ClaimsKey, &types.Claims{Claims: jwt.Claims{Subject: "my-sub"}})
return NewClusterWorkflowTemplateServer(instanceid.NewService("my-instanceid")), ctx
return NewClusterWorkflowTemplateServer(instanceid.NewService("my-instanceid"), nil), ctx
}

func TestWorkflowTemplateServer_CreateClusterWorkflowTemplate(t *testing.T) {
Expand Down
62 changes: 62 additions & 0 deletions server/clusterworkflowtemplate/informer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package clusterworkflowtemplate

import (
"context"
"time"

log "github.com/sirupsen/logrus"

"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

wfextvv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/server/types"
"github.com/argoproj/argo-workflows/v3/workflow/controller/informer"
"github.com/argoproj/argo-workflows/v3/workflow/templateresolution"
)

const (
workflowTemplateResyncPeriod = 20 * time.Minute
)

var _ types.ClusterWorkflowTemplateStore = &Informer{}

type Informer struct {
informer wfextvv1alpha1.ClusterWorkflowTemplateInformer
}

func NewInformer(restConfig *rest.Config) (*Informer, error) {
dynamicInterface, err := dynamic.NewForConfig(restConfig)
if err != nil {
return nil, err
}
informer := informer.NewTolerantClusterWorkflowTemplateInformer(
dynamicInterface,
workflowTemplateResyncPeriod,
)
return &Informer{
informer: informer,
}, nil
}

// Start informer in separate go-routine and block until cache sync
func (cwti *Informer) Run(stopCh <-chan struct{}) {

go cwti.informer.Informer().Run(stopCh)

if !cache.WaitForCacheSync(
stopCh,
cwti.informer.Informer().HasSynced,
) {
log.Fatal("Timed out waiting for caches to sync")
}
}

// if namespace contains empty string Lister will use the namespace provided during initialization
func (cwti *Informer) Getter(_ context.Context) templateresolution.ClusterWorkflowTemplateGetter {
if cwti.informer == nil {
log.Fatal("Template informer not started")
}
return cwti.informer.Lister()
}
21 changes: 21 additions & 0 deletions server/clusterworkflowtemplate/wf_client_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package clusterworkflowtemplate

import (
"context"

"github.com/argoproj/argo-workflows/v3/server/auth"
"github.com/argoproj/argo-workflows/v3/workflow/templateresolution"
)

// Store is a wrapper around informer
type ClusterWorkflowTemplateClientStore struct {
}

func NewClusterWorkflowTemplateClientStore() *ClusterWorkflowTemplateClientStore {
return &ClusterWorkflowTemplateClientStore{}
}

func (wcs *ClusterWorkflowTemplateClientStore) Getter(ctx context.Context) templateresolution.ClusterWorkflowTemplateGetter {
wfClient := auth.GetWfClient(ctx)
return templateresolution.WrapClusterWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates())
}
Loading
Loading