Skip to content

Commit

Permalink
fix: list and watch permission missing for server in namespaced mode
Browse files Browse the repository at this point in the history
Signed-off-by: Jiacheng Xu <xjcmaxwellcjx@gmail.com>
  • Loading branch information
jiachengxu committed Jun 14, 2024
1 parent dc6a18d commit a148a36
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 10 deletions.
3 changes: 2 additions & 1 deletion pkg/apiclient/argo-kube-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

eventsource "github.com/argoproj/argo-events/pkg/client/eventsource/clientset/versioned"
sensor "github.com/argoproj/argo-events/pkg/client/sensor/clientset/versioned"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
Expand Down Expand Up @@ -92,7 +93,7 @@ func newArgoKubeClient(ctx context.Context, clientConfig clientcmd.ClientConfig,
func (a *argoKubeClient) NewWorkflowServiceClient() workflowpkg.WorkflowServiceClient {
wfArchive := sqldb.NullWorkflowArchive
wfLister := store.NewKubeLister(a.wfClient)
return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfArchive, a.wfClient, wfLister, nil)}}
return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfArchive, a.wfClient, wfLister, nil, v1.NamespaceAll)}}
}

func (a *argoKubeClient) NewCronWorkflowServiceClient() (cronworkflow.CronWorkflowServiceClient, error) {
Expand Down
10 changes: 5 additions & 5 deletions server/apiserver/argoserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ func init() {
}
}

func getResourceCacheNamespace(opts ArgoServerOpts) string {
if opts.ManagedNamespace != "" {
return opts.ManagedNamespace
func getResourceCacheNamespace(managedNamespace string) string {
if managedNamespace != "" {
return managedNamespace
}
return v1.NamespaceAll
}
Expand All @@ -146,7 +146,7 @@ func NewArgoServer(ctx context.Context, opts ArgoServerOpts) (*argoServer, error
}
if ssoIf.IsRBACEnabled() {
// resourceCache is only used for SSO RBAC
resourceCache = cache.NewResourceCache(opts.Clients.Kubernetes, getResourceCacheNamespace(opts))
resourceCache = cache.NewResourceCache(opts.Clients.Kubernetes, getResourceCacheNamespace(opts.ManagedNamespace))
resourceCache.Run(ctx.Done())
}
log.Info("SSO enabled")
Expand Down Expand Up @@ -236,7 +236,7 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
if err != nil {
log.Fatal(err)
}
workflowServer := workflow.NewWorkflowServer(instanceIDService, offloadRepo, wfArchive, as.clients.Workflow, wfStore, wfStore)
workflowServer := workflow.NewWorkflowServer(instanceIDService, offloadRepo, wfArchive, as.clients.Workflow, wfStore, wfStore, getResourceCacheNamespace(as.managedNamespace))
grpcServer := as.newGRPCServer(instanceIDService, workflowServer, wfArchiveServer, eventServer, config.Links, config.Columns, config.NavColor)
httpServer := as.newHTTPServer(ctx, port, artifactServer)

Expand Down
6 changes: 3 additions & 3 deletions server/workflow/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type workflowServer struct {
var _ workflowpkg.WorkflowServiceServer = &workflowServer{}

// NewWorkflowServer returns a new WorkflowServer
func NewWorkflowServer(instanceIDService instanceid.Service, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo, wfArchive sqldb.WorkflowArchive, wfClientSet versioned.Interface, wfLister store.WorkflowLister, wfStore store.WorkflowStore) *workflowServer {
func NewWorkflowServer(instanceIDService instanceid.Service, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo, wfArchive sqldb.WorkflowArchive, wfClientSet versioned.Interface, wfLister store.WorkflowLister, wfStore store.WorkflowStore, namespace string) *workflowServer {
ws := &workflowServer{
instanceIDService: instanceIDService,
offloadNodeStatusRepo: offloadNodeStatusRepo,
Expand All @@ -70,10 +70,10 @@ func NewWorkflowServer(instanceIDService instanceid.Service, offloadNodeStatusRe
if wfStore != nil {
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return wfClientSet.ArgoprojV1alpha1().Workflows(metav1.NamespaceAll).List(context.Background(), options)
return wfClientSet.ArgoprojV1alpha1().Workflows(namespace).List(context.Background(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return wfClientSet.ArgoprojV1alpha1().Workflows(metav1.NamespaceAll).Watch(context.Background(), options)
return wfClientSet.ArgoprojV1alpha1().Workflows(namespace).Watch(context.Background(), options)
},
}
wfReflector := cache.NewReflector(lw, &wfv1.Workflow{}, wfStore, reSyncDuration)
Expand Down
2 changes: 1 addition & 1 deletion server/workflow/workflow_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ func getWorkflowServer() (workflowpkg.WorkflowServiceServer, context.Context) {
if err = wfStore.Add(&wfObj5); err != nil {
panic(err)
}
server := NewWorkflowServer(instanceIdSvc, offloadNodeStatusRepo, archivedRepo, wfClientset, wfStore, wfStore)
server := NewWorkflowServer(instanceIdSvc, offloadNodeStatusRepo, archivedRepo, wfClientset, wfStore, wfStore, metav1.NamespaceAll)
return server, ctx
}

Expand Down

0 comments on commit a148a36

Please sign in to comment.