Wait for cache synced before initializing rules
Signed-off-by: Antonin Bas <abas@vmware.com>
antoninbas committed Aug 10, 2021
1 parent 27bbd03 commit d366acc
Showing 3 changed files with 50 additions and 54 deletions.
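The change in a nutshell: generating the initial NodePortLocal rules requires listing all Pods on the Node, so it must happen after the Pod and Service informer caches have synced, not in a separate Initialize() step that ran before Run(). The sketch below condenses the startup order this diff establishes in Run(); it reuses identifiers from the hunks (controllerName, numWorkers, waitForRulesInitialization) and trims the workqueue and event-handler setup, so treat it as an outline rather than the exact function body.

// Condensed outline of NPLController.Run after this commit (not the full body).
func (c *NPLController) Run(stopCh <-chan struct{}) {
	// 1. Wait for the Pod and Service informer caches to sync, so that the
	//    podLister below reflects every Pod currently on this Node.
	if !cache.WaitForNamedCacheSync(controllerName, stopCh,
		c.podInformer.HasSynced, c.svcInformer.HasSynced) {
		return
	}

	// 2. List the Pods, program the initial NodePortLocal rules, and block
	//    until they are in place (see waitForRulesInitialization below).
	c.waitForRulesInitialization()

	// 3. Only then start the workers that consume Pod events.
	for i := 0; i < numWorkers; i++ {
		go wait.Until(c.Worker, time.Second, stopCh)
	}
	<-stopCh
}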
98 changes: 49 additions & 49 deletions pkg/agent/nodeportlocal/k8s/npl_controller.go
@@ -50,16 +50,15 @@ const (
)

type NPLController struct {
-	portTable        *portcache.PortTable
-	kubeClient       clientset.Interface
-	queue            workqueue.RateLimitingInterface
-	podInformer      cache.SharedIndexInformer
-	podLister        corelisters.PodLister
-	svcInformer      cache.SharedIndexInformer
-	podToIP          map[string]string
-	nodeName         string
-	podIPLock        sync.RWMutex
-	rulesInitialized chan struct{}
+	portTable   *portcache.PortTable
+	kubeClient  clientset.Interface
+	queue       workqueue.RateLimitingInterface
+	podInformer cache.SharedIndexInformer
+	podLister   corelisters.PodLister
+	svcInformer cache.SharedIndexInformer
+	podToIP     map[string]string
+	nodeName    string
+	podIPLock   sync.RWMutex
}

func NewNPLController(kubeClient clientset.Interface,
@@ -69,14 +68,13 @@ func NewNPLController(kubeClient clientset.Interface,
pt *portcache.PortTable,
nodeName string) *NPLController {
c := NPLController{
-		kubeClient:       kubeClient,
-		portTable:        pt,
-		podInformer:      podInformer,
-		podLister:        corelisters.NewPodLister(podInformer.GetIndexer()),
-		svcInformer:      svcInformer,
-		podToIP:          make(map[string]string),
-		nodeName:         nodeName,
-		rulesInitialized: make(chan struct{}),
+		kubeClient:  kubeClient,
+		portTable:   pt,
+		podInformer: podInformer,
+		podLister:   corelisters.NewPodLister(podInformer.GetIndexer()),
+		svcInformer: svcInformer,
+		podToIP:     make(map[string]string),
+		nodeName:    nodeName,
}

podInformer.AddEventHandlerWithResyncPeriod(
@@ -119,14 +117,6 @@ func podKeyFunc(pod *corev1.Pod) string {
return pod.Namespace + "/" + pod.Name
}

-func (c *NPLController) Initialize() error {
-	klog.InfoS("Will fetch Pods and generate NodePortLocal rules for these Pods")
-	if err := c.GetPodsAndGenRules(); err != nil {
-		return fmt.Errorf("error when getting Pods and generating rules: %v", err)
-	}
-	return nil
-}
-
// Run starts to watch and process Pod updates for the Node where Antrea Agent is running.
// It starts a queue and a fixed number of workers to process the objects from the queue.
func (c *NPLController) Run(stopCh <-chan struct{}) {
@@ -140,9 +130,8 @@ func (c *NPLController) Run(stopCh <-chan struct{}) {
if !cache.WaitForNamedCacheSync(controllerName, stopCh, c.podInformer.HasSynced, c.svcInformer.HasSynced) {
return
}
klog.InfoS("Waiting for initialization of NodePortLocal rules to complete")
<-c.rulesInitialized
klog.InfoS("Initialization of NodePortLocal rules successful")

c.waitForRulesInitialization()

for i := 0; i < numWorkers; i++ {
go wait.Until(c.Worker, time.Second, stopCh)
@@ -558,18 +547,27 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error {
return nil
}

-// GetPodsAndGenRules fetches all the Pods on this Node and looks for valid NodePortLocal
+// waitForRulesInitialization fetches all the Pods on this Node and looks for valid NodePortLocal
// annotations. If they exist, with a valid Node port, it adds the Node port to the port table and
// rules. If the NodePortLocal annotation is invalid (cannot be unmarshalled), the annotation is
// cleared. If the Node port is invalid (maybe the port range was changed and the Agent was
// restarted), the annotation is ignored and will be removed by the Pod event handlers. The Pod
// event handlers will also take care of allocating a new Node port if required.
-func (c *NPLController) GetPodsAndGenRules() error {
+// The function is meant to be called during Controller initialization, after the caches have
+// synced. It will block until iptables rules have been synced successfully based on the listed
+// Pods. After it returns, the Controller should start handling events. In case of an unexpected
+// error, the function can return early or may not complete initialization. The Controller's event
+// handlers are able to recover from these errors.
+func (c *NPLController) waitForRulesInitialization() {
+	klog.InfoS("Will fetch Pods and generate NodePortLocal rules for these Pods")

podList, err := c.podLister.List(labels.Everything())
if err != nil {
return fmt.Errorf("error in fetching the Pods for Node %s: %v", c.nodeName, err)
klog.ErrorS(err, "Error when listing Pods for Node")
}

+	// in case of an error when listing Pods above, allNPLPorts will be
+	// empty and all NPL iptables rules will be deleted.
allNPLPorts := []rules.PodNodePort{}
for i := range podList {
// For each Pod:
@@ -583,12 +581,11 @@ func (c *NPLController) GetPodsAndGenRules() error {
continue
}
nplData := []NPLAnnotation{}
-		err := json.Unmarshal([]byte(nplAnnotation), &nplData)
-		if err != nil {
+		if err := json.Unmarshal([]byte(nplAnnotation), &nplData); err != nil {
klog.InfoS("Found invalid NodePortLocal annotation for Pod that cannot be parsed, cleaning it up", "pod", klog.KObj(pod))
// if there's an error in this NodePortLocal annotation, clean it up
-			err := c.cleanupNPLAnnotationForPod(pod)
-			if err != nil {
-				return err
+			if err := c.cleanupNPLAnnotationForPod(pod); err != nil {
+				klog.ErrorS(err, "Error when cleaning up NodePortLocal annotation for Pod", "pod", klog.KObj(pod))
}
continue
}
@@ -597,27 +594,30 @@
if npl.NodePort > c.portTable.EndPort || npl.NodePort < c.portTable.StartPort {
// ignoring annotation for now, it will be removed by the first call
// to handleAddUpdatePod
-			klog.V(2).Infof("Found invalid NodePortLocal annotation for Pod %s/%s: %s, ignoring it", pod.Namespace, pod.Name, nplAnnotation)
+			klog.V(2).InfoS("Found NodePortLocal annotation for which the allocated port doesn't fall into the configured range", "pod", klog.KObj(pod))
continue
-		} else {
-			allNPLPorts = append(allNPLPorts, rules.PodNodePort{
-				NodePort: npl.NodePort,
-				PodPort:  npl.PodPort,
-				PodIP:    pod.Status.PodIP,
-			})
}
+		allNPLPorts = append(allNPLPorts, rules.PodNodePort{
+			NodePort: npl.NodePort,
+			PodPort:  npl.PodPort,
+			PodIP:    pod.Status.PodIP,
+		})
}
}

-	if err := c.addRulesForNPLPorts(allNPLPorts); err != nil {
-		return err
+	rulesInitialized := make(chan struct{})
+	if err := c.addRulesForNPLPorts(allNPLPorts, rulesInitialized); err != nil {
+		klog.ErrorS(err, "Cannot install NodePortLocal rules")
+		return
}

-	return nil
+	klog.InfoS("Waiting for initialization of NodePortLocal rules to complete")
+	<-rulesInitialized
+	klog.InfoS("Initialization of NodePortLocal rules successful")
}

-func (c *NPLController) addRulesForNPLPorts(allNPLPorts []rules.PodNodePort) error {
-	return c.portTable.RestoreRules(allNPLPorts, c.rulesInitialized)
+func (c *NPLController) addRulesForNPLPorts(allNPLPorts []rules.PodNodePort, synced chan<- struct{}) error {
+	return c.portTable.RestoreRules(allNPLPorts, synced)
}

// cleanupNPLAnnotationForPod removes the NodePortLocal annotation from the Pod's annotations map entirely.
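The handshake that replaces the old rulesInitialized struct field is Go's standard close-a-channel signal: waitForRulesInitialization creates the channel, hands it to addRulesForNPLPorts/RestoreRules, and blocks on a receive that unblocks once the background goroutine closes the channel. A self-contained sketch of the pattern, with toy names standing in for the Antrea functions:

package main

import (
	"fmt"
	"time"
)

// restoreRules stands in for PortTable.RestoreRules: it kicks off the actual
// work in a goroutine and closes synced once that work is done. A receive on
// a closed channel never blocks, so every waiter is released.
func restoreRules(synced chan<- struct{}) error {
	go func() {
		defer close(synced)
		time.Sleep(50 * time.Millisecond) // stand-in for programming iptables rules
	}()
	return nil
}

func main() {
	synced := make(chan struct{})
	if err := restoreRules(synced); err != nil {
		fmt.Println("cannot install rules:", err)
		return
	}
	fmt.Println("waiting for rule initialization to complete")
	<-synced // blocks until the goroutine closes the channel
	fmt.Println("initialization successful")
}

Making the channel a local variable passed as chan<- struct{} (instead of a long-lived struct field) scopes the signal to a single initialization pass, and the send-only type lets the compiler reject any accidental receive inside RestoreRules.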
4 changes: 0 additions & 4 deletions pkg/agent/nodeportlocal/npl_agent_init.go
@@ -76,9 +76,5 @@ func InitController(kubeClient clientset.Interface, informerFactory informers.Sh
portTable,
nodeName)

-	if err := c.Initialize(); err != nil {
-		return nil, fmt.Errorf("error when initializing NodePortLocal Controller: %v", err)
-	}
-
return c, nil
}
2 changes: 1 addition & 1 deletion pkg/agent/nodeportlocal/portcache/port_table.go
@@ -197,7 +197,7 @@ func (pt *PortTable) RestoreRules(allNPLPorts []rules.PodNodePort, synced chan<-
pt.Table[nplPort.NodePort] = data
}
// retry mechanism as iptables-restore can fail if other components (in Antrea or other
-	// software) or accessing iptables.
+	// software) are accessing iptables.
go func() {
defer close(synced)
var backoffTime = 2 * time.Second
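The corrected comment refers to the retry loop in RestoreRules: iptables-restore can fail transiently while another component (in Antrea or other software) holds the iptables lock, so the goroutine retries and only stops once a sync attempt succeeds, at which point the deferred close releases the waiter. The diff only shows the goroutine's first lines, so the loop below is an illustrative sketch that assumes a fixed 2-second retry interval, as the backoffTime variable suggests:

package main

import (
	"errors"
	"fmt"
	"time"
)

var attempts int

// syncRules stands in for the iptables-restore invocation; it fails twice to
// simulate contention on the iptables lock before succeeding.
func syncRules() error {
	attempts++
	if attempts < 3 {
		return errors.New("iptables-restore: another app is currently holding the xtables lock")
	}
	return nil
}

// restoreRules mirrors the shape of the goroutine in RestoreRules: retry the
// sync with a backoff, then close synced once it has succeeded.
func restoreRules(synced chan<- struct{}) {
	go func() {
		defer close(synced) // signals the waiter in waitForRulesInitialization
		backoffTime := 2 * time.Second
		for {
			if err := syncRules(); err == nil {
				return
			}
			fmt.Println("rule sync failed, retrying in", backoffTime)
			time.Sleep(backoffTime)
		}
	}()
}

func main() {
	synced := make(chan struct{})
	restoreRules(synced)
	<-synced
	fmt.Println("rules synced after", attempts, "attempts")
}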
