Automated cherry pick of #6284: Fix single rule deletion for NodePortLocal on Linux (#6299)

Merged
22 changes: 12 additions & 10 deletions pkg/agent/nodeportlocal/k8s/npl_controller.go
@@ -493,9 +493,13 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error {
         }
         podPorts[targetPortProto] = struct{}{}
         portData := c.portTable.GetEntry(podIP, port, protocol)
-        if portData != nil && !portData.ProtocolInUse(protocol) {
-            // If the PortTable has an entry for the Pod but does not have an
-            // entry with protocol, we enforce AddRule for the missing Protocol.
+        // Special handling for a rule that was previously marked for deletion but could not
+        // be deleted properly: we have to retry now.
+        if portData != nil && portData.Defunct() {
+            klog.InfoS("Deleting defunct rule for Pod to prevent re-use", "pod", klog.KObj(pod), "podIP", podIP, "port", port, "protocol", protocol)
+            if err := c.portTable.DeleteRule(podIP, port, protocol); err != nil {
+                return fmt.Errorf("failed to delete defunct rule for Pod IP %s, Pod Port %d, Protocol %s: %w", podIP, port, protocol, err)
+            }
             portData = nil
         }
         if portData == nil {
@@ -527,13 +531,11 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error {
     // second, delete any existing rule that is not needed based on the current Pod
     // specification.
     entries := c.portTable.GetDataForPodIP(podIP)
-    if nplExists {
-        for _, data := range entries {
-            proto := data.Protocol
-            if _, exists := podPorts[util.BuildPortProto(fmt.Sprint(data.PodPort), proto.Protocol)]; !exists {
-                if err := c.portTable.DeleteRule(podIP, int(data.PodPort), proto.Protocol); err != nil {
-                    return fmt.Errorf("failed to delete rule for Pod IP %s, Pod Port %d, Protocol %s: %v", podIP, data.PodPort, proto.Protocol, err)
-                }
+    for _, data := range entries {
+        proto := data.Protocol
+        if _, exists := podPorts[util.BuildPortProto(fmt.Sprint(data.PodPort), proto.Protocol)]; !exists {
+            if err := c.portTable.DeleteRule(podIP, int(data.PodPort), proto.Protocol); err != nil {
+                return fmt.Errorf("failed to delete rule for Pod IP %s, Pod Port %d, Protocol %s: %w", podIP, data.PodPort, proto.Protocol, err)
+            }
         }
     }
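The change above implements a mark-and-retry pattern: a rule whose deletion fails is kept in the port table but flagged as defunct, and the next sync for the Pod must complete the deletion before the mapping can be reused. The following standalone Go sketch illustrates that pattern; the rule and table types and the ensure function are hypothetical stand-ins, and only the defunct flag plus the delete-then-recreate flow mirror the actual change.

package main

import (
    "errors"
    "fmt"
)

// rule is a hypothetical stand-in for Antrea's NodePortData.
type rule struct {
    port    int
    defunct bool // deletion was attempted and failed; the rule must not be reused as-is
}

// table is a hypothetical stand-in for the port table; deleteBackend simulates
// the backend operation (e.g. an iptables call) that can fail.
type table struct {
    rules         map[int]*rule
    deleteBackend func(port int) error
}

// ensure guarantees a usable rule for port, retrying the deletion of a defunct
// entry before recreating it, mirroring the handleAddUpdatePod logic above.
func (t *table) ensure(port int) error {
    if r, ok := t.rules[port]; ok {
        if !r.defunct {
            return nil // healthy rule, nothing to do
        }
        // Retry the failed deletion before reusing the port.
        if err := t.deleteBackend(port); err != nil {
            return fmt.Errorf("rule for port %d is still defunct: %w", port, err)
        }
        delete(t.rules, port)
    }
    t.rules[port] = &rule{port: port}
    return nil
}

func main() {
    failures := 1
    tbl := &table{
        rules: map[int]*rule{61000: {port: 61000, defunct: true}},
        deleteBackend: func(port int) error {
            if failures > 0 {
                failures--
                return errors.New("iptables failure")
            }
            return nil
        },
    }
    fmt.Println(tbl.ensure(61000)) // first sync: deletion fails again, rule stays defunct
    fmt.Println(tbl.ensure(61000)) // second sync: deletion succeeds, rule is recreated
}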
183 changes: 153 additions & 30 deletions pkg/agent/nodeportlocal/npl_agent_test.go
@@ -176,11 +176,12 @@ func getTestSvcWithPortName(portName string) *corev1.Service {
 
 type testData struct {
     *testing.T
-    stopCh    chan struct{}
-    ctrl      *gomock.Controller
-    k8sClient *k8sfake.Clientset
-    portTable *portcache.PortTable
-    wg        sync.WaitGroup
+    stopCh      chan struct{}
+    ctrl        *gomock.Controller
+    k8sClient   *k8sfake.Clientset
+    portTable   *portcache.PortTable
+    svcInformer cache.SharedIndexInformer
+    wg          sync.WaitGroup
 }
 
 func (t *testData) runWrapper(c *k8s.NPLController) {
@@ -234,30 +235,35 @@ func setUp(t *testing.T, tc *testConfig, objects ...runtime.Object) *testData {
         mockPortOpener.EXPECT().OpenLocalPort(gomock.Any(), gomock.Any()).AnyTimes().Return(&fakeSocket{}, nil)
     }
 
-    data := &testData{
-        T:         t,
-        stopCh:    make(chan struct{}),
-        ctrl:      mockCtrl,
-        k8sClient: k8sfake.NewSimpleClientset(objects...),
-        portTable: newPortTable(mockIPTables, mockPortOpener),
-    }
+    k8sClient := k8sfake.NewSimpleClientset(objects...)
+
+    portTable := newPortTable(mockIPTables, mockPortOpener)
 
     resyncPeriod := 0 * time.Minute
     // informerFactory is initialized and started from cmd/antrea-agent/agent.go
-    informerFactory := informers.NewSharedInformerFactory(data.k8sClient, resyncPeriod)
+    informerFactory := informers.NewSharedInformerFactory(k8sClient, resyncPeriod)
     listOptions := func(options *metav1.ListOptions) {
         options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", defaultNodeName).String()
     }
     localPodInformer := coreinformers.NewFilteredPodInformer(
-        data.k8sClient,
+        k8sClient,
         metav1.NamespaceAll,
         resyncPeriod,
         cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, // NamespaceIndex is used in NPLController.
         listOptions,
     )
     svcInformer := informerFactory.Core().V1().Services().Informer()
 
-    c := k8s.NewNPLController(data.k8sClient, localPodInformer, svcInformer, data.portTable, defaultNodeName)
+    c := k8s.NewNPLController(k8sClient, localPodInformer, svcInformer, portTable, defaultNodeName)
 
+    data := &testData{
+        T:           t,
+        stopCh:      make(chan struct{}),
+        ctrl:        mockCtrl,
+        k8sClient:   k8sClient,
+        portTable:   portTable,
+        svcInformer: svcInformer,
+    }
+
     data.runWrapper(c)
     informerFactory.Start(data.stopCh)
@@ -305,31 +311,41 @@ func (t *testData) tearDown() {
     os.Unsetenv("NODE_NAME")
 }
 
-func (t *testData) pollForPodAnnotation(podName string, found bool) ([]types.NPLAnnotation, error) {
-    var data string
-    var exists bool
+func conditionMatchAll([]types.NPLAnnotation) bool {
+    return true
+}
+
+// If conditionFn is nil, we will assume you are looking for a non-existing annotation.
+// If you want to match all, use conditionMatchAll as the conditionFn.
+func (t *testData) pollForPodAnnotationWithCondition(podName string, conditionFn func([]types.NPLAnnotation) bool) ([]types.NPLAnnotation, error) {
+    var nplValue []types.NPLAnnotation
     // do not use PollImmediate: 1 second is reserved for the controller to do his job and
     // update Pod NPL annotations as needed.
     err := wait.Poll(time.Second, 20*time.Second, func() (bool, error) {
         updatedPod, err := t.k8sClient.CoreV1().Pods(defaultNS).Get(context.TODO(), podName, metav1.GetOptions{})
         require.NoError(t, err, "Failed to get Pod")
         annotation := updatedPod.GetAnnotations()
-        data, exists = annotation[types.NPLAnnotationKey]
-        if found {
-            return exists, nil
+        data, exists := annotation[types.NPLAnnotationKey]
+        if !exists {
+            return conditionFn == nil, nil
         }
-        return !exists, nil
+        if conditionFn == nil {
+            return false, nil
+        }
+        if err := json.Unmarshal([]byte(data), &nplValue); err != nil {
+            return false, err
+        }
+        return conditionFn(nplValue), nil
     })
+    return nplValue, err
+}
 
-    if err != nil {
-        return []types.NPLAnnotation{}, err
-    }
-    if data == "" {
-        return []types.NPLAnnotation{}, nil
+func (t *testData) pollForPodAnnotation(podName string, found bool) ([]types.NPLAnnotation, error) {
+    var conditionFn func([]types.NPLAnnotation) bool
+    if found {
+        conditionFn = conditionMatchAll
     }
-    var nplValue []types.NPLAnnotation
-    err = json.Unmarshal([]byte(data), &nplValue)
-    return nplValue, err
+    return t.pollForPodAnnotationWithCondition(podName, conditionFn)
 }
 
 func (t *testData) updateServiceOrFail(testSvc *corev1.Service) {
@@ -497,6 +513,7 @@ func TestPodDelete(t *testing.T) {
 
 // TestPodAddMultiPort creates a Pod and a Service with two target ports.
 // It verifies that the Pod's NPL annotation and the local port table are updated with both ports.
+// It then updates the Service to remove one of the target ports.
 func TestAddMultiPortPodSvc(t *testing.T) {
     newPort := 90
     testSvc := getTestSvc(defaultPort, int32(newPort))
@@ -510,6 +527,16 @@ func TestAddMultiPortPodSvc(t *testing.T) {
     expectedAnnotations.Check(t, value)
     assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP))
     assert.True(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP))
+
+    // Remove the second target port.
+    testSvc.Spec.Ports = testSvc.Spec.Ports[:1]
+    testData.updateServiceOrFail(testSvc)
+    // Wait for annotation to be updated (single mapping).
+    value, err = testData.pollForPodAnnotationWithCondition(testPod.Name, func(value []types.NPLAnnotation) bool { return len(value) == 1 })
+    require.NoError(t, err, "Poll for annotation check failed")
+    expectedAnnotations = newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP)
+    expectedAnnotations.Check(t, value)
+    assert.False(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP))
 }
 
 // TestPodAddMultiPort creates a Pod with multiple ports and a Service with only one target port.
@@ -807,3 +834,99 @@ func TestSyncRulesError(t *testing.T) {
     testData, _, _ := setUpWithTestServiceAndPod(t, testConfig, nil)
     defer testData.tearDown()
 }
+
+func TestSingleRuleDeletionError(t *testing.T) {
+    newPort := 90
+    testSvc := getTestSvc(defaultPort, int32(newPort))
+    testPod := getTestPod()
+
+    testConfig := newTestConfig().withCustomPodPortRulesExpectations(func(mockIPTables *rulestesting.MockPodPortRules) {
+        mockIPTables.EXPECT().AddAllRules(gomock.Any()).AnyTimes()
+        gomock.InOrder(
+            mockIPTables.EXPECT().AddRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2),
+            mockIPTables.EXPECT().DeleteRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("iptables failure")),
+            mockIPTables.EXPECT().DeleteRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()),
+        )
+    })
+
+    testData := setUp(t, testConfig, testSvc, testPod)
+    defer testData.tearDown()
+
+    value, err := testData.pollForPodAnnotation(testPod.Name, true)
+    require.NoError(t, err, "Poll for annotation check failed")
+    expectedAnnotations := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP).Add(nil, newPort, protocolTCP)
+    expectedAnnotations.Check(t, value)
+    assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP))
+    assert.True(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP))
+
+    // Remove the second target port, to force one mapping to be deleted.
+    testSvc.Spec.Ports = testSvc.Spec.Ports[:1]
+    testData.updateServiceOrFail(testSvc)
+    // The first deletion attempt will fail, but the second should succeed.
+    // Wait for annotation to be updated (single mapping).
+    value, err = testData.pollForPodAnnotationWithCondition(testPod.Name, func(value []types.NPLAnnotation) bool { return len(value) == 1 })
+    require.NoError(t, err, "Poll for annotation check failed")
+    expectedAnnotations = newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP)
+    expectedAnnotations.Check(t, value)
+    assert.False(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP))
+}
+
+func TestPreventDefunctRuleReuse(t *testing.T) {
+    newPort := 90
+    testSvc := getTestSvc(defaultPort, int32(newPort))
+    testPod := getTestPod()
+
+    var testData *testData
+
+    ports := testSvc.Spec.Ports
+    // This function will be executed synchronously when DeleteRule is called for the first time
+    // and we simulate a failure. It restores the second target port for the Service, which was
+    // deleted previously, and waits for the change to be reflected in the informer's
+    // store. After that, we know that the next time the NPL controller processes the test Pod,
+    // it will need to ensure that both NPL mappings are configured correctly. Because one of
+    // the rules will be marked as "defunct", it will first need to delete the rule properly
+    // before adding it back.
+    restoreServiceTargetPorts := func() {
+        testSvc.Spec.Ports = ports
+        _, err := testData.k8sClient.CoreV1().Services(defaultNS).Update(context.TODO(), testSvc, metav1.UpdateOptions{})
+        if !assert.NoError(t, err) {
+            return
+        }
+        assert.EventuallyWithT(t, func(c *assert.CollectT) {
+            obj, exists, err := testData.svcInformer.GetIndexer().GetByKey(testSvc.Namespace + "/" + testSvc.Name)
+            if !assert.NoError(t, err) || !assert.True(t, exists) {
+                return
+            }
+            svc := obj.(*corev1.Service)
+            assert.Len(t, svc.Spec.Ports, 2)
+        }, 2*time.Second, 50*time.Millisecond)
+    }
+
+    testConfig := newTestConfig().withCustomPodPortRulesExpectations(func(mockIPTables *rulestesting.MockPodPortRules) {
+        mockIPTables.EXPECT().AddAllRules(gomock.Any()).AnyTimes()
+        gomock.InOrder(
+            mockIPTables.EXPECT().AddRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2),
+            mockIPTables.EXPECT().DeleteRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do(
+                func(nodePort int, podIP string, podPort int, protocol string) { restoreServiceTargetPorts() },
+            ).Return(fmt.Errorf("iptables failure")),
+            mockIPTables.EXPECT().DeleteRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()),
+            mockIPTables.EXPECT().AddRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()),
+        )
+    })
+
+    testData = setUp(t, testConfig, testSvc, testPod)
+    defer testData.tearDown()
+
+    value, err := testData.pollForPodAnnotation(testPod.Name, true)
+    require.NoError(t, err, "Poll for annotation check failed")
+    expectedAnnotations := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP).Add(nil, newPort, protocolTCP)
+    expectedAnnotations.Check(t, value)
+    assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP))
+    assert.True(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP))
+
+    // Remove the second target port, to force one mapping to be deleted.
+    testSvc.Spec.Ports = testSvc.Spec.Ports[:1]
+    testData.updateServiceOrFail(testSvc)
+
+    assert.Eventually(t, testData.ctrl.Satisfied, 2*time.Second, 50*time.Millisecond)
+}
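The pollForPodAnnotationWithCondition helper introduced in this file encodes a small convention: a nil conditionFn waits for the annotation to be absent, while a non-nil conditionFn waits for it to exist and satisfy the condition. Below is a self-contained sketch of that convention; the fetch function is a hypothetical stand-in for reading the Pod's annotation through the Kubernetes client, and []int stands in for []types.NPLAnnotation.

package main

import (
    "encoding/json"
    "errors"
    "fmt"
    "time"
)

// pollWithCondition mirrors the nil/non-nil conditionFn convention of
// pollForPodAnnotationWithCondition: nil means "wait for absence", non-nil
// means "wait for presence plus condition".
func pollWithCondition(fetch func() (string, bool), conditionFn func([]int) bool, timeout time.Duration) ([]int, error) {
    deadline := time.Now().Add(timeout)
    for time.Now().Before(deadline) {
        raw, exists := fetch()
        if !exists {
            if conditionFn == nil {
                return nil, nil // absence is exactly what we were waiting for
            }
        } else if conditionFn != nil {
            var value []int
            if err := json.Unmarshal([]byte(raw), &value); err != nil {
                return nil, err
            }
            if conditionFn(value) {
                return value, nil
            }
        }
        time.Sleep(50 * time.Millisecond)
    }
    return nil, errors.New("timed out waiting for condition")
}

func main() {
    start := time.Now()
    fetch := func() (string, bool) {
        // Simulate the annotation shrinking to a single mapping after 200ms.
        if time.Since(start) > 200*time.Millisecond {
            return "[8080]", true
        }
        return "[8080, 9090]", true
    }
    value, err := pollWithCondition(fetch, func(v []int) bool { return len(v) == 1 }, 2*time.Second)
    fmt.Println(value, err) // [8080] <nil>
}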
26 changes: 9 additions & 17 deletions pkg/agent/nodeportlocal/portcache/port_table.go
@@ -32,13 +32,8 @@ const (
     PodIPIndex = "podIPIndex"
 )
 
-// protocolSocketState represents the state of the socket corresponding to a
-// given (Node port, protocol) tuple.
-type protocolSocketState int
-
 type ProtocolSocketData struct {
     Protocol string
-    State    protocolSocketState
     socket   io.Closer
 }
 
@@ -47,14 +42,13 @@ type NodePortData struct {
     PodPort  int
     PodIP    string
     Protocol ProtocolSocketData
+    // defunct is used to indicate that a rule has been partially deleted: it is no longer
+    // usable and deletion needs to be re-attempted.
+    defunct bool
 }
 
-func (d *NodePortData) ProtocolInUse(protocol string) bool {
-    protocolSocketData := d.Protocol
-    if protocolSocketData.Protocol == protocol {
-        return protocolSocketData.State == stateInUse
-    }
-    return false
+func (d *NodePortData) Defunct() bool {
+    return d.defunct
 }
 
 type LocalPortOpener interface {
@@ -204,8 +198,8 @@ func podIPPortProtoFormat(ip string, port int, protocol string) string {
 }
 
 func (pt *PortTable) getEntryByPodIPPortProto(ip string, port int, protocol string) *NodePortData {
-    data, exists := pt.getPortTableCacheFromPodEndpointIndex(podIPPortProtoFormat(ip, port, protocol))
-    if exists == false {
+    data, ok := pt.getPortTableCacheFromPodEndpointIndex(podIPPortProtoFormat(ip, port, protocol))
+    if !ok {
         return nil
     }
     return data
@@ -214,10 +208,8 @@ func (pt *PortTable) getEntryByPodIPPortProto(ip string, port int, protocol string) *NodePortData {
 func (pt *PortTable) RuleExists(podIP string, podPort int, protocol string) bool {
     pt.tableLock.RLock()
     defer pt.tableLock.RUnlock()
-    if data := pt.getEntryByPodIPPortProto(podIP, podPort, protocol); data != nil {
-        return data.ProtocolInUse(protocol)
-    }
-    return false
+    data := pt.getEntryByPodIPPortProto(podIP, podPort, protocol)
+    return data != nil
 }
 
 // nodePortProtoFormat formats the nodeport, protocol to string port:protocol.
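With the protocol socket state removed, RuleExists reduces to a presence check: an entry that is still in the table, even a defunct one, counts as existing until its deletion succeeds, which is what keeps a Node port from being handed out again too early. A small sketch of that semantics, with hypothetical types standing in for Antrea's port table:

package main

import "fmt"

// entry is a hypothetical stand-in for NodePortData.
type entry struct{ defunct bool }

// portTable maps hypothetical "podIP:podPort:protocol" keys to entries.
type portTable map[string]*entry

// ruleExists mirrors the new presence-only semantics of RuleExists.
func (pt portTable) ruleExists(key string) bool {
    _, ok := pt[key]
    return ok
}

func main() {
    pt := portTable{"10.0.0.1:8080:tcp": {defunct: true}}
    // A defunct entry still "exists": it must be deleted successfully before
    // its Node port can be reused.
    fmt.Println(pt.ruleExists("10.0.0.1:8080:tcp")) // true
    fmt.Println(pt.ruleExists("10.0.0.1:9090:tcp")) // false
}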