diff --git a/pkg/agent/nodeportlocal/k8s/npl_controller.go b/pkg/agent/nodeportlocal/k8s/npl_controller.go
index a345452c391..12bba2856d7 100644
--- a/pkg/agent/nodeportlocal/k8s/npl_controller.go
+++ b/pkg/agent/nodeportlocal/k8s/npl_controller.go
@@ -50,15 +50,16 @@ const (
 )
 
 type NPLController struct {
-	portTable   *portcache.PortTable
-	kubeClient  clientset.Interface
-	queue       workqueue.RateLimitingInterface
-	podInformer cache.SharedIndexInformer
-	podLister   corelisters.PodLister
-	svcInformer cache.SharedIndexInformer
-	podToIP     map[string]string
-	nodeName    string
-	podIPLock   sync.RWMutex
+	portTable        *portcache.PortTable
+	kubeClient       clientset.Interface
+	queue            workqueue.RateLimitingInterface
+	podInformer      cache.SharedIndexInformer
+	podLister        corelisters.PodLister
+	svcInformer      cache.SharedIndexInformer
+	podToIP          map[string]string
+	nodeName         string
+	podIPLock        sync.RWMutex
+	rulesInitialized chan struct{}
 }
 
 func NewNPLController(kubeClient clientset.Interface,
@@ -68,13 +69,14 @@ func NewNPLController(kubeClient clientset.Interface,
 	pt *portcache.PortTable,
 	nodeName string) *NPLController {
 	c := NPLController{
-		kubeClient:  kubeClient,
-		portTable:   pt,
-		podInformer: podInformer,
-		podLister:   corelisters.NewPodLister(podInformer.GetIndexer()),
-		svcInformer: svcInformer,
-		podToIP:     make(map[string]string),
-		nodeName:    nodeName,
+		kubeClient:       kubeClient,
+		portTable:        pt,
+		podInformer:      podInformer,
+		podLister:        corelisters.NewPodLister(podInformer.GetIndexer()),
+		svcInformer:      svcInformer,
+		podToIP:          make(map[string]string),
+		nodeName:         nodeName,
+		rulesInitialized: make(chan struct{}),
 	}
 
 	podInformer.AddEventHandlerWithResyncPeriod(
@@ -117,6 +119,14 @@ func podKeyFunc(pod *corev1.Pod) string {
 	return pod.Namespace + "/" + pod.Name
 }
 
+func (c *NPLController) Initialize() error {
+	klog.InfoS("Will fetch Pods and generate NodePortLocal rules for these Pods")
+	if err := c.GetPodsAndGenRules(); err != nil {
+		return fmt.Errorf("error when getting Pods and generating rules: %v", err)
+	}
+	return nil
+}
+
 // Run starts to watch and process Pod updates for the Node where Antrea Agent is running.
 // It starts a queue and a fixed number of workers to process the objects from the queue.
 func (c *NPLController) Run(stopCh <-chan struct{}) {
@@ -130,12 +140,9 @@ func (c *NPLController) Run(stopCh <-chan struct{}) {
 	if !cache.WaitForNamedCacheSync(controllerName, stopCh, c.podInformer.HasSynced, c.svcInformer.HasSynced) {
 		return
 	}
-	klog.Info("Will fetch Pods and generate NodePortLocal rules for these Pods")
-
-	if err := c.GetPodsAndGenRules(); err != nil {
-		klog.Errorf("Error in getting Pods and generating rules: %v", err)
-		return
-	}
+	klog.InfoS("Waiting for initialization of NodePortLocal rules to complete")
+	<-c.rulesInitialized
+	klog.InfoS("Initialization of NodePortLocal rules successful")
 
 	for i := 0; i < numWorkers; i++ {
 		go wait.Until(c.Worker, time.Second, stopCh)
@@ -610,7 +617,7 @@ func (c *NPLController) GetPodsAndGenRules() error {
 }
 
 func (c *NPLController) addRulesForNPLPorts(allNPLPorts []rules.PodNodePort) error {
-	return c.portTable.SyncRules(allNPLPorts)
+	return c.portTable.RestoreRules(allNPLPorts, c.rulesInitialized)
 }
 
 // cleanupNPLAnnotationForPod removes the NodePortLocal annotation from the Pod's annotations map entirely.
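The hunks above split rule generation out of `Run()` into a new `Initialize()` method, with `Run()` blocking on the `rulesInitialized` channel before starting its workers. Below is a minimal, self-contained sketch of that gating pattern; the names and the simulated delay are illustrative, not the actual Antrea controller code.

```go
package main

import (
	"fmt"
	"time"
)

type controller struct {
	// rulesInitialized is closed once the initial rules have been restored,
	// mirroring the field added to NPLController above.
	rulesInitialized chan struct{}
}

// initialize kicks off the (possibly slow) rule restoration; here the work is
// simulated by a goroutine that closes the channel after a short delay.
func (c *controller) initialize() error {
	go func() {
		time.Sleep(100 * time.Millisecond) // stand-in for restoring rules
		close(c.rulesInitialized)
	}()
	return nil
}

// run blocks until initialization completes before starting any workers, so no
// worker can observe a partially restored rule set.
func (c *controller) run() {
	fmt.Println("waiting for rule initialization")
	<-c.rulesInitialized
	fmt.Println("rules initialized, starting workers")
}

func main() {
	c := &controller{rulesInitialized: make(chan struct{})}
	if err := c.initialize(); err != nil {
		panic(err)
	}
	c.run()
}
```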
diff --git a/pkg/agent/nodeportlocal/npl_agent_init.go b/pkg/agent/nodeportlocal/npl_agent_init.go
index fd4bb230e87..1fe3d3d23eb 100644
--- a/pkg/agent/nodeportlocal/npl_agent_init.go
+++ b/pkg/agent/nodeportlocal/npl_agent_init.go
@@ -76,5 +76,9 @@ func InitController(kubeClient clientset.Interface, informerFactory informers.Sh
 		portTable,
 		nodeName)
 
+	if err := c.Initialize(); err != nil {
+		return nil, fmt.Errorf("error when initializing NodePortLocal Controller: %v", err)
+	}
+
 	return c, nil
 }
diff --git a/pkg/agent/nodeportlocal/npl_agent_test.go b/pkg/agent/nodeportlocal/npl_agent_test.go
index 40c8c7abf30..a0f91e830af 100644
--- a/pkg/agent/nodeportlocal/npl_agent_test.go
+++ b/pkg/agent/nodeportlocal/npl_agent_test.go
@@ -154,12 +154,11 @@ func getTestSvcWithPortName(portName string) *corev1.Service {
 
 type testData struct {
 	*testing.T
-	stopCh         chan struct{}
-	ctrl           *gomock.Controller
-	k8sClient      *k8sfake.Clientset
-	portTable      *portcache.PortTable
-	mockPortOpener *portcachetesting.MockLocalPortOpener
-	wg             sync.WaitGroup
+	stopCh    chan struct{}
+	ctrl      *gomock.Controller
+	k8sClient *k8sfake.Clientset
+	portTable *portcache.PortTable
+	wg        sync.WaitGroup
 }
 
 func (data *testData) runWrapper(c *nplk8s.NPLController) {
@@ -170,18 +169,25 @@ func (data *testData) runWrapper(c *nplk8s.NPLController) {
 	}()
 }
 
+type customizePortOpenerExpectations func(*portcachetesting.MockLocalPortOpener)
+type customizePodPortRulesExpectations func(*rulestesting.MockPodPortRules)
+
 type testConfig struct {
-	defaultPortOpenerExpectations bool
+	customPortOpenerExpectations   customizePortOpenerExpectations
+	customPodPortRulesExpectations customizePodPortRulesExpectations
 }
 
 func newTestConfig() *testConfig {
-	return &testConfig{
-		defaultPortOpenerExpectations: true,
-	}
+	return &testConfig{}
+}
+
+func (tc *testConfig) withCustomPortOpenerExpectations(fn customizePortOpenerExpectations) *testConfig {
+	tc.customPortOpenerExpectations = fn
+	return tc
 }
 
-func (tc *testConfig) withDefaultPortOpenerExpectations(v bool) *testConfig {
-	tc.defaultPortOpenerExpectations = false
+func (tc *testConfig) withCustomPodPortRulesExpectations(fn customizePodPortRulesExpectations) *testConfig {
+	tc.customPodPortRulesExpectations = fn
 	return tc
 }
 
@@ -191,22 +197,27 @@ func setUp(t *testing.T, tc *testConfig, objects ...runtime.Object) *testData {
 
 	mockCtrl := gomock.NewController(t)
 	mockIPTables := rulestesting.NewMockPodPortRules(mockCtrl)
-	mockIPTables.EXPECT().AddRule(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
-	mockIPTables.EXPECT().DeleteRule(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
-	mockIPTables.EXPECT().AddAllRules(gomock.Any()).AnyTimes()
+	if tc.customPodPortRulesExpectations != nil {
+		tc.customPodPortRulesExpectations(mockIPTables)
+	} else {
+		mockIPTables.EXPECT().AddRule(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+		mockIPTables.EXPECT().DeleteRule(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+		mockIPTables.EXPECT().AddAllRules(gomock.Any()).AnyTimes()
+	}
 
 	mockPortOpener := portcachetesting.NewMockLocalPortOpener(mockCtrl)
-	if tc.defaultPortOpenerExpectations {
+	if tc.customPortOpenerExpectations != nil {
+		tc.customPortOpenerExpectations(mockPortOpener)
+	} else {
 		mockPortOpener.EXPECT().OpenLocalPort(gomock.Any()).AnyTimes().Return(&fakeSocket{}, nil)
 	}
 
 	data := &testData{
-		T:              t,
-		stopCh:         make(chan struct{}),
-		ctrl:           mockCtrl,
-		k8sClient:      k8sfake.NewSimpleClientset(objects...),
-		portTable:      newPortTable(mockIPTables, mockPortOpener),
-		mockPortOpener: mockPortOpener,
+		T:         t,
+		stopCh:    make(chan struct{}),
+		ctrl:      mockCtrl,
+		k8sClient: k8sfake.NewSimpleClientset(objects...),
+		portTable: newPortTable(mockIPTables, mockPortOpener),
 	}
 
 	// informerFactory is initialized and started from cmd/antrea-agent/agent.go
@@ -659,30 +670,36 @@ var (
 // TestNodePortAlreadyBoundTo validates that when a port is already bound to, a different port will
 // be selected for NPL.
 func TestNodePortAlreadyBoundTo(t *testing.T) {
-	testSvc := getTestSvc()
-	testPod := getTestPod()
-	testConfig := newTestConfig().withDefaultPortOpenerExpectations(false)
-	testData := setUp(t, testConfig)
-	defer testData.tearDown()
-	var nodePort int
-	gomock.InOrder(
-		testData.mockPortOpener.EXPECT().OpenLocalPort(gomock.Any()).Return(nil, portTakenError),
-		testData.mockPortOpener.EXPECT().OpenLocalPort(gomock.Any()).DoAndReturn(func(port int) (portcache.Closeable, error) {
-			nodePort = port
-			return &fakeSocket{}, nil
-		}),
-	)
-
-	_, err := testData.k8sClient.CoreV1().Services(defaultNS).Create(context.TODO(), testSvc, metav1.CreateOptions{})
-	require.NoError(t, err, "Service creation failed")
-
-	_, err = testData.k8sClient.CoreV1().Pods(defaultNS).Create(context.TODO(), testPod, metav1.CreateOptions{})
-	require.NoError(t, err, "Pod creation failed")
+	var nodePort int
+	testConfig := newTestConfig().withCustomPortOpenerExpectations(func(mockPortOpener *portcachetesting.MockLocalPortOpener) {
+		gomock.InOrder(
+			mockPortOpener.EXPECT().OpenLocalPort(gomock.Any()).Return(nil, portTakenError),
+			mockPortOpener.EXPECT().OpenLocalPort(gomock.Any()).DoAndReturn(func(port int) (portcache.Closeable, error) {
+				nodePort = port
+				return &fakeSocket{}, nil
+			}),
+		)
+	})
+	testData, _, testPod := setUpWithTestServiceAndPod(t, testConfig)
+	defer testData.tearDown()
 
 	value, err := testData.pollForPodAnnotation(testPod.Name, true)
 	require.NoError(t, err, "Poll for annotation check failed")
 	annotation := testData.checkAnnotationValue(value, defaultPort)[0] // length of slice is guaranteed to be correct at this stage
 	assert.Equal(t, nodePort, annotation.NodePort)
-	assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort))
+}
+
+func TestSyncRulesError(t *testing.T) {
+	testConfig := newTestConfig().withCustomPodPortRulesExpectations(func(mockIPTables *rulestesting.MockPodPortRules) {
+		mockIPTables.EXPECT().AddRule(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+		mockIPTables.EXPECT().DeleteRule(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+		gomock.InOrder(
+			mockIPTables.EXPECT().AddAllRules(gomock.Any()).Return(fmt.Errorf("iptables failure")),
+			mockIPTables.EXPECT().AddAllRules(gomock.Any()).Return(nil).AnyTimes(),
+		)
+	})
+
+	testData, _, _ := setUpWithTestServiceAndPod(t, testConfig)
+	defer testData.tearDown()
 }
 
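The test refactoring above replaces the `defaultPortOpenerExpectations` boolean with optional per-test customization hooks, falling back to default expectations when no hook is provided. The sketch below distills that pattern into a self-contained example; it uses a plain struct in place of the gomock-generated mocks, and every name in it is illustrative rather than part of the Antrea test suite.

```go
package main

import "fmt"

type mockPortOpener struct{ behavior string }

type customizePortOpenerExpectations func(*mockPortOpener)

type testConfig struct {
	customPortOpenerExpectations customizePortOpenerExpectations
}

func newTestConfig() *testConfig { return &testConfig{} }

// withCustomPortOpenerExpectations installs a customization hook; returning the
// receiver allows chaining, as in the real test helpers.
func (tc *testConfig) withCustomPortOpenerExpectations(fn customizePortOpenerExpectations) *testConfig {
	tc.customPortOpenerExpectations = fn
	return tc
}

// setUp applies the test-specific hook when one is set, and falls back to the
// default expectations otherwise.
func setUp(tc *testConfig) *mockPortOpener {
	opener := &mockPortOpener{}
	if tc.customPortOpenerExpectations != nil {
		tc.customPortOpenerExpectations(opener) // test-specific expectations
	} else {
		opener.behavior = "always succeed" // default expectations
	}
	return opener
}

func main() {
	tc := newTestConfig().withCustomPortOpenerExpectations(func(m *mockPortOpener) {
		m.behavior = "fail once, then succeed"
	})
	fmt.Println(setUp(tc).behavior)
}
```

Compared with the removed boolean flag (whose setter ignored its `v` argument), function-valued hooks let each test express arbitrary mock expectations without growing the shared `setUp` helper.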
diff --git a/pkg/agent/nodeportlocal/portcache/port_table.go b/pkg/agent/nodeportlocal/portcache/port_table.go
index 19a77691361..42439379e88 100644
--- a/pkg/agent/nodeportlocal/portcache/port_table.go
+++ b/pkg/agent/nodeportlocal/portcache/port_table.go
@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"net"
 	"sync"
+	"time"
 
 	"k8s.io/klog/v2"
 
@@ -156,8 +157,26 @@ func (pt *PortTable) RuleExists(podIP string, podPort int) bool {
 	return false
 }
 
-func (pt *PortTable) SyncRules(allNPLPorts []rules.PodNodePort) error {
-	validNPLPorts := make([]rules.PodNodePort, 0, len(allNPLPorts))
+// syncRules ensures that the contents of the port table match the iptables rules present on the Node.
+func (pt *PortTable) syncRules() error {
+	pt.tableLock.Lock()
+	defer pt.tableLock.Unlock()
+	nplPorts := make([]rules.PodNodePort, 0, len(pt.Table))
+	for _, data := range pt.Table {
+		nplPorts = append(nplPorts, rules.PodNodePort{
+			NodePort: data.NodePort,
+			PodPort:  data.PodPort,
+			PodIP:    data.PodIP,
+		})
+	}
+	return pt.PodPortRules.AddAllRules(nplPorts)
+}
+
+// RestoreRules should be called on startup to restore a set of NPL rules. It is non-blocking but
+// takes as a parameter a channel, synced, which will be closed when the necessary rules have been
+// restored successfully. No other operations should be performed on the PortTable until the channel
+// is closed.
+func (pt *PortTable) RestoreRules(allNPLPorts []rules.PodNodePort, synced chan<- struct{}) error {
 	pt.tableLock.Lock()
 	defer pt.tableLock.Unlock()
 	for _, nplPort := range allNPLPorts {
@@ -176,9 +195,22 @@ func (pt *PortTable) RestoreRules(allNPLPorts []rules.PodNodePort, synced chan<- struct{}) error {
 			socket: socket,
 		}
 		pt.Table[nplPort.NodePort] = data
-		validNPLPorts = append(validNPLPorts, nplPort)
 	}
-	return pt.PodPortRules.AddAllRules(validNPLPorts)
+	// A retry mechanism is required, as an iptables-restore operation can fail if other
+	// components (in Antrea or other software) are accessing iptables at the same time.
+	go func() {
+		defer close(synced)
+		var backoffTime = 2 * time.Second
+		for {
+			if err := pt.syncRules(); err != nil {
+				klog.Errorf("Failed to initialize iptables: %v - will retry in %v", err, backoffTime)
+				time.Sleep(backoffTime)
+				continue
+			}
+			break
+		}
+	}()
+	return nil
 }
 
 // openLocalPort binds to the provided port.
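As the hunk above shows, `RestoreRules` populates the port table synchronously while holding the table lock, then hands the actual iptables sync to a goroutine that retries with a fixed backoff and closes `synced` only once a sync has succeeded. The following condensed sketch illustrates that restore-with-retry contract using a hypothetical `table` type and a shortened backoff; it is not the real `PortTable` API.

```go
package main

import (
	"fmt"
	"time"
)

type table struct {
	entries  []string
	attempts int
}

// sync simulates an iptables-restore style operation that can fail transiently.
func (t *table) sync() error {
	t.attempts++
	if t.attempts < 3 {
		return fmt.Errorf("transient failure %d", t.attempts)
	}
	return nil
}

// restore stores the desired entries synchronously, then keeps retrying sync in
// the background; synced is closed only after a successful sync.
func (t *table) restore(entries []string, synced chan<- struct{}) {
	t.entries = entries
	go func() {
		defer close(synced) // runs only once the retry loop exits, i.e. on success
		backoff := 10 * time.Millisecond // the PR uses 2s between retries
		for {
			if err := t.sync(); err != nil {
				fmt.Printf("sync failed: %v - retrying in %v\n", err, backoff)
				time.Sleep(backoff)
				continue
			}
			return
		}
	}()
}

func main() {
	t := &table{}
	synced := make(chan struct{})
	t.restore([]string{"rule1", "rule2"}, synced)
	<-synced // callers must not touch the table until this returns
	fmt.Println("restore complete after", t.attempts, "attempts")
}
```

Because failures `continue` the loop, the deferred `close(synced)` can only fire after a successful sync; this is what allows the controller's `Run()` to block on `rulesInitialized` and be certain the rules are in place before its workers start.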