diff --git a/pkg/agent/nodeportlocal/k8s/npl_controller.go b/pkg/agent/nodeportlocal/k8s/npl_controller.go
index 12bba2856d7..6fa0b07a884 100644
--- a/pkg/agent/nodeportlocal/k8s/npl_controller.go
+++ b/pkg/agent/nodeportlocal/k8s/npl_controller.go
@@ -50,16 +50,15 @@ const (
 )
 
 type NPLController struct {
-	portTable        *portcache.PortTable
-	kubeClient       clientset.Interface
-	queue            workqueue.RateLimitingInterface
-	podInformer      cache.SharedIndexInformer
-	podLister        corelisters.PodLister
-	svcInformer      cache.SharedIndexInformer
-	podToIP          map[string]string
-	nodeName         string
-	podIPLock        sync.RWMutex
-	rulesInitialized chan struct{}
+	portTable   *portcache.PortTable
+	kubeClient  clientset.Interface
+	queue       workqueue.RateLimitingInterface
+	podInformer cache.SharedIndexInformer
+	podLister   corelisters.PodLister
+	svcInformer cache.SharedIndexInformer
+	podToIP     map[string]string
+	nodeName    string
+	podIPLock   sync.RWMutex
 }
 
 func NewNPLController(kubeClient clientset.Interface,
@@ -69,14 +68,13 @@ func NewNPLController(kubeClient clientset.Interface,
 	pt *portcache.PortTable,
 	nodeName string) *NPLController {
 	c := NPLController{
-		kubeClient:       kubeClient,
-		portTable:        pt,
-		podInformer:      podInformer,
-		podLister:        corelisters.NewPodLister(podInformer.GetIndexer()),
-		svcInformer:      svcInformer,
-		podToIP:          make(map[string]string),
-		nodeName:         nodeName,
-		rulesInitialized: make(chan struct{}),
+		kubeClient:  kubeClient,
+		portTable:   pt,
+		podInformer: podInformer,
+		podLister:   corelisters.NewPodLister(podInformer.GetIndexer()),
+		svcInformer: svcInformer,
+		podToIP:     make(map[string]string),
+		nodeName:    nodeName,
 	}
 
 	podInformer.AddEventHandlerWithResyncPeriod(
@@ -119,14 +117,6 @@ func podKeyFunc(pod *corev1.Pod) string {
 	return pod.Namespace + "/" + pod.Name
 }
 
-func (c *NPLController) Initialize() error {
-	klog.InfoS("Will fetch Pods and generate NodePortLocal rules for these Pods")
-	if err := c.GetPodsAndGenRules(); err != nil {
-		return fmt.Errorf("error when getting Pods and generating rules: %v", err)
-	}
-	return nil
-}
-
 // Run starts to watch and process Pod updates for the Node where Antrea Agent is running.
 // It starts a queue and a fixed number of workers to process the objects from the queue.
 func (c *NPLController) Run(stopCh <-chan struct{}) {
@@ -140,9 +130,8 @@ func (c *NPLController) Run(stopCh <-chan struct{}) {
 	if !cache.WaitForNamedCacheSync(controllerName, stopCh, c.podInformer.HasSynced, c.svcInformer.HasSynced) {
 		return
 	}
-	klog.InfoS("Waiting for initialization of NodePortLocal rules to complete")
-	<-c.rulesInitialized
-	klog.InfoS("Initialization of NodePortLocal rules successful")
+
+	c.waitForRulesInitialization()
 
 	for i := 0; i < numWorkers; i++ {
 		go wait.Until(c.Worker, time.Second, stopCh)
@@ -558,18 +547,27 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error {
 	return nil
 }
 
-// GetPodsAndGenRules fetches all the Pods on this Node and looks for valid NodePortLocal
+// waitForRulesInitialization fetches all the Pods on this Node and looks for valid NodePortLocal
 // annotations. If they exist, with a valid Node port, it adds the Node port to the port table and
 // rules. If the NodePortLocal annotation is invalid (cannot be unmarshalled), the annotation is
 // cleared. If the Node port is invalid (maybe the port range was changed and the Agent was
 // restarted), the annotation is ignored and will be removed by the Pod event handlers. The Pod
 // event handlers will also take care of allocating a new Node port if required.
-func (c *NPLController) GetPodsAndGenRules() error {
+// The function is meant to be called during Controller initialization, after the caches have
+// synced. It will block until iptables rules have been synced successfully based on the listed
+// Pods. After it returns, the Controller should start handling events. In case of an unexpected
+// error, the function can return early or may not complete initialization. The Controller's event
+// handlers are able to recover from these errors.
+func (c *NPLController) waitForRulesInitialization() {
+	klog.InfoS("Will fetch Pods and generate NodePortLocal rules for these Pods")
+
 	podList, err := c.podLister.List(labels.Everything())
 	if err != nil {
-		return fmt.Errorf("error in fetching the Pods for Node %s: %v", c.nodeName, err)
+		klog.ErrorS(err, "Error when listing Pods for Node")
 	}
 
+	// in case of an error when listing Pods above, allNPLPorts will be
+	// empty and all NPL iptables rules will be deleted.
 	allNPLPorts := []rules.PodNodePort{}
 	for i := range podList {
 		// For each Pod:
@@ -583,12 +581,11 @@
 			continue
 		}
 		nplData := []NPLAnnotation{}
-		err := json.Unmarshal([]byte(nplAnnotation), &nplData)
-		if err != nil {
+		if err := json.Unmarshal([]byte(nplAnnotation), &nplData); err != nil {
+			klog.InfoS("Found invalid NodePortLocal annotation for Pod that cannot be parsed, cleaning it up", "pod", klog.KObj(pod))
 			// if there's an error in this NodePortLocal annotation, clean it up
-			err := c.cleanupNPLAnnotationForPod(pod)
-			if err != nil {
-				return err
+			if err := c.cleanupNPLAnnotationForPod(pod); err != nil {
+				klog.ErrorS(err, "Error when cleaning up NodePortLocal annotation for Pod", "pod", klog.KObj(pod))
 			}
 			continue
 		}
@@ -597,27 +594,30 @@
 			if npl.NodePort > c.portTable.EndPort || npl.NodePort < c.portTable.StartPort {
 				// ignoring annotation for now, it will be removed by the first call
 				// to handleAddUpdatePod
-				klog.V(2).Infof("Found invalid NodePortLocal annotation for Pod %s/%s: %s, ignoring it", pod.Namespace, pod.Name, nplAnnotation)
+				klog.V(2).InfoS("Found NodePortLocal annotation for which the allocated port doesn't fall into the configured range", "pod", klog.KObj(pod))
 				continue
-			} else {
-				allNPLPorts = append(allNPLPorts, rules.PodNodePort{
-					NodePort: npl.NodePort,
-					PodPort:  npl.PodPort,
-					PodIP:    pod.Status.PodIP,
-				})
 			}
+			allNPLPorts = append(allNPLPorts, rules.PodNodePort{
+				NodePort: npl.NodePort,
+				PodPort:  npl.PodPort,
+				PodIP:    pod.Status.PodIP,
+			})
 		}
 	}
 
-	if err := c.addRulesForNPLPorts(allNPLPorts); err != nil {
-		return err
+	rulesInitialized := make(chan struct{})
+	if err := c.addRulesForNPLPorts(allNPLPorts, rulesInitialized); err != nil {
+		klog.ErrorS(err, "Cannot install NodePortLocal rules")
+		return
 	}
 
-	return nil
+	klog.InfoS("Waiting for initialization of NodePortLocal rules to complete")
+	<-rulesInitialized
+	klog.InfoS("Initialization of NodePortLocal rules successful")
 }
 
-func (c *NPLController) addRulesForNPLPorts(allNPLPorts []rules.PodNodePort) error {
-	return c.portTable.RestoreRules(allNPLPorts, c.rulesInitialized)
+func (c *NPLController) addRulesForNPLPorts(allNPLPorts []rules.PodNodePort, synced chan<- struct{}) error {
+	return c.portTable.RestoreRules(allNPLPorts, synced)
 }
 
 // cleanupNPLAnnotationForPod removes the NodePortLocal annotation from the Pod's annotations map entirely.
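For reviewers unfamiliar with the hand-off above: `RestoreRules` returns immediately, programs the rules in a background goroutine, and signals one-time completion by closing the channel it was given, so `waitForRulesInitialization` can block on a plain channel receive. With `rulesInitialized` now a local variable rather than a Controller field (and `Initialize()` gone), ownership of the channel is confined to this single code path. A minimal, self-contained sketch of the pattern; this is not Antrea code, and `restoreRules` and the port list are simplified stand-ins:

```go
package main

import (
	"fmt"
	"time"
)

// restoreRules mimics the shape of PortTable.RestoreRules: it returns
// immediately and programs the rules in a background goroutine, closing
// synced once the rules have been written successfully.
func restoreRules(nodePorts []int, synced chan<- struct{}) error {
	go func() {
		defer close(synced)
		time.Sleep(100 * time.Millisecond) // stand-in for programming iptables
		fmt.Printf("restored rules for %d ports\n", len(nodePorts))
	}()
	return nil
}

func main() {
	synced := make(chan struct{})
	if err := restoreRules([]int{61000, 61001}, synced); err != nil {
		fmt.Println("cannot install rules:", err)
		return
	}
	fmt.Println("waiting for rule initialization to complete")
	<-synced // blocks until the goroutine closes the channel
	fmt.Println("rule initialization successful")
}
```

Closing a channel (rather than sending on it) is the idiomatic completion signal here because it is broadcast-safe and cannot block the goroutine that performs the work.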
diff --git a/pkg/agent/nodeportlocal/npl_agent_init.go b/pkg/agent/nodeportlocal/npl_agent_init.go
index 1fe3d3d23eb..fd4bb230e87 100644
--- a/pkg/agent/nodeportlocal/npl_agent_init.go
+++ b/pkg/agent/nodeportlocal/npl_agent_init.go
@@ -76,9 +76,5 @@ func InitController(kubeClient clientset.Interface, informerFactory informers.Sh
 		portTable,
 		nodeName)
 
-	if err := c.Initialize(); err != nil {
-		return nil, fmt.Errorf("error when initializing NodePortLocal Controller: %v", err)
-	}
-
 	return c, nil
 }
diff --git a/pkg/agent/nodeportlocal/portcache/port_table.go b/pkg/agent/nodeportlocal/portcache/port_table.go
index 42439379e88..9248f60f503 100644
--- a/pkg/agent/nodeportlocal/portcache/port_table.go
+++ b/pkg/agent/nodeportlocal/portcache/port_table.go
@@ -197,7 +197,7 @@ func (pt *PortTable) RestoreRules(allNPLPorts []rules.PodNodePort, synced chan<-
 		pt.Table[nplPort.NodePort] = data
 	}
 	// retry mechanism as iptables-restore can fail if other components (in Antrea or other
-	// software) or accessing iptables.
+	// software) are accessing iptables.
 	go func() {
 		defer close(synced)
 		var backoffTime = 2 * time.Second
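The comment fixed in the last hunk describes the retry loop that `RestoreRules` runs in its goroutine: iptables-restore can fail transiently while another process is accessing iptables, so the sync is retried with a backoff, and `synced` is only closed after a successful run. A rough standalone sketch of that shape, with hypothetical names (`syncRulesOnce` stands in for a single iptables-restore invocation; the 2-second backoff mirrors the value in the diff):

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// syncRulesOnce stands in for one iptables-restore invocation, which can
// fail transiently while another component is accessing iptables.
func syncRulesOnce() error {
	if rand.Intn(3) > 0 {
		return errors.New("could not acquire xtables lock")
	}
	return nil
}

// syncRulesWithRetry retries the sync with a fixed backoff until it
// succeeds, then closes synced so that a waiting caller can proceed.
func syncRulesWithRetry(synced chan<- struct{}) {
	go func() {
		defer close(synced)
		backoffTime := 2 * time.Second
		for {
			if err := syncRulesOnce(); err != nil {
				fmt.Println("rule sync failed, will retry:", err)
				time.Sleep(backoffTime)
				continue
			}
			return
		}
	}()
}

func main() {
	synced := make(chan struct{})
	syncRulesWithRetry(synced)
	<-synced
	fmt.Println("rules synced successfully")
}
```

Because the retry runs on a separate goroutine and the caller only blocks on `synced`, a slow or contended iptables does not stall cache sync or event-handler registration, only the start of the workers.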