From 5ccd3d1fbdc171d059ac8d2d3fc6722c6642a7cf Mon Sep 17 00:00:00 2001 From: Jianjun Shen Date: Wed, 9 Feb 2022 09:06:42 -0800 Subject: [PATCH] Update NPL and Secondary Network controllers initialization code (#3257) Change secondary network Pod controller to use the shared localPodInfomer. Signed-off-by: Jianjun Shen --- cmd/antrea-agent/agent.go | 54 ++++++++++--------- pkg/agent/nodeportlocal/k8s/npl_controller.go | 7 ++- pkg/agent/nodeportlocal/npl_agent_init.go | 21 +------- pkg/agent/nodeportlocal/npl_agent_test.go | 5 +- .../secondarynetwork/podwatch/controller.go | 27 ---------- 5 files changed, 37 insertions(+), 77 deletions(-) diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index c80035c4d50..eecb52fce49 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -451,7 +451,9 @@ func run(o *Options) error { // Initialize localPodInformer for NPLAgent and AntreaIPAMController var localPodInformer cache.SharedIndexInformer - if features.DefaultFeatureGate.Enabled(features.NodePortLocal) && o.config.NodePortLocal.Enable || features.DefaultFeatureGate.Enabled(features.AntreaIPAM) { + if features.DefaultFeatureGate.Enabled(features.NodePortLocal) && o.config.NodePortLocal.Enable || + features.DefaultFeatureGate.Enabled(features.AntreaIPAM) || + features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) { listOptions := func(options *metav1.ListOptions) { options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", nodeConfig.Name).String() } @@ -463,6 +465,19 @@ func run(o *Options) error { listOptions, ) } + + log.StartLogFileNumberMonitor(stopCh) + + go routeClient.Run(stopCh) + + go cniServer.Run(stopCh) + + go antreaClientProvider.Run(stopCh) + + go nodeRouteController.Run(stopCh) + + go networkPolicyController.Run(stopCh) + // Initialize the NPL agent. if features.DefaultFeatureGate.Enabled(features.NodePortLocal) && o.config.NodePortLocal.Enable { nplController, err := npl.InitializeNPLAgent( @@ -477,6 +492,7 @@ func run(o *Options) error { } go nplController.Run(stopCh) } + // Initialize the Antrea IPAM agent. if features.DefaultFeatureGate.Enabled(features.AntreaIPAM) { ipamController, err := ipam.InitializeAntreaIPAMController( @@ -490,25 +506,6 @@ func run(o *Options) error { } go ipamController.Run(stopCh) } - // Start the localPodInformer - if localPodInformer != nil { - go localPodInformer.Run(stopCh) - } - - log.StartLogFileNumberMonitor(stopCh) - - go routeClient.Run(stopCh) - - go cniServer.Run(stopCh) - - informerFactory.Start(stopCh) - crdInformerFactory.Start(stopCh) - - go antreaClientProvider.Run(stopCh) - - go nodeRouteController.Run(stopCh) - - go networkPolicyController.Run(stopCh) if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) { // Create the NetworkAttachmentDefinition client, which handles access to secondary network object definition from the API Server. @@ -516,20 +513,25 @@ func run(o *Options) error { if err != nil { return fmt.Errorf("NetworkAttachmentDefinition client creation failed. %v", err) } - // Initialize podController to handle secondary network configuration for Pods with k8s.v1.cni.cncf.io/networks Annotation defined. - podWatchController, err := podwatch.InitializePodController( + // Create podController to handle secondary network configuration for Pods with k8s.v1.cni.cncf.io/networks Annotation defined. + podWatchController := podwatch.NewPodController( k8sClient, netAttachDefClient, - informerFactory, + localPodInformer, nodeConfig.Name, cniPodInfoStore, cniServer) - if err != nil { - return fmt.Errorf("failed to initialize Pod Controller for secondary network interface management: %v", err) - } go podWatchController.Run(stopCh) } + // Start the localPodInformer + if localPodInformer != nil { + go localPodInformer.Run(stopCh) + } + + informerFactory.Start(stopCh) + crdInformerFactory.Start(stopCh) + if features.DefaultFeatureGate.Enabled(features.Egress) || features.DefaultFeatureGate.Enabled(features.ServiceExternalIP) { go externalIPPoolController.Run(stopCh) go localIPDetector.Run(stopCh) diff --git a/pkg/agent/nodeportlocal/k8s/npl_controller.go b/pkg/agent/nodeportlocal/k8s/npl_controller.go index d313673afd2..ba152354aad 100644 --- a/pkg/agent/nodeportlocal/k8s/npl_controller.go +++ b/pkg/agent/nodeportlocal/k8s/npl_controller.go @@ -42,10 +42,14 @@ import ( ) const ( - controllerName = "AntreaAgentNPLController" + controllerName = "NPLController" minRetryDelay = 2 * time.Second maxRetryDelay = 120 * time.Second numWorkers = 4 + + // Set resyncPeriod to 0 to disable resyncing. + // UpdateFunc event handler will be called only when the object is actually updated. + resyncPeriod = 0 * time.Minute ) type NPLController struct { @@ -63,7 +67,6 @@ type NPLController struct { func NewNPLController(kubeClient clientset.Interface, podInformer cache.SharedIndexInformer, svcInformer cache.SharedIndexInformer, - resyncPeriod time.Duration, pt *portcache.PortTable, nodeName string) *NPLController { c := NPLController{ diff --git a/pkg/agent/nodeportlocal/npl_agent_init.go b/pkg/agent/nodeportlocal/npl_agent_init.go index fb0336913cf..b2b09ef427f 100644 --- a/pkg/agent/nodeportlocal/npl_agent_init.go +++ b/pkg/agent/nodeportlocal/npl_agent_init.go @@ -19,7 +19,6 @@ package nodeportlocal import ( "fmt" - "time" nplk8s "antrea.io/antrea/pkg/agent/nodeportlocal/k8s" "antrea.io/antrea/pkg/agent/nodeportlocal/portcache" @@ -29,10 +28,6 @@ import ( "k8s.io/client-go/tools/cache" ) -// Set resyncPeriod to 0 to disable resyncing. -// UpdateFunc event handler will be called only when the object is actually updated. -const resyncPeriod = 0 * time.Minute - // InitializeNPLAgent initializes the NodePortLocal agent. // It sets up event handlers to handle Pod add, update and delete events. // When a Pod gets created, a free Node port is obtained from the port table cache and a DNAT rule is added to NAT traffic to the Pod's ip:port. @@ -49,20 +44,6 @@ func InitializeNPLAgent( return nil, fmt.Errorf("error when initializing NodePortLocal port table: %v", err) } - return InitController(kubeClient, informerFactory, portTable, nodeName, podInformer) -} - -// InitController initializes the NPLController with appropriate Pod and Service Informers. -// This function can be used independently while unit testing without using InitializeNPLAgent function. -func InitController(kubeClient clientset.Interface, informerFactory informers.SharedInformerFactory, portTable *portcache.PortTable, nodeName string, podInformer cache.SharedIndexInformer) (*nplk8s.NPLController, error) { svcInformer := informerFactory.Core().V1().Services().Informer() - - c := nplk8s.NewNPLController(kubeClient, - podInformer, - svcInformer, - resyncPeriod, - portTable, - nodeName) - - return c, nil + return nplk8s.NewNPLController(kubeClient, podInformer, svcInformer, portTable, nodeName), nil } diff --git a/pkg/agent/nodeportlocal/npl_agent_test.go b/pkg/agent/nodeportlocal/npl_agent_test.go index 8d71e807b6a..c577e7e99b0 100644 --- a/pkg/agent/nodeportlocal/npl_agent_test.go +++ b/pkg/agent/nodeportlocal/npl_agent_test.go @@ -238,6 +238,7 @@ func setUp(t *testing.T, tc *testConfig, objects ...runtime.Object) *testData { portTable: newPortTable(mockIPTables, mockPortOpener), } + resyncPeriod := 0 * time.Minute // informerFactory is initialized and started from cmd/antrea-agent/agent.go informerFactory := informers.NewSharedInformerFactory(data.k8sClient, resyncPeriod) listOptions := func(options *metav1.ListOptions) { @@ -250,9 +251,9 @@ func setUp(t *testing.T, tc *testConfig, objects ...runtime.Object) *testData { cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, // NamespaceIndex is used in NPLController. listOptions, ) + svcInformer := informerFactory.Core().V1().Services().Informer() - c, err := InitController(data.k8sClient, informerFactory, data.portTable, defaultNodeName, localPodInformer) - require.NoError(t, err) + c := nplk8s.NewNPLController(data.k8sClient, localPodInformer, svcInformer, data.portTable, defaultNodeName) data.runWrapper(c) informerFactory.Start(data.stopCh) diff --git a/pkg/agent/secondarynetwork/podwatch/controller.go b/pkg/agent/secondarynetwork/podwatch/controller.go index bbf4746b655..f9244187673 100644 --- a/pkg/agent/secondarynetwork/podwatch/controller.go +++ b/pkg/agent/secondarynetwork/podwatch/controller.go @@ -26,11 +26,8 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/informers" - coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" @@ -81,7 +78,6 @@ type PodController struct { func NewPodController(kubeClient clientset.Interface, netAttachDefClient netdefclient.K8sCniCncfIoV1Interface, podInformer cache.SharedIndexInformer, - resyncPeriod time.Duration, nodeName string, podCache cnipodcache.CNIPodInfoStore, cniServer *cniserver.CNIServer) *PodController { @@ -398,26 +394,3 @@ func parsePodSecondaryNetworkAnnotation(netObj string) ([]*SecondaryNetworkObjec } return secNetwork, nil } - -func InitializePodController(kubeClient clientset.Interface, netAttachDefClient netdefclient.K8sCniCncfIoV1Interface, informerFactory informers.SharedInformerFactory, - nodeName string, podInfoStore cnipodcache.CNIPodInfoStore, cniServer *cniserver.CNIServer) (*PodController, error) { - // Watch only the Pods which belong to the Node where the agent is running. - listOptions := func(options *metav1.ListOptions) { - options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", nodeName).String() - } - podInformer := coreinformers.NewFilteredPodInformer( - kubeClient, - metav1.NamespaceAll, - resyncPeriod, - cache.Indexers{}, - listOptions, - ) - c := NewPodController(kubeClient, - netAttachDefClient, - podInformer, - resyncPeriod, - nodeName, - podInfoStore, - cniServer) - return c, nil -}