From 774f67196e09a9d4e27df3fdc1ffdd911a91e414 Mon Sep 17 00:00:00 2001
From: Antonin Bas
Date: Wed, 8 May 2024 10:35:24 -0700
Subject: [PATCH] Fix single rule deletion for NodePortLocal on Linux (#6284) (#6297)

The logic for deleting an individual NPL mapping was broken. It
incorrectly believed that the protocol socket was still in use, so the
mapping could never be deleted, putting the NPL controller in an
endless error loop.

The State field in ProtocolSocketData was left over from pre-Antrea
v1.7, back when we would always use the same port number for multiple
protocols, for a given Pod IP + port. With the current version of the
NPL implementation, this field is not needed and should be removed. By
removing the field, we avoid the deletion issue.

This patch also ensures that if a rule is only partially cleaned up, we
can attempt to delete it again, by making DeleteRule idempotent. To
identify that a prior deletion attempt failed, we introduce a "defunct"
field in the NPL rule data. If this field is set, the controller knows
that the rule has been partially deleted and deletion needs to be
attempted again. Without this, it would be possible for the controller
(with the right sequence of updates) to assume that a partially-deleted
rule is still valid, which would break the datapath.

I plan on improving the NPL code further with a follow-up patch, but in
order to keep this patch small (for backporting), I went with the
simplest solution I could think of.

Fixes #6281

Signed-off-by: Antonin Bas
---
 pkg/agent/nodeportlocal/k8s/npl_controller.go |  22 ++-
 pkg/agent/nodeportlocal/npl_agent_test.go     | 183 +++++++++++++++---
 .../nodeportlocal/portcache/port_table.go     |  26 +--
 .../portcache/port_table_others.go            |  90 +++------
 .../portcache/port_table_others_test.go       |  56 ++++++
 .../portcache/port_table_windows.go           |   8 +-
 test/e2e/nodeportlocal_test.go                |  39 ++--
 7 files changed, 287 insertions(+), 137 deletions(-)

diff --git a/pkg/agent/nodeportlocal/k8s/npl_controller.go b/pkg/agent/nodeportlocal/k8s/npl_controller.go
index 592098e7aa0..9469af9ade6 100644
--- a/pkg/agent/nodeportlocal/k8s/npl_controller.go
+++ b/pkg/agent/nodeportlocal/k8s/npl_controller.go
@@ -493,9 +493,13 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error {
 		}
 		podPorts[targetPortProto] = struct{}{}
 		portData := c.portTable.GetEntry(podIP, port, protocol)
-		if portData != nil && !portData.ProtocolInUse(protocol) {
-			// If the PortTable has an entry for the Pod but does not have an
-			// entry with protocol, we enforce AddRule for the missing Protocol.
+		// Special handling for a rule that was previously marked for deletion but could not
+		// be deleted properly: we have to retry now.
+		if portData != nil && portData.Defunct() {
+			klog.InfoS("Deleting defunct rule for Pod to prevent re-use", "pod", klog.KObj(pod), "podIP", podIP, "port", port, "protocol", protocol)
+			if err := c.portTable.DeleteRule(podIP, port, protocol); err != nil {
+				return fmt.Errorf("failed to delete defunct rule for Pod IP %s, Pod Port %d, Protocol %s: %w", podIP, port, protocol, err)
+			}
 			portData = nil
 		}
 		if portData == nil {
@@ -526,13 +530,11 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error {
 	// second, delete any existing rule that is not needed based on the current Pod
 	// specification.
entries := c.portTable.GetDataForPodIP(podIP) - if nplExists { - for _, data := range entries { - proto := data.Protocol - if _, exists := podPorts[util.BuildPortProto(fmt.Sprint(data.PodPort), proto.Protocol)]; !exists { - if err := c.portTable.DeleteRule(podIP, int(data.PodPort), proto.Protocol); err != nil { - return fmt.Errorf("failed to delete rule for Pod IP %s, Pod Port %d, Protocol %s: %v", podIP, data.PodPort, proto.Protocol, err) - } + for _, data := range entries { + proto := data.Protocol + if _, exists := podPorts[util.BuildPortProto(fmt.Sprint(data.PodPort), proto.Protocol)]; !exists { + if err := c.portTable.DeleteRule(podIP, int(data.PodPort), proto.Protocol); err != nil { + return fmt.Errorf("failed to delete rule for Pod IP %s, Pod Port %d, Protocol %s: %w", podIP, data.PodPort, proto.Protocol, err) } } } diff --git a/pkg/agent/nodeportlocal/npl_agent_test.go b/pkg/agent/nodeportlocal/npl_agent_test.go index 53b33f61a01..bcdd90d9236 100644 --- a/pkg/agent/nodeportlocal/npl_agent_test.go +++ b/pkg/agent/nodeportlocal/npl_agent_test.go @@ -175,11 +175,12 @@ func getTestSvcWithPortName(portName string) *corev1.Service { type testData struct { *testing.T - stopCh chan struct{} - ctrl *gomock.Controller - k8sClient *k8sfake.Clientset - portTable *portcache.PortTable - wg sync.WaitGroup + stopCh chan struct{} + ctrl *gomock.Controller + k8sClient *k8sfake.Clientset + portTable *portcache.PortTable + svcInformer cache.SharedIndexInformer + wg sync.WaitGroup } func (t *testData) runWrapper(c *k8s.NPLController) { @@ -233,22 +234,18 @@ func setUp(t *testing.T, tc *testConfig, objects ...runtime.Object) *testData { mockPortOpener.EXPECT().OpenLocalPort(gomock.Any(), gomock.Any()).AnyTimes().Return(&fakeSocket{}, nil) } - data := &testData{ - T: t, - stopCh: make(chan struct{}), - ctrl: mockCtrl, - k8sClient: k8sfake.NewSimpleClientset(objects...), - portTable: newPortTable(mockIPTables, mockPortOpener), - } + k8sClient := k8sfake.NewSimpleClientset(objects...) + + portTable := newPortTable(mockIPTables, mockPortOpener) resyncPeriod := 0 * time.Minute // informerFactory is initialized and started from cmd/antrea-agent/agent.go - informerFactory := informers.NewSharedInformerFactory(data.k8sClient, resyncPeriod) + informerFactory := informers.NewSharedInformerFactory(k8sClient, resyncPeriod) listOptions := func(options *metav1.ListOptions) { options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", defaultNodeName).String() } localPodInformer := coreinformers.NewFilteredPodInformer( - data.k8sClient, + k8sClient, metav1.NamespaceAll, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, // NamespaceIndex is used in NPLController. 
@@ -256,7 +253,16 @@ func setUp(t *testing.T, tc *testConfig, objects ...runtime.Object) *testData { ) svcInformer := informerFactory.Core().V1().Services().Informer() - c := k8s.NewNPLController(data.k8sClient, localPodInformer, svcInformer, data.portTable, defaultNodeName) + c := k8s.NewNPLController(k8sClient, localPodInformer, svcInformer, portTable, defaultNodeName) + + data := &testData{ + T: t, + stopCh: make(chan struct{}), + ctrl: mockCtrl, + k8sClient: k8sClient, + portTable: portTable, + svcInformer: svcInformer, + } data.runWrapper(c) informerFactory.Start(data.stopCh) @@ -303,31 +309,41 @@ func (t *testData) tearDown() { t.wg.Wait() } -func (t *testData) pollForPodAnnotation(podName string, found bool) ([]types.NPLAnnotation, error) { - var data string - var exists bool +func conditionMatchAll([]types.NPLAnnotation) bool { + return true +} + +// If conditionFn is nil, we will assume you are looking for a non-existing annotation. +// If you want to match all, use conditionMatchAll as the conditionFn. +func (t *testData) pollForPodAnnotationWithCondition(podName string, conditionFn func([]types.NPLAnnotation) bool) ([]types.NPLAnnotation, error) { + var nplValue []types.NPLAnnotation // do not use PollImmediate: 1 second is reserved for the controller to do his job and // update Pod NPL annotations as needed. err := wait.PollUntilContextTimeout(context.Background(), time.Second, 20*time.Second, false, func(ctx context.Context) (bool, error) { updatedPod, err := t.k8sClient.CoreV1().Pods(defaultNS).Get(context.TODO(), podName, metav1.GetOptions{}) require.NoError(t, err, "Failed to get Pod") annotation := updatedPod.GetAnnotations() - data, exists = annotation[types.NPLAnnotationKey] - if found { - return exists, nil + data, exists := annotation[types.NPLAnnotationKey] + if !exists { + return conditionFn == nil, nil } - return !exists, nil + if conditionFn == nil { + return false, nil + } + if err := json.Unmarshal([]byte(data), &nplValue); err != nil { + return false, err + } + return conditionFn(nplValue), nil }) + return nplValue, err +} - if err != nil { - return []types.NPLAnnotation{}, err - } - if data == "" { - return []types.NPLAnnotation{}, nil +func (t *testData) pollForPodAnnotation(podName string, found bool) ([]types.NPLAnnotation, error) { + var conditionFn func([]types.NPLAnnotation) bool + if found { + conditionFn = conditionMatchAll } - var nplValue []types.NPLAnnotation - err = json.Unmarshal([]byte(data), &nplValue) - return nplValue, err + return t.pollForPodAnnotationWithCondition(podName, conditionFn) } func (t *testData) updateServiceOrFail(testSvc *corev1.Service) { @@ -495,6 +511,7 @@ func TestPodDelete(t *testing.T) { // TestPodAddMultiPort creates a Pod and a Service with two target ports. // It verifies that the Pod's NPL annotation and the local port table are updated with both ports. +// It then updates the Service to remove one of the target ports. func TestAddMultiPortPodSvc(t *testing.T) { newPort := 90 testSvc := getTestSvc(defaultPort, int32(newPort)) @@ -508,6 +525,16 @@ func TestAddMultiPortPodSvc(t *testing.T) { expectedAnnotations.Check(t, value) assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) assert.True(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP)) + + // Remove the second target port. + testSvc.Spec.Ports = testSvc.Spec.Ports[:1] + testData.updateServiceOrFail(testSvc) + // Wait for annotation to be updated (single mapping). 
+ value, err = testData.pollForPodAnnotationWithCondition(testPod.Name, func(value []types.NPLAnnotation) bool { return len(value) == 1 }) + require.NoError(t, err, "Poll for annotation check failed") + expectedAnnotations = newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP) + expectedAnnotations.Check(t, value) + assert.False(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP)) } // TestPodAddMultiPort creates a Pod with multiple ports and a Service with only one target port. @@ -805,3 +832,99 @@ func TestSyncRulesError(t *testing.T) { testData, _, _ := setUpWithTestServiceAndPod(t, testConfig, nil) defer testData.tearDown() } + +func TestSingleRuleDeletionError(t *testing.T) { + newPort := 90 + testSvc := getTestSvc(defaultPort, int32(newPort)) + testPod := getTestPod() + + testConfig := newTestConfig().withCustomPodPortRulesExpectations(func(mockIPTables *rulestesting.MockPodPortRules) { + mockIPTables.EXPECT().AddAllRules(gomock.Any()).AnyTimes() + gomock.InOrder( + mockIPTables.EXPECT().AddRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2), + mockIPTables.EXPECT().DeleteRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("iptables failure")), + mockIPTables.EXPECT().DeleteRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()), + ) + }) + + testData := setUp(t, testConfig, testSvc, testPod) + defer testData.tearDown() + + value, err := testData.pollForPodAnnotation(testPod.Name, true) + require.NoError(t, err, "Poll for annotation check failed") + expectedAnnotations := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP).Add(nil, newPort, protocolTCP) + expectedAnnotations.Check(t, value) + assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP)) + + // Remove the second target port, to force one mapping to be deleted. + testSvc.Spec.Ports = testSvc.Spec.Ports[:1] + testData.updateServiceOrFail(testSvc) + // The first deletion attempt will fail, but the second should succeed. + // Wait for annotation to be updated (single mapping). + value, err = testData.pollForPodAnnotationWithCondition(testPod.Name, func(value []types.NPLAnnotation) bool { return len(value) == 1 }) + require.NoError(t, err, "Poll for annotation check failed") + expectedAnnotations = newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP) + expectedAnnotations.Check(t, value) + assert.False(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP)) +} + +func TestPreventDefunctRuleReuse(t *testing.T) { + newPort := 90 + testSvc := getTestSvc(defaultPort, int32(newPort)) + testPod := getTestPod() + + var testData *testData + + ports := testSvc.Spec.Ports + // This function will be executed synchronously when DeleteRule is called for the first time + // and we simulate a failure. It restores the second target port for the Service, which was + // deleted previously, and waits for the change to be reflected in the informer's + // store. After that, we know that the next time the NPL controller processes the test Pod, + // it will need to ensure that both NPL mappings are configured correctly. Because one of + // the rules will be marked as "defunct", it will first need to delete the rule properly + // before adding it back. 
+ restoreServiceTargetPorts := func() { + testSvc.Spec.Ports = ports + _, err := testData.k8sClient.CoreV1().Services(defaultNS).Update(context.TODO(), testSvc, metav1.UpdateOptions{}) + if !assert.NoError(t, err) { + return + } + assert.EventuallyWithT(t, func(c *assert.CollectT) { + obj, exists, err := testData.svcInformer.GetIndexer().GetByKey(testSvc.Namespace + "/" + testSvc.Name) + if !assert.NoError(t, err) || !assert.True(t, exists) { + return + } + svc := obj.(*corev1.Service) + assert.Len(t, svc.Spec.Ports, 2) + }, 2*time.Second, 50*time.Millisecond) + } + + testConfig := newTestConfig().withCustomPodPortRulesExpectations(func(mockIPTables *rulestesting.MockPodPortRules) { + mockIPTables.EXPECT().AddAllRules(gomock.Any()).AnyTimes() + gomock.InOrder( + mockIPTables.EXPECT().AddRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2), + mockIPTables.EXPECT().DeleteRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do( + func(nodePort int, podIP string, podPort int, protocol string) { restoreServiceTargetPorts() }, + ).Return(fmt.Errorf("iptables failure")), + mockIPTables.EXPECT().DeleteRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()), + mockIPTables.EXPECT().AddRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()), + ) + }) + + testData = setUp(t, testConfig, testSvc, testPod) + defer testData.tearDown() + + value, err := testData.pollForPodAnnotation(testPod.Name, true) + require.NoError(t, err, "Poll for annotation check failed") + expectedAnnotations := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP).Add(nil, newPort, protocolTCP) + expectedAnnotations.Check(t, value) + assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP)) + + // Remove the second target port, to force one mapping to be deleted. + testSvc.Spec.Ports = testSvc.Spec.Ports[:1] + testData.updateServiceOrFail(testSvc) + + assert.Eventually(t, testData.ctrl.Satisfied, 2*time.Second, 50*time.Millisecond) +} diff --git a/pkg/agent/nodeportlocal/portcache/port_table.go b/pkg/agent/nodeportlocal/portcache/port_table.go index c2c99203eb1..b1856861bd9 100644 --- a/pkg/agent/nodeportlocal/portcache/port_table.go +++ b/pkg/agent/nodeportlocal/portcache/port_table.go @@ -32,13 +32,8 @@ const ( PodIPIndex = "podIPIndex" ) -// protocolSocketState represents the state of the socket corresponding to a -// given (Node port, protocol) tuple. -type protocolSocketState int - type ProtocolSocketData struct { Protocol string - State protocolSocketState socket io.Closer } @@ -47,14 +42,13 @@ type NodePortData struct { PodPort int PodIP string Protocol ProtocolSocketData + // defunct is used to indicate that a rule has been partially deleted: it is no longer + // usable and deletion needs to be re-attempted. 
+ defunct bool } -func (d *NodePortData) ProtocolInUse(protocol string) bool { - protocolSocketData := d.Protocol - if protocolSocketData.Protocol == protocol { - return protocolSocketData.State == stateInUse - } - return false +func (d *NodePortData) Defunct() bool { + return d.defunct } type LocalPortOpener interface { @@ -204,8 +198,8 @@ func podIPPortProtoFormat(ip string, port int, protocol string) string { } func (pt *PortTable) getEntryByPodIPPortProto(ip string, port int, protocol string) *NodePortData { - data, exists := pt.getPortTableCacheFromPodEndpointIndex(podIPPortProtoFormat(ip, port, protocol)) - if exists == false { + data, ok := pt.getPortTableCacheFromPodEndpointIndex(podIPPortProtoFormat(ip, port, protocol)) + if !ok { return nil } return data @@ -214,10 +208,8 @@ func (pt *PortTable) getEntryByPodIPPortProto(ip string, port int, protocol stri func (pt *PortTable) RuleExists(podIP string, podPort int, protocol string) bool { pt.tableLock.RLock() defer pt.tableLock.RUnlock() - if data := pt.getEntryByPodIPPortProto(podIP, podPort, protocol); data != nil { - return data.ProtocolInUse(protocol) - } - return false + data := pt.getEntryByPodIPPortProto(podIP, podPort, protocol) + return data != nil } // nodePortProtoFormat formats the nodeport, protocol to string port:protocol. diff --git a/pkg/agent/nodeportlocal/portcache/port_table_others.go b/pkg/agent/nodeportlocal/portcache/port_table_others.go index 30ec417e7e3..c13eaf1d7b4 100644 --- a/pkg/agent/nodeportlocal/portcache/port_table_others.go +++ b/pkg/agent/nodeportlocal/portcache/port_table_others.go @@ -26,18 +26,6 @@ import ( "antrea.io/antrea/pkg/agent/nodeportlocal/rules" ) -const ( - // stateOpen means that a listening socket has been opened for the - // protocol (as a means to reserve the port for this protocol), but no - // NPL rule has been installed for it. - stateOpen protocolSocketState = iota - // stateInUse means that a listening socket has been opened AND a NPL - // rule has been installed. - stateInUse - // stateClosed means that the socket has been closed. - stateClosed -) - func openSocketsForPort(localPortOpener LocalPortOpener, port int, protocol string) (ProtocolSocketData, error) { // Port only needs to be available for the protocol used by the NPL rule. // We don't need to allocate the same nodePort for all protocols anymore. 
@@ -48,7 +36,6 @@ func openSocketsForPort(localPortOpener LocalPortOpener, port int, protocol stri } protocolData := ProtocolSocketData{ Protocol: protocol, - State: stateInUse, socket: socket, } return protocolData, nil @@ -83,28 +70,6 @@ func (pt *PortTable) getFreePort(podIP string, podPort int, protocol string) (in return 0, ProtocolSocketData{}, fmt.Errorf("no free port found") } -func (d *NodePortData) CloseSockets() error { - if d.Protocol.Protocol != "" { - protocolSocketData := &d.Protocol - switch protocolSocketData.State { - case stateClosed: - // already closed - return nil - case stateInUse: - // should not happen - return fmt.Errorf("protocol %s is still in use, cannot release socket", protocolSocketData.Protocol) - case stateOpen: - if err := protocolSocketData.socket.Close(); err != nil { - return fmt.Errorf("error when releasing local port %d with protocol %s: %v", d.NodePort, protocolSocketData.Protocol, err) - } - protocolSocketData.State = stateClosed - default: - return fmt.Errorf("invalid protocol socket state") - } - } - return nil -} - func (pt *PortTable) AddRule(podIP string, podPort int, protocol string) (int, error) { pt.tableLock.Lock() defer pt.tableLock.Unlock() @@ -128,11 +93,38 @@ func (pt *PortTable) AddRule(podIP string, podPort int, protocol string) (int, e pt.addPortTableCache(npData) } else { // Only add rules if the entry does not exist. - return 0, fmt.Errorf("existed Linux Nodeport entry for %s:%d:%s", podIP, podPort, protocol) + return 0, fmt.Errorf("existing Linux Nodeport entry for %s:%d:%s", podIP, podPort, protocol) } return npData.NodePort, nil } +func (pt *PortTable) deleteRule(data *NodePortData) error { + protocolSocketData := &data.Protocol + protocol := protocolSocketData.Protocol + + // In theory, we should not be modifying a cache item in-place. However, the field we are + // modifying (defunct) does NOT participate in indexing and the modification is thread-safe + // because of pt.tableLock. + // TODO: stop modifying cache items in-place. + // We could set defunct after the call to DeleteRule, because a failed call to DeleteRule + // should mean that the rule is still present and valid, but there is no harm in being more + // conservative. + data.defunct = true + + // Calling DeleteRule is idempotent. + if err := pt.PodPortRules.DeleteRule(data.NodePort, data.PodIP, data.PodPort, protocol); err != nil { + return err + } + if err := protocolSocketData.socket.Close(); err != nil { + return fmt.Errorf("error when releasing local port %d with protocol %s: %w", data.NodePort, protocol, err) + } + // We don't need to delete cache from different indexes repeatedly because they map to the same entry. + // Deletion errors are not possible because our Index functions cannot return errors. + // See https://github.com/kubernetes/client-go/blob/3aa45779f2e5592d52edf68da66abfbd0805e413/tools/cache/store.go#L189-L196 + pt.deletePortTableCache(data) + return nil +} + func (pt *PortTable) DeleteRule(podIP string, podPort int, protocol string) error { pt.tableLock.Lock() defer pt.tableLock.Unlock() @@ -141,15 +133,7 @@ func (pt *PortTable) DeleteRule(podIP string, podPort int, protocol string) erro // Delete not required when the PortTable entry does not exist return nil } - if err := pt.PodPortRules.DeleteRule(data.NodePort, podIP, podPort, protocol); err != nil { - return err - } - if err := data.CloseSockets(); err != nil { - return err - } - // We don't need to delete cache from different indexes repeatedly because they map to the same entry. 
- pt.deletePortTableCache(data) - return nil + return pt.deleteRule(data) } func (pt *PortTable) DeleteRulesForPod(podIP string) error { @@ -157,14 +141,7 @@ func (pt *PortTable) DeleteRulesForPod(podIP string) error { defer pt.tableLock.Unlock() podEntries := pt.getDataForPodIP(podIP) for _, podEntry := range podEntries { - protocolSocketData := podEntry.Protocol - if err := pt.PodPortRules.DeleteRule(podEntry.NodePort, podIP, podEntry.PodPort, protocolSocketData.Protocol); err != nil { - return err - } - if err := protocolSocketData.socket.Close(); err != nil { - return fmt.Errorf("error when releasing local port %d with protocol %s: %v", podEntry.NodePort, protocolSocketData.Protocol, err) - } - pt.deletePortTableCache(podEntry) + return pt.deleteRule(podEntry) } return nil } @@ -177,16 +154,11 @@ func (pt *PortTable) syncRules() error { nplPorts := make([]rules.PodNodePort, 0, len(objs)) for _, obj := range objs { npData := obj.(*NodePortData) - protocols := make([]string, 0, 1) - protocol := npData.Protocol - if protocol.State == stateInUse { - protocols = append(protocols, protocol.Protocol) - } nplPorts = append(nplPorts, rules.PodNodePort{ NodePort: npData.NodePort, PodPort: npData.PodPort, PodIP: npData.PodIP, - Protocol: protocols[0], + Protocol: npData.Protocol.Protocol, }) } if err := pt.PodPortRules.AddAllRules(nplPorts); err != nil { diff --git a/pkg/agent/nodeportlocal/portcache/port_table_others_test.go b/pkg/agent/nodeportlocal/portcache/port_table_others_test.go index 2ac40b34e95..c5e1e4245b5 100644 --- a/pkg/agent/nodeportlocal/portcache/port_table_others_test.go +++ b/pkg/agent/nodeportlocal/portcache/port_table_others_test.go @@ -18,9 +18,12 @@ package portcache import ( + "fmt" "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" portcachetesting "antrea.io/antrea/pkg/agent/nodeportlocal/portcache/testing" @@ -73,3 +76,56 @@ func TestRestoreRules(t *testing.T) { t.Fatalf("Rule restoration not complete after %v", timeout) } } + +type mockCloser struct { + closeErr error +} + +func (m *mockCloser) Close() error { + return m.closeErr +} + +func TestDeleteRule(t *testing.T) { + mockCtrl := gomock.NewController(t) + mockIPTables := rulestesting.NewMockPodPortRules(mockCtrl) + mockPortOpener := portcachetesting.NewMockLocalPortOpener(mockCtrl) + portTable := newPortTable(mockIPTables, mockPortOpener) + + const ( + podPort = 1001 + protocol = "tcp" + ) + + closer := &mockCloser{} + + data := &NodePortData{ + NodePort: nodePort1, + PodPort: podPort, + PodIP: podIP, + Protocol: ProtocolSocketData{ + Protocol: protocol, + socket: closer, + }, + } + + require.NoError(t, portTable.addPortTableCache(data)) + assert.False(t, data.Defunct()) + + mockIPTables.EXPECT().DeleteRule(nodePort1, podIP, podPort, protocol).Return(fmt.Errorf("iptables error")) + require.ErrorContains(t, portTable.DeleteRule(podIP, podPort, protocol), "iptables error") + + mockIPTables.EXPECT().DeleteRule(nodePort1, podIP, podPort, protocol) + closer.closeErr = fmt.Errorf("close error") + require.ErrorContains(t, portTable.DeleteRule(podIP, podPort, protocol), "close error") + assert.True(t, data.Defunct()) + + closer.closeErr = nil + + // First successful call to DeleteRule. + mockIPTables.EXPECT().DeleteRule(nodePort1, podIP, podPort, protocol) + assert.NoError(t, portTable.DeleteRule(podIP, podPort, protocol)) + + // Calling DeleteRule again will return immediately as the NodePortData entry has been + // removed from the cache. 
+ assert.NoError(t, portTable.DeleteRule(podIP, podPort, protocol)) +} diff --git a/pkg/agent/nodeportlocal/portcache/port_table_windows.go b/pkg/agent/nodeportlocal/portcache/port_table_windows.go index aff2a32247e..e084bde1838 100644 --- a/pkg/agent/nodeportlocal/portcache/port_table_windows.go +++ b/pkg/agent/nodeportlocal/portcache/port_table_windows.go @@ -25,11 +25,6 @@ import ( "antrea.io/antrea/pkg/agent/nodeportlocal/rules" ) -const ( - // stateInUse means that the NPL rule has been installed. - stateInUse protocolSocketState = 1 -) - func addRuleForPort(podPortRules rules.PodPortRules, port int, podIP string, podPort int, protocol string) (ProtocolSocketData, error) { // Only the protocol used here should be returned if NetNatStaticMapping rule // can be inserted to an unused protocol port. @@ -40,7 +35,6 @@ func addRuleForPort(podPortRules rules.PodPortRules, port int, podIP string, pod } protocolData := ProtocolSocketData{ Protocol: protocol, - State: stateInUse, socket: nil, } return protocolData, nil @@ -137,6 +131,8 @@ func (pt *PortTable) DeleteRule(podIP string, podPort int, protocol string) erro return nil } + data.defunct = true + // Calling DeleteRule is idempotent. if err := pt.PodPortRules.DeleteRule(data.NodePort, podIP, podPort, protocol); err != nil { return err } diff --git a/test/e2e/nodeportlocal_test.go b/test/e2e/nodeportlocal_test.go index 6aa801c38c9..21bb85d5a16 100644 --- a/test/e2e/nodeportlocal_test.go +++ b/test/e2e/nodeportlocal_test.go @@ -95,7 +95,7 @@ func TestNodePortLocal(t *testing.T) { t.Run("testNPLChangePortRangeAgentRestart", func(t *testing.T) { testNPLChangePortRangeAgentRestart(t, data) }) } -func getNPLAnnotations(t *testing.T, data *TestData, r *require.Assertions, testPodName string, conditionFn func(types.NPLAnnotation) bool) ([]types.NPLAnnotation, string) { +func getNPLAnnotations(t *testing.T, data *TestData, r *require.Assertions, testPodName string, conditionFn func([]types.NPLAnnotation) bool) ([]types.NPLAnnotation, string) { var nplAnnotations []types.NPLAnnotation var testPodIP *PodIPs @@ -135,12 +135,8 @@ func getNPLAnnotations(t *testing.T, data *TestData, r *require.Assertions, test return false, nil } json.Unmarshal([]byte(nplAnn), &nplAnnotations) - if conditionFn != nil { - for _, annotation := range nplAnnotations { - if !conditionFn(annotation) { - return false, nil - } - } + if conditionFn != nil && !conditionFn(nplAnnotations) { + return false, nil } return found, nil }) @@ -495,13 +491,11 @@ func NPLTestPodAddMultiProtocol(t *testing.T, data *TestData) { Add(nil, 8080, "tcp").Add(nil, 8080, "udp") // Creating a Pod using agnhost image to support multiple protocols, instead of nginx. 
- cmd := []string{"/bin/bash", "-c"} - args := []string{ - fmt.Sprintf("/agnhost serve-hostname --udp --http=false --port %d & /agnhost serve-hostname --tcp --http=false --port %d", 8080, 8080), - } + + args := []string{"serve-hostname", "--tcp", "--udp", "--http=false", "--port=8080"} port := corev1.ContainerPort{ContainerPort: 8080} containerName := fmt.Sprintf("c%v", 8080) - err := NewPodBuilder(testPodName, data.testNamespace, agnhostImage).OnNode(serverNode).WithContainerName(containerName).WithCommand(cmd).WithArgs(args).WithPorts([]corev1.ContainerPort{port}).WithLabels(selector).Create(testData) + err := NewPodBuilder(testPodName, data.testNamespace, agnhostImage).OnNode(serverNode).WithContainerName(containerName).WithArgs(args).WithPorts([]corev1.ContainerPort{port}).WithLabels(selector).Create(testData) r.NoError(err, "Error creating test Pod: %v", err) nplAnnotations, testPodIP := getNPLAnnotations(t, testData, r, testPodName, nil) @@ -517,14 +511,26 @@ func NPLTestPodAddMultiProtocol(t *testing.T, data *TestData) { r.NoError(err, "Error when getting Antrea Agent Pod on Node '%s'", serverNode) checkNPLRules(t, testData, r, nplAnnotations, antreaPod, testPodIP, serverNode, true) + expectedAnnotations.Check(t, nplAnnotations) + checkTrafficForNPL(testData, r, nplAnnotations, clientName) + + // We now delete one of the Services, and we expect the corresponding NPL rule to be deleted. + testData.DeleteService(data.testNamespace, "agnhost2") + expectedAnnotations = newExpectedNPLAnnotations(defaultStartPort, defaultEndPort). + Add(nil, 8080, "tcp") + // Wait until we have only one NPL rule annotation. + conditionFn := func(annotations []types.NPLAnnotation) bool { + return len(annotations) == 1 + } + nplAnnotations, testPodIP = getNPLAnnotations(t, testData, r, testPodName, conditionFn) + checkNPLRules(t, testData, r, nplAnnotations, antreaPod, testPodIP, serverNode, true) expectedAnnotations.Check(t, nplAnnotations) checkTrafficForNPL(testData, r, nplAnnotations, clientName) testData.DeletePod(data.testNamespace, clientName) testData.DeletePod(data.testNamespace, testPodName) testData.DeleteService(data.testNamespace, "agnhost1") - testData.DeleteService(data.testNamespace, "agnhost2") checkNPLRules(t, testData, r, nplAnnotations, antreaPod, testPodIP, serverNode, false) } @@ -697,8 +703,11 @@ func testNPLChangePortRangeAgentRestart(t *testing.T, data *TestData) { } for _, testPodName := range testPods { - conditionFn := func(ann types.NPLAnnotation) bool { - return ann.NodePort >= updatedStartPort + conditionFn := func(annotations []types.NPLAnnotation) bool { + for idx := range annotations { + return annotations[idx].NodePort >= updatedStartPort + } + return true } nplAnnotations, testPodIP := getNPLAnnotations(t, data, r, testPodName, conditionFn)
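
The commit message above describes the fix as the combination of two ideas: a
"defunct" marker that is set before any cleanup step begins, and an idempotent
delete that can safely be retried until cleanup fully succeeds. The following
standalone Go sketch illustrates that pattern in miniature. It is NOT Antrea
code: ruleBackend, portTable, flakyBackend, and ensureRule are invented
stand-ins for PodPortRules, the NPL port table, and the controller's
reconcile logic in handleAddUpdatePod.

// Standalone sketch (hypothetical names) of the defunct-rule retry pattern.
package main

import "fmt"

// ruleBackend stands in for the datapath (iptables) and socket cleanup.
type ruleBackend interface {
	DeleteRule(port int) error // assumed idempotent, like rules.PodPortRules
	CloseSocket(port int) error
}

type rule struct {
	port    int
	defunct bool // set as soon as deletion starts, never cleared in place
}

type portTable struct {
	backend ruleBackend
	rules   map[int]*rule
}

// deleteRule marks the entry defunct *before* touching the datapath, so a
// partially-deleted rule can never be mistaken for a valid one. It is safe
// to call again after a failure.
func (pt *portTable) deleteRule(r *rule) error {
	r.defunct = true
	if err := pt.backend.DeleteRule(r.port); err != nil {
		return err // entry stays in the table, flagged defunct
	}
	if err := pt.backend.CloseSocket(r.port); err != nil {
		return err
	}
	delete(pt.rules, r.port)
	return nil
}

// ensureRule mirrors the reconcile step: a defunct entry must be fully
// deleted before the mapping can be installed again.
func (pt *portTable) ensureRule(port int) error {
	if r, ok := pt.rules[port]; ok {
		if !r.defunct {
			return nil // rule exists and is healthy
		}
		if err := pt.deleteRule(r); err != nil {
			return fmt.Errorf("retrying deletion of defunct rule: %w", err)
		}
	}
	pt.rules[port] = &rule{port: port} // (re-)install from scratch
	return nil
}

// flakyBackend fails the first datapath deletion and then succeeds,
// simulating the transient failure exercised by TestSingleRuleDeletionError.
type flakyBackend struct{ failures int }

func (b *flakyBackend) DeleteRule(port int) error {
	if b.failures > 0 {
		b.failures--
		return fmt.Errorf("simulated iptables failure")
	}
	return nil
}

func (b *flakyBackend) CloseSocket(port int) error { return nil }

func main() {
	pt := &portTable{backend: &flakyBackend{failures: 1}, rules: map[int]*rule{}}
	_ = pt.ensureRule(61001)
	if err := pt.deleteRule(pt.rules[61001]); err != nil {
		fmt.Println("first deletion failed:", err) // rule is now defunct
	}
	// The next reconcile pass retries the deletion before re-adding the rule,
	// instead of treating the stale entry as valid.
	if err := pt.ensureRule(61001); err != nil {
		fmt.Println("reconcile failed:", err)
	} else {
		fmt.Println("defunct rule deleted and re-installed cleanly")
	}
}

Note how setting the flag before the first cleanup step is deliberately
conservative: even a deletion that ultimately succeeds passes through the
defunct state, which is harmless because the entry is removed from the table
on success anyway.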