Skip to content

Commit

Permalink
Update NPL and Secondary Network controllers initialization code (ant…
Browse files Browse the repository at this point in the history
…rea-io#3257)

Change secondary network Pod controller to use the shared
localPodInfomer.

Signed-off-by: Jianjun Shen <shenj@vmware.com>
  • Loading branch information
jianjuns authored Feb 9, 2022
1 parent b14d5db commit 5ccd3d1
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 77 deletions.
54 changes: 28 additions & 26 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,9 @@ func run(o *Options) error {

// Initialize localPodInformer for NPLAgent and AntreaIPAMController
var localPodInformer cache.SharedIndexInformer
if features.DefaultFeatureGate.Enabled(features.NodePortLocal) && o.config.NodePortLocal.Enable || features.DefaultFeatureGate.Enabled(features.AntreaIPAM) {
if features.DefaultFeatureGate.Enabled(features.NodePortLocal) && o.config.NodePortLocal.Enable ||
features.DefaultFeatureGate.Enabled(features.AntreaIPAM) ||
features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
listOptions := func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", nodeConfig.Name).String()
}
Expand All @@ -463,6 +465,19 @@ func run(o *Options) error {
listOptions,
)
}

log.StartLogFileNumberMonitor(stopCh)

go routeClient.Run(stopCh)

go cniServer.Run(stopCh)

go antreaClientProvider.Run(stopCh)

go nodeRouteController.Run(stopCh)

go networkPolicyController.Run(stopCh)

// Initialize the NPL agent.
if features.DefaultFeatureGate.Enabled(features.NodePortLocal) && o.config.NodePortLocal.Enable {
nplController, err := npl.InitializeNPLAgent(
Expand All @@ -477,6 +492,7 @@ func run(o *Options) error {
}
go nplController.Run(stopCh)
}

// Initialize the Antrea IPAM agent.
if features.DefaultFeatureGate.Enabled(features.AntreaIPAM) {
ipamController, err := ipam.InitializeAntreaIPAMController(
Expand All @@ -490,46 +506,32 @@ func run(o *Options) error {
}
go ipamController.Run(stopCh)
}
// Start the localPodInformer
if localPodInformer != nil {
go localPodInformer.Run(stopCh)
}

log.StartLogFileNumberMonitor(stopCh)

go routeClient.Run(stopCh)

go cniServer.Run(stopCh)

informerFactory.Start(stopCh)
crdInformerFactory.Start(stopCh)

go antreaClientProvider.Run(stopCh)

go nodeRouteController.Run(stopCh)

go networkPolicyController.Run(stopCh)

if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
// Create the NetworkAttachmentDefinition client, which handles access to secondary network object definition from the API Server.
netAttachDefClient, err := k8s.CreateNetworkAttachDefClient(o.config.ClientConnection, o.config.KubeAPIServerOverride)
if err != nil {
return fmt.Errorf("NetworkAttachmentDefinition client creation failed. %v", err)
}
// Initialize podController to handle secondary network configuration for Pods with k8s.v1.cni.cncf.io/networks Annotation defined.
podWatchController, err := podwatch.InitializePodController(
// Create podController to handle secondary network configuration for Pods with k8s.v1.cni.cncf.io/networks Annotation defined.
podWatchController := podwatch.NewPodController(
k8sClient,
netAttachDefClient,
informerFactory,
localPodInformer,
nodeConfig.Name,
cniPodInfoStore,
cniServer)
if err != nil {
return fmt.Errorf("failed to initialize Pod Controller for secondary network interface management: %v", err)
}
go podWatchController.Run(stopCh)
}

// Start the localPodInformer
if localPodInformer != nil {
go localPodInformer.Run(stopCh)
}

informerFactory.Start(stopCh)
crdInformerFactory.Start(stopCh)

if features.DefaultFeatureGate.Enabled(features.Egress) || features.DefaultFeatureGate.Enabled(features.ServiceExternalIP) {
go externalIPPoolController.Run(stopCh)
go localIPDetector.Run(stopCh)
Expand Down
7 changes: 5 additions & 2 deletions pkg/agent/nodeportlocal/k8s/npl_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,14 @@ import (
)

const (
controllerName = "AntreaAgentNPLController"
controllerName = "NPLController"
minRetryDelay = 2 * time.Second
maxRetryDelay = 120 * time.Second
numWorkers = 4

// Set resyncPeriod to 0 to disable resyncing.
// UpdateFunc event handler will be called only when the object is actually updated.
resyncPeriod = 0 * time.Minute
)

type NPLController struct {
Expand All @@ -63,7 +67,6 @@ type NPLController struct {
func NewNPLController(kubeClient clientset.Interface,
podInformer cache.SharedIndexInformer,
svcInformer cache.SharedIndexInformer,
resyncPeriod time.Duration,
pt *portcache.PortTable,
nodeName string) *NPLController {
c := NPLController{
Expand Down
21 changes: 1 addition & 20 deletions pkg/agent/nodeportlocal/npl_agent_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package nodeportlocal

import (
"fmt"
"time"

nplk8s "antrea.io/antrea/pkg/agent/nodeportlocal/k8s"
"antrea.io/antrea/pkg/agent/nodeportlocal/portcache"
Expand All @@ -29,10 +28,6 @@ import (
"k8s.io/client-go/tools/cache"
)

// Set resyncPeriod to 0 to disable resyncing.
// UpdateFunc event handler will be called only when the object is actually updated.
const resyncPeriod = 0 * time.Minute

// InitializeNPLAgent initializes the NodePortLocal agent.
// It sets up event handlers to handle Pod add, update and delete events.
// When a Pod gets created, a free Node port is obtained from the port table cache and a DNAT rule is added to NAT traffic to the Pod's ip:port.
Expand All @@ -49,20 +44,6 @@ func InitializeNPLAgent(
return nil, fmt.Errorf("error when initializing NodePortLocal port table: %v", err)
}

return InitController(kubeClient, informerFactory, portTable, nodeName, podInformer)
}

// InitController initializes the NPLController with appropriate Pod and Service Informers.
// This function can be used independently while unit testing without using InitializeNPLAgent function.
func InitController(kubeClient clientset.Interface, informerFactory informers.SharedInformerFactory, portTable *portcache.PortTable, nodeName string, podInformer cache.SharedIndexInformer) (*nplk8s.NPLController, error) {
svcInformer := informerFactory.Core().V1().Services().Informer()

c := nplk8s.NewNPLController(kubeClient,
podInformer,
svcInformer,
resyncPeriod,
portTable,
nodeName)

return c, nil
return nplk8s.NewNPLController(kubeClient, podInformer, svcInformer, portTable, nodeName), nil
}
5 changes: 3 additions & 2 deletions pkg/agent/nodeportlocal/npl_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ func setUp(t *testing.T, tc *testConfig, objects ...runtime.Object) *testData {
portTable: newPortTable(mockIPTables, mockPortOpener),
}

resyncPeriod := 0 * time.Minute
// informerFactory is initialized and started from cmd/antrea-agent/agent.go
informerFactory := informers.NewSharedInformerFactory(data.k8sClient, resyncPeriod)
listOptions := func(options *metav1.ListOptions) {
Expand All @@ -250,9 +251,9 @@ func setUp(t *testing.T, tc *testConfig, objects ...runtime.Object) *testData {
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, // NamespaceIndex is used in NPLController.
listOptions,
)
svcInformer := informerFactory.Core().V1().Services().Informer()

c, err := InitController(data.k8sClient, informerFactory, data.portTable, defaultNodeName, localPodInformer)
require.NoError(t, err)
c := nplk8s.NewNPLController(data.k8sClient, localPodInformer, svcInformer, data.portTable, defaultNodeName)

data.runWrapper(c)
informerFactory.Start(data.stopCh)
Expand Down
27 changes: 0 additions & 27 deletions pkg/agent/secondarynetwork/podwatch/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,8 @@ import (

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -81,7 +78,6 @@ type PodController struct {
func NewPodController(kubeClient clientset.Interface,
netAttachDefClient netdefclient.K8sCniCncfIoV1Interface,
podInformer cache.SharedIndexInformer,
resyncPeriod time.Duration,
nodeName string,
podCache cnipodcache.CNIPodInfoStore,
cniServer *cniserver.CNIServer) *PodController {
Expand Down Expand Up @@ -398,26 +394,3 @@ func parsePodSecondaryNetworkAnnotation(netObj string) ([]*SecondaryNetworkObjec
}
return secNetwork, nil
}

func InitializePodController(kubeClient clientset.Interface, netAttachDefClient netdefclient.K8sCniCncfIoV1Interface, informerFactory informers.SharedInformerFactory,
nodeName string, podInfoStore cnipodcache.CNIPodInfoStore, cniServer *cniserver.CNIServer) (*PodController, error) {
// Watch only the Pods which belong to the Node where the agent is running.
listOptions := func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", nodeName).String()
}
podInformer := coreinformers.NewFilteredPodInformer(
kubeClient,
metav1.NamespaceAll,
resyncPeriod,
cache.Indexers{},
listOptions,
)
c := NewPodController(kubeClient,
netAttachDefClient,
podInformer,
resyncPeriod,
nodeName,
podInfoStore,
cniServer)
return c, nil
}

0 comments on commit 5ccd3d1

Please sign in to comment.