Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support only watching key resources in one namespace #1821

Merged
merged 3 commits into from
Mar 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions helm/core/templates/controller-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ spec:
periodSeconds: 3
timeoutSeconds: 5
env:
{{- if .Values.global.watchNamespace }}
- name: ISTIO_WATCH_NAMESPACE
value: "{{ .Values.global.watchNamespace }}"
{{- end }}
- name: ENABLE_PUSH_ALL_MCP_CLUSTERS
value: "{{ .Values.global.enablePushAllMCPClusters }}"
- name: PILOT_ENABLE_LDS_CACHE
Expand Down
2 changes: 1 addition & 1 deletion pkg/bootstrap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (s *Server) initConfigController() error {
options.ClusterId = ""
}

ingressConfig := translation.NewIngressTranslation(s.kubeClient, s.xdsServer, ns, options.ClusterId)
ingressConfig := translation.NewIngressTranslation(s.kubeClient, s.xdsServer, ns, options)
ingressConfig.AddLocalCluster(options)

s.configStores = append(s.configStores, ingressConfig)
Expand Down
11 changes: 6 additions & 5 deletions pkg/ingress/config/ingress_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ type IngressConfig struct {
httpsConfigMgr *cert.ConfigMgr
}

func NewIngressConfig(localKubeClient kube.Client, xdsUpdater istiomodel.XDSUpdater, namespace string, clusterId cluster.ID) *IngressConfig {
func NewIngressConfig(localKubeClient kube.Client, xdsUpdater istiomodel.XDSUpdater, namespace string, options common.Options) *IngressConfig {
clusterId := options.ClusterId
if clusterId == "Kubernetes" {
clusterId = ""
}
Expand All @@ -170,17 +171,17 @@ func NewIngressConfig(localKubeClient kube.Client, xdsUpdater istiomodel.XDSUpda
wasmPlugins: make(map[string]*extensions.WasmPlugin),
http2rpcs: make(map[string]*higressv1.Http2Rpc),
}
mcpbridgeController := mcpbridge.NewController(localKubeClient, clusterId)
mcpbridgeController := mcpbridge.NewController(localKubeClient, options)
mcpbridgeController.AddEventHandler(config.AddOrUpdateMcpBridge, config.DeleteMcpBridge)
config.mcpbridgeController = mcpbridgeController
config.mcpbridgeLister = mcpbridgeController.Lister()

wasmPluginController := wasmplugin.NewController(localKubeClient, clusterId)
wasmPluginController := wasmplugin.NewController(localKubeClient, options)
wasmPluginController.AddEventHandler(config.AddOrUpdateWasmPlugin, config.DeleteWasmPlugin)
config.wasmPluginController = wasmPluginController
config.wasmPluginLister = wasmPluginController.Lister()

http2rpcController := http2rpc.NewController(localKubeClient, clusterId)
http2rpcController := http2rpc.NewController(localKubeClient, options)
http2rpcController.AddEventHandler(config.AddOrUpdateHttp2Rpc, config.DeleteHttp2Rpc)
config.http2rpcController = http2rpcController
config.http2rpcLister = http2rpcController.Lister()
Expand Down Expand Up @@ -225,7 +226,7 @@ func (m *IngressConfig) RegisterEventHandler(kind config.GroupVersionKind, f ist
}

func (m *IngressConfig) AddLocalCluster(options common.Options) {
secretController := secret.NewController(m.localKubeClient, options.ClusterId)
secretController := secret.NewController(m.localKubeClient, options)
secretController.AddEventHandler(m.ReflectSecretChanges)

var ingressController common.IngressController
Expand Down
9 changes: 8 additions & 1 deletion pkg/ingress/config/ingress_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,14 @@ func TestConvertGatewaysForIngress(t *testing.T) {
}
ingressV1Beta1Controller := controllerv1beta1.NewController(fake, fake, v1Beta1Options, nil)
ingressV1Controller := controllerv1.NewController(fake, fake, v1Options, nil)
m := NewIngressConfig(fake, nil, "wakanda", "gw-123-istio")
options := common.Options{
Enable: true,
ClusterId: "gw-123-istio",
RawClusterId: "gw-123-istio__",
GatewayHttpPort: 80,
GatewayHttpsPort: 443,
}
m := NewIngressConfig(fake, nil, "wakanda", options)
m.remoteIngressControllers = map[cluster.ID]common.IngressController{
"ingress-v1beta1": ingressV1Beta1Controller,
"ingress-v1": ingressV1Controller,
Expand Down
5 changes: 3 additions & 2 deletions pkg/ingress/config/kingress_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,11 @@ type KIngressConfig struct {
clusterId cluster.ID
}

func NewKIngressConfig(localKubeClient kube.Client, XDSUpdater istiomodel.XDSUpdater, namespace string, clusterId cluster.ID) *KIngressConfig {
func NewKIngressConfig(localKubeClient kube.Client, XDSUpdater istiomodel.XDSUpdater, namespace string, options common.Options) *KIngressConfig {
if localKubeClient.KIngressInformer() == nil {
return nil
}
clusterId := options.ClusterId
if clusterId == "Kubernetes" {
clusterId = ""
}
Expand Down Expand Up @@ -114,7 +115,7 @@ func (m *KIngressConfig) RegisterEventHandler(kind config.GroupVersionKind, f is
}

func (m *KIngressConfig) AddLocalCluster(options common.Options) common.KIngressController {
secretController := secret.NewController(m.localKubeClient, options.ClusterId)
secretController := secret.NewController(m.localKubeClient, options)
secretController.AddEventHandler(m.ReflectSecretChanges)

var ingressController common.KIngressController
Expand Down
9 changes: 8 additions & 1 deletion pkg/ingress/config/kingress_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,14 @@ func TestConvertGatewaysForKIngress(t *testing.T) {
RawClusterId: "kingress__",
}
kingressV1Controller := kcontrollerv1.NewController(fake, fake, v1Options, nil)
m := NewKIngressConfig(fake, nil, "wakanda", "gw-123-istio")
options := common.Options{
Enable: true,
ClusterId: "gw-123-istio",
RawClusterId: "gw-123-istio__",
GatewayHttpPort: 80,
GatewayHttpsPort: 443,
}
m := NewKIngressConfig(fake, nil, "wakanda", options)
m.remoteIngressControllers = map[cluster.ID]common.KIngressController{
"kingress": kingressV1Controller,
}
Expand Down
22 changes: 17 additions & 5 deletions pkg/ingress/kube/http2rpc/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,33 @@
package http2rpc

import (
"istio.io/istio/pkg/cluster"
"time"

"istio.io/istio/pkg/kube/controllers"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"

v1 "github.com/alibaba/higress/client/pkg/apis/networking/v1"
"github.com/alibaba/higress/client/pkg/clientset/versioned"
informersv1 "github.com/alibaba/higress/client/pkg/informers/externalversions/networking/v1"
listersv1 "github.com/alibaba/higress/client/pkg/listers/networking/v1"
"github.com/alibaba/higress/pkg/ingress/kube/common"
"github.com/alibaba/higress/pkg/ingress/kube/controller"
kubeclient "github.com/alibaba/higress/pkg/kube"
)

type Http2RpcController controller.Controller[listersv1.Http2RpcLister]

func NewController(client kubeclient.Client, clusterId cluster.ID) Http2RpcController {
informer := client.HigressInformer().Networking().V1().Http2Rpcs().Informer()
return controller.NewCommonController("http2rpc", client.HigressInformer().Networking().V1().Http2Rpcs().Lister(),
informer, GetHttp2Rpc, clusterId)
func NewController(client kubeclient.Client, options common.Options) Http2RpcController {
var informer cache.SharedIndexInformer
if options.WatchNamespace == "" {
informer = client.HigressInformer().Networking().V1().Http2Rpcs().Informer()
} else {
informer = client.HigressInformer().InformerFor(&v1.Http2Rpc{}, func(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return informersv1.NewHttp2RpcInformer(client, options.WatchNamespace, resyncPeriod, nil)
})
}
return controller.NewCommonController("http2rpc", listersv1.NewHttp2RpcLister(informer.GetIndexer()), informer, GetHttp2Rpc, options.ClusterId)
}

func GetHttp2Rpc(lister listersv1.Http2RpcLister, namespacedName types.NamespacedName) (controllers.Object, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingress/kube/ingress/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ type controller struct {
// NewController creates a new Kubernetes controller
func NewController(localKubeClient, client kubeclient.Client, options common.Options,
secretController secret.SecretController) common.IngressController {
opts := ktypes.InformerOptions{}
opts := ktypes.InformerOptions{Namespace: options.WatchNamespace}
ingressInformer := util.GetInformerFiltered(client, opts, gvrIngressV1Beta1, &ingress.Ingress{},
func(options metav1.ListOptions) (runtime.Object, error) {
return client.Kube().NetworkingV1beta1().Ingresses(opts.Namespace).List(context.Background(), options)
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingress/kube/ingress/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestIngressControllerApplies(t *testing.T) {

options := common.Options{IngressClass: "mse", ClusterId: ""}

secretController := secret.NewController(localKubeClient, options.ClusterId)
secretController := secret.NewController(localKubeClient, options)
ingressController := NewController(localKubeClient, client, options, secretController)

testcases := map[string]func(*testing.T, common.IngressController){
Expand Down Expand Up @@ -253,7 +253,7 @@ func TestIngressControllerConventions(t *testing.T) {

options := common.Options{IngressClass: "mse", ClusterId: "", EnableStatus: true}

secretController := secret.NewController(localKubeClient, options.ClusterId)
secretController := secret.NewController(localKubeClient, options)
ingressController := NewController(localKubeClient, client, options, secretController)

testcases := map[string]func(*testing.T, common.IngressController){
Expand Down Expand Up @@ -1142,7 +1142,7 @@ func TestIngressControllerProcessing(t *testing.T) {

options := common.Options{IngressClass: "mse", ClusterId: "", EnableStatus: true}

secretController := secret.NewController(localKubeClient, options.ClusterId)
secretController := secret.NewController(localKubeClient, options)

opts := ktypes.InformerOptions{}
ingressInformer := util.GetInformerFiltered(fakeClient, opts, gvrIngressV1Beta1, &ingress.Ingress{},
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingress/kube/ingressv1/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type controller struct {

// NewController creates a new Kubernetes controller
func NewController(localKubeClient, client kubeclient.Client, options common.Options, secretController secret.SecretController) common.IngressController {
opts := ktypes.InformerOptions{}
opts := ktypes.InformerOptions{Namespace: options.WatchNamespace}
ingressInformer := schemakubeclient.GetInformerFilteredFromGVR(client, opts, gvr.Ingress)
ingressLister := networkinglister.NewIngressLister(ingressInformer.Informer.GetIndexer())
serviceInformer := schemakubeclient.GetInformerFilteredFromGVR(client, opts, gvr.Service)
Expand Down
24 changes: 17 additions & 7 deletions pkg/ingress/kube/kingress/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sort"
"strings"
"sync"
"time"

"github.com/hashicorp/go-multierror"
networking "istio.io/api/networking/v1alpha3"
Expand All @@ -43,7 +44,9 @@ import (
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
ingress "knative.dev/networking/pkg/apis/networking/v1alpha1"
networkingv1alpha1 "knative.dev/networking/pkg/client/listers/networking/v1alpha1"
"knative.dev/networking/pkg/client/clientset/versioned"
informernetworkingv1alpha1 "knative.dev/networking/pkg/client/informers/externalversions/networking/v1alpha1"
listernetworkingv1alpha1 "knative.dev/networking/pkg/client/listers/networking/v1alpha1"

"github.com/alibaba/higress/pkg/ingress/kube/annotations"
"github.com/alibaba/higress/pkg/ingress/kube/common"
Expand Down Expand Up @@ -76,7 +79,7 @@ type controller struct {
ingresses map[string]*ingress.Ingress

ingressInformer cache.SharedInformer
ingressLister networkingv1alpha1.IngressLister
ingressLister listernetworkingv1alpha1.IngressLister
serviceInformer informerfactory.StartableInformer
serviceLister listerv1.ServiceLister
secretController secret.SecretController
Expand All @@ -86,16 +89,23 @@ type controller struct {
// NewController creates a new Kubernetes controller
func NewController(localKubeClient, client kube.Client, options common.Options,
secretController secret.SecretController) common.KIngressController {
//var namespace string = "default"
ingressInformer := client.KIngressInformer().Networking().V1alpha1().Ingresses()
serviceInformer := schemakubeclient.GetInformerFilteredFromGVR(client, ktypes.InformerOptions{}, gvr.Service)
var ingressInformer cache.SharedIndexInformer
if options.WatchNamespace == "" {
ingressInformer = client.KIngressInformer().Networking().V1alpha1().Ingresses().Informer()
} else {
ingressInformer = client.KIngressInformer().InformerFor(&ingress.Ingress{}, func(c versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return informernetworkingv1alpha1.NewIngressInformer(c, options.WatchNamespace, resyncPeriod, nil)
})
}
ingressLister := listernetworkingv1alpha1.NewIngressLister(ingressInformer.GetIndexer())
serviceInformer := schemakubeclient.GetInformerFilteredFromGVR(client, ktypes.InformerOptions{Namespace: options.WatchNamespace}, gvr.Service)
serviceLister := listerv1.NewServiceLister(serviceInformer.Informer.GetIndexer())

c := &controller{
options: options,
ingresses: make(map[string]*ingress.Ingress),
ingressInformer: ingressInformer.Informer(),
ingressLister: ingressInformer.Lister(),
ingressInformer: ingressInformer,
ingressLister: ingressLister,
serviceInformer: serviceInformer,
serviceLister: serviceLister,
secretController: secretController,
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingress/kube/kingress/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func TestKIngressControllerConventions(t *testing.T) {

options := common.Options{IngressClass: "mse", ClusterId: "", EnableStatus: true}

secretController := secret.NewController(localKubeClient, options.ClusterId)
secretController := secret.NewController(localKubeClient, options)
ingressController := NewController(localKubeClient, client, options, secretController)

testcases := map[string]func(*testing.T, common.KIngressController){
Expand Down
22 changes: 17 additions & 5 deletions pkg/ingress/kube/mcpbridge/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,33 @@
package mcpbridge

import (
"istio.io/istio/pkg/cluster"
"time"

"istio.io/istio/pkg/kube/controllers"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"

v1 "github.com/alibaba/higress/client/pkg/apis/networking/v1"
"github.com/alibaba/higress/client/pkg/clientset/versioned"
informersv1 "github.com/alibaba/higress/client/pkg/informers/externalversions/networking/v1"
listersv1 "github.com/alibaba/higress/client/pkg/listers/networking/v1"
"github.com/alibaba/higress/pkg/ingress/kube/common"
"github.com/alibaba/higress/pkg/ingress/kube/controller"
kubeclient "github.com/alibaba/higress/pkg/kube"
)

type McpBridgeController controller.Controller[listersv1.McpBridgeLister]

func NewController(client kubeclient.Client, clusterId cluster.ID) McpBridgeController {
informer := client.HigressInformer().Networking().V1().McpBridges().Informer()
return controller.NewCommonController("mcpbridge", client.HigressInformer().Networking().V1().McpBridges().Lister(),
informer, GetMcpBridge, clusterId)
func NewController(client kubeclient.Client, options common.Options) McpBridgeController {
var informer cache.SharedIndexInformer
if options.WatchNamespace == "" {
informer = client.HigressInformer().Networking().V1().McpBridges().Informer()
} else {
informer = client.HigressInformer().InformerFor(&v1.McpBridge{}, func(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return informersv1.NewMcpBridgeInformer(client, options.WatchNamespace, resyncPeriod, nil)
})
}
return controller.NewCommonController("mcpbridge", listersv1.NewMcpBridgeLister(informer.GetIndexer()), informer, GetMcpBridge, options.ClusterId)
}

func GetMcpBridge(lister listersv1.McpBridgeLister, namespacedName types.NamespacedName) (controllers.Object, error) {
Expand Down
11 changes: 5 additions & 6 deletions pkg/ingress/kube/secret/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,32 @@
package secret

import (
"github.com/alibaba/higress/pkg/ingress/kube/common"
"github.com/alibaba/higress/pkg/ingress/kube/controller"
"istio.io/istio/pkg/cluster"
"istio.io/istio/pkg/config/schema/gvr"
schemakubeclient "istio.io/istio/pkg/config/schema/kubeclient"
kubeclient "istio.io/istio/pkg/kube"
"istio.io/istio/pkg/kube/controllers"
ktypes "istio.io/istio/pkg/kube/kubetypes"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
listersv1 "k8s.io/client-go/listers/core/v1"
)

type SecretController controller.Controller[listersv1.SecretLister]

func NewController(client kubeclient.Client, clusterId cluster.ID) SecretController {
func NewController(client kubeclient.Client, options common.Options) SecretController {
opts := ktypes.InformerOptions{
Namespace: metav1.NamespaceAll,
Cluster: clusterId,
Namespace: options.WatchNamespace,
Cluster: options.ClusterId,
FieldSelector: fields.AndSelectors(
fields.OneTermNotEqualSelector("type", "helm.sh/release.v1"),
fields.OneTermNotEqualSelector("type", string(v1.SecretTypeServiceAccountToken)),
).String(),
}
informer := schemakubeclient.GetInformerFilteredFromGVR(client, opts, gvr.Secret)
return controller.NewCommonController("secret", listersv1.NewSecretLister(informer.Informer.GetIndexer()), informer.Informer, GetSecret, clusterId)
return controller.NewCommonController("secret", listersv1.NewSecretLister(informer.Informer.GetIndexer()), informer.Informer, GetSecret, options.ClusterId)
}

func GetSecret(lister listersv1.SecretLister, namespacedName types.NamespacedName) (controllers.Object, error) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/ingress/kube/secret/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package secret

import (
"context"
"github.com/alibaba/higress/pkg/ingress/kube/common"
"reflect"
"sync"
"testing"
Expand Down Expand Up @@ -43,7 +44,7 @@ var period = time.Second

func TestController(t *testing.T) {
client := kubeclient.NewFakeClient()
ctrl := NewController(client, "fake-cluster")
ctrl := NewController(client, common.Options{ClusterId: "fake-cluster"})

stop := make(chan struct{})
t.Cleanup(func() {
Expand Down
22 changes: 17 additions & 5 deletions pkg/ingress/kube/wasmplugin/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,33 @@
package wasmplugin

import (
"istio.io/istio/pkg/cluster"
"time"

"istio.io/istio/pkg/kube/controllers"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"

v1 "github.com/alibaba/higress/client/pkg/apis/extensions/v1alpha1"
"github.com/alibaba/higress/client/pkg/clientset/versioned"
informersv1 "github.com/alibaba/higress/client/pkg/informers/externalversions/extensions/v1alpha1"
listersv1 "github.com/alibaba/higress/client/pkg/listers/extensions/v1alpha1"
"github.com/alibaba/higress/pkg/ingress/kube/common"
"github.com/alibaba/higress/pkg/ingress/kube/controller"
kubeclient "github.com/alibaba/higress/pkg/kube"
)

type WasmPluginController controller.Controller[listersv1.WasmPluginLister]

func NewController(client kubeclient.Client, clusterId cluster.ID) WasmPluginController {
informer := client.HigressInformer().Extensions().V1alpha1().WasmPlugins().Informer()
return controller.NewCommonController("wasmplugin", client.HigressInformer().Extensions().V1alpha1().WasmPlugins().Lister(),
informer, GetWasmPlugin, clusterId)
func NewController(client kubeclient.Client, options common.Options) WasmPluginController {
var informer cache.SharedIndexInformer
if options.WatchNamespace == "" {
informer = client.HigressInformer().Extensions().V1alpha1().WasmPlugins().Informer()
} else {
informer = client.HigressInformer().InformerFor(&v1.WasmPlugin{}, func(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return informersv1.NewWasmPluginInformer(client, options.WatchNamespace, resyncPeriod, nil)
})
}
return controller.NewCommonController("wasmplugin", listersv1.NewWasmPluginLister(informer.GetIndexer()), informer, GetWasmPlugin, options.ClusterId)
}

func GetWasmPlugin(lister listersv1.WasmPluginLister, namespacedName types.NamespacedName) (controllers.Object, error) {
Expand Down
Loading
Loading