Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #2555: Handle iptables-restore failures correctly in NPL #2575

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 32 additions & 25 deletions pkg/agent/nodeportlocal/k8s/npl_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,8 @@ func (c *NPLController) Run(stopCh <-chan struct{}) {
if !cache.WaitForNamedCacheSync(controllerName, stopCh, c.podInformer.HasSynced, c.svcInformer.HasSynced) {
return
}
klog.Info("Will fetch Pods and generate NodePortLocal rules for these Pods")

if err := c.GetPodsAndGenRules(); err != nil {
klog.Errorf("Error in getting Pods and generating rules: %v", err)
return
}
c.waitForRulesInitialization()

for i := 0; i < numWorkers; i++ {
go wait.Until(c.Worker, time.Second, stopCh)
Expand Down Expand Up @@ -551,18 +547,27 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error {
return nil
}

// GetPodsAndGenRules fetches all the Pods on this Node and looks for valid NodePortLocal
// waitForRulesInitialization fetches all the Pods on this Node and looks for valid NodePortLocal
// annotations. If they exist, with a valid Node port, it adds the Node port to the port table and
// rules. If the NodePortLocal annotation is invalid (cannot be unmarshalled), the annotation is
// cleared. If the Node port is invalid (maybe the port range was changed and the Agent was
// restarted), the annotation is ignored and will be removed by the Pod event handlers. The Pod
// event handlers will also take care of allocating a new Node port if required.
func (c *NPLController) GetPodsAndGenRules() error {
// The function is meant to be called during Controller initialization, after the caches have
// synced. It will block until iptables rules have been synced successfully based on the listed
// Pods. After it returns, the Controller should start handling events. In case of an unexpected
// error, the function can return early or may not complete initialization. The Controller's event
// handlers are able to recover from these errors.
func (c *NPLController) waitForRulesInitialization() {
klog.InfoS("Will fetch Pods and generate NodePortLocal rules for these Pods")

podList, err := c.podLister.List(labels.Everything())
if err != nil {
return fmt.Errorf("error in fetching the Pods for Node %s: %v", c.nodeName, err)
klog.ErrorS(err, "Error when listing Pods for Node")
}

// in case of an error when listing Pods above, allNPLPorts will be
// empty and all NPL iptables rules will be deleted.
allNPLPorts := []rules.PodNodePort{}
for i := range podList {
// For each Pod:
Expand All @@ -576,12 +581,11 @@ func (c *NPLController) GetPodsAndGenRules() error {
continue
}
nplData := []NPLAnnotation{}
err := json.Unmarshal([]byte(nplAnnotation), &nplData)
if err != nil {
if err := json.Unmarshal([]byte(nplAnnotation), &nplData); err != nil {
klog.InfoS("Found invalid NodePortLocal annotation for Pod that cannot be parsed, cleaning it up", "pod", klog.KObj(pod))
// if there's an error in this NodePortLocal annotation, clean it up
err := c.cleanupNPLAnnotationForPod(pod)
if err != nil {
return err
if err := c.cleanupNPLAnnotationForPod(pod); err != nil {
klog.ErrorS(err, "Error when cleaning up NodePortLocal annotation for Pod", "pod", klog.KObj(pod))
}
continue
}
Expand All @@ -590,27 +594,30 @@ func (c *NPLController) GetPodsAndGenRules() error {
if npl.NodePort > c.portTable.EndPort || npl.NodePort < c.portTable.StartPort {
// ignoring annotation for now, it will be removed by the first call
// to handleAddUpdatePod
klog.V(2).Infof("Found invalid NodePortLocal annotation for Pod %s/%s: %s, ignoring it", pod.Namespace, pod.Name, nplAnnotation)
klog.V(2).InfoS("Found NodePortLocal annotation for which the allocated port doesn't fall into the configured range", "pod", klog.KObj(pod))
continue
} else {
allNPLPorts = append(allNPLPorts, rules.PodNodePort{
NodePort: npl.NodePort,
PodPort: npl.PodPort,
PodIP: pod.Status.PodIP,
})
}
allNPLPorts = append(allNPLPorts, rules.PodNodePort{
NodePort: npl.NodePort,
PodPort: npl.PodPort,
PodIP: pod.Status.PodIP,
})
}
}

if err := c.addRulesForNPLPorts(allNPLPorts); err != nil {
return err
rulesInitialized := make(chan struct{})
if err := c.addRulesForNPLPorts(allNPLPorts, rulesInitialized); err != nil {
klog.ErrorS(err, "Cannot install NodePortLocal rules")
return
}

return nil
klog.InfoS("Waiting for initialization of NodePortLocal rules to complete")
<-rulesInitialized
klog.InfoS("Initialization of NodePortLocal rules successful")
}

func (c *NPLController) addRulesForNPLPorts(allNPLPorts []rules.PodNodePort) error {
return c.portTable.SyncRules(allNPLPorts)
func (c *NPLController) addRulesForNPLPorts(allNPLPorts []rules.PodNodePort, synced chan<- struct{}) error {
return c.portTable.RestoreRules(allNPLPorts, synced)
}

// cleanupNPLAnnotationForPod removes the NodePortLocal annotation from the Pod's annotations map entirely.
Expand Down
100 changes: 58 additions & 42 deletions pkg/agent/nodeportlocal/npl_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,11 @@ func getTestSvcWithPortName(portName string) *corev1.Service {

type testData struct {
*testing.T
stopCh chan struct{}
ctrl *gomock.Controller
k8sClient *k8sfake.Clientset
portTable *portcache.PortTable
mockPortOpener *portcachetesting.MockLocalPortOpener
wg sync.WaitGroup
stopCh chan struct{}
ctrl *gomock.Controller
k8sClient *k8sfake.Clientset
portTable *portcache.PortTable
wg sync.WaitGroup
}

func (data *testData) runWrapper(c *nplk8s.NPLController) {
Expand All @@ -170,18 +169,25 @@ func (data *testData) runWrapper(c *nplk8s.NPLController) {
}()
}

// customizePortOpenerExpectations is a test hook that receives the mock LocalPortOpener so that a
// test case can register its own expectations on it. When such a hook is set on the testConfig,
// setUp calls it instead of installing the default OpenLocalPort expectation.
type customizePortOpenerExpectations func(*portcachetesting.MockLocalPortOpener)
// customizePodPortRulesExpectations is a test hook that receives the mock PodPortRules so that a
// test case can register its own expectations on it. When such a hook is set on the testConfig,
// setUp calls it instead of installing the default AddRule / DeleteRule / AddAllRules expectations.
type customizePodPortRulesExpectations func(*rulestesting.MockPodPortRules)

type testConfig struct {
defaultPortOpenerExpectations bool
customPortOpenerExpectations customizePortOpenerExpectations
customPodPortRulesExpectations customizePodPortRulesExpectations
}

func newTestConfig() *testConfig {
return &testConfig{
defaultPortOpenerExpectations: true,
}
return &testConfig{}
}

// withCustomPortOpenerExpectations stores fn as the hook used to configure the mock
// LocalPortOpener and returns the same testConfig so that calls can be chained.
func (tc *testConfig) withCustomPortOpenerExpectations(fn customizePortOpenerExpectations) *testConfig {
tc.customPortOpenerExpectations = fn
return tc
}

func (tc *testConfig) withDefaultPortOpenerExpectations(v bool) *testConfig {
tc.defaultPortOpenerExpectations = false
func (tc *testConfig) withCustomPodPortRulesExpectations(fn customizePodPortRulesExpectations) *testConfig {
tc.customPodPortRulesExpectations = fn
return tc
}

Expand All @@ -191,22 +197,27 @@ func setUp(t *testing.T, tc *testConfig, objects ...runtime.Object) *testData {
mockCtrl := gomock.NewController(t)

mockIPTables := rulestesting.NewMockPodPortRules(mockCtrl)
mockIPTables.EXPECT().AddRule(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
mockIPTables.EXPECT().DeleteRule(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
mockIPTables.EXPECT().AddAllRules(gomock.Any()).AnyTimes()
if tc.customPodPortRulesExpectations != nil {
tc.customPodPortRulesExpectations(mockIPTables)
} else {
mockIPTables.EXPECT().AddRule(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
mockIPTables.EXPECT().DeleteRule(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
mockIPTables.EXPECT().AddAllRules(gomock.Any()).AnyTimes()
}

mockPortOpener := portcachetesting.NewMockLocalPortOpener(mockCtrl)
if tc.defaultPortOpenerExpectations {
if tc.customPortOpenerExpectations != nil {
tc.customPortOpenerExpectations(mockPortOpener)
} else {
mockPortOpener.EXPECT().OpenLocalPort(gomock.Any()).AnyTimes().Return(&fakeSocket{}, nil)
}

data := &testData{
T: t,
stopCh: make(chan struct{}),
ctrl: mockCtrl,
k8sClient: k8sfake.NewSimpleClientset(objects...),
portTable: newPortTable(mockIPTables, mockPortOpener),
mockPortOpener: mockPortOpener,
T: t,
stopCh: make(chan struct{}),
ctrl: mockCtrl,
k8sClient: k8sfake.NewSimpleClientset(objects...),
portTable: newPortTable(mockIPTables, mockPortOpener),
}

// informerFactory is initialized and started from cmd/antrea-agent/agent.go
Expand Down Expand Up @@ -659,30 +670,35 @@ var (
// TestNodePortAlreadyBoundTo validates that when a port is already bound to, a different port will
// be selected for NPL.
func TestNodePortAlreadyBoundTo(t *testing.T) {
testSvc := getTestSvc()
testPod := getTestPod()
testConfig := newTestConfig().withDefaultPortOpenerExpectations(false)
testData := setUp(t, testConfig)
defer testData.tearDown()

var nodePort int
gomock.InOrder(
testData.mockPortOpener.EXPECT().OpenLocalPort(gomock.Any()).Return(nil, portTakenError),
testData.mockPortOpener.EXPECT().OpenLocalPort(gomock.Any()).DoAndReturn(func(port int) (portcache.Closeable, error) {
nodePort = port
return &fakeSocket{}, nil
}),
)

_, err := testData.k8sClient.CoreV1().Services(defaultNS).Create(context.TODO(), testSvc, metav1.CreateOptions{})
require.NoError(t, err, "Service creation failed")

_, err = testData.k8sClient.CoreV1().Pods(defaultNS).Create(context.TODO(), testPod, metav1.CreateOptions{})
require.NoError(t, err, "Pod creation failed")
testConfig := newTestConfig().withCustomPortOpenerExpectations(func(mockPortOpener *portcachetesting.MockLocalPortOpener) {
gomock.InOrder(
mockPortOpener.EXPECT().OpenLocalPort(gomock.Any()).Return(nil, portTakenError),
mockPortOpener.EXPECT().OpenLocalPort(gomock.Any()).DoAndReturn(func(port int) (portcache.Closeable, error) {
nodePort = port
return &fakeSocket{}, nil
}),
)
})
testData, _, testPod := setUpWithTestServiceAndPod(t, testConfig)
defer testData.tearDown()

value, err := testData.pollForPodAnnotation(testPod.Name, true)
require.NoError(t, err, "Poll for annotation check failed")
annotation := testData.checkAnnotationValue(value, defaultPort)[0] // length of slice is guaranteed to be correct at this stage
assert.Equal(t, nodePort, annotation.NodePort)
assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort))
}

func TestSyncRulesError(t *testing.T) {
testConfig := newTestConfig().withCustomPodPortRulesExpectations(func(mockIPTables *rulestesting.MockPodPortRules) {
mockIPTables.EXPECT().AddRule(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
mockIPTables.EXPECT().DeleteRule(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
gomock.InOrder(
mockIPTables.EXPECT().AddAllRules(gomock.Any()).Return(fmt.Errorf("iptables failure")),
mockIPTables.EXPECT().AddAllRules(gomock.Any()).Return(nil).AnyTimes(),
)
})

testData, _, _ := setUpWithTestServiceAndPod(t, testConfig)
defer testData.tearDown()
}
40 changes: 36 additions & 4 deletions pkg/agent/nodeportlocal/portcache/port_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net"
"sync"
"time"

"k8s.io/klog/v2"

Expand Down Expand Up @@ -156,8 +157,26 @@ func (pt *PortTable) RuleExists(podIP string, podPort int) bool {
return false
}

func (pt *PortTable) SyncRules(allNPLPorts []rules.PodNodePort) error {
validNPLPorts := make([]rules.PodNodePort, 0, len(allNPLPorts))
// syncRules makes the iptables rules present on the Node match the contents of the port table.
// It holds tableLock for the duration of the call, converts every entry of pt.Table into a
// rules.PodNodePort, and passes the complete list to PodPortRules.AddAllRules. The error returned
// by AddAllRules, if any, is returned unchanged.
func (pt *PortTable) syncRules() error {
	pt.tableLock.Lock()
	defer pt.tableLock.Unlock()

	podNodePorts := make([]rules.PodNodePort, 0, len(pt.Table))
	for _, entry := range pt.Table {
		podNodePort := rules.PodNodePort{
			NodePort: entry.NodePort,
			PodPort:  entry.PodPort,
			PodIP:    entry.PodIP,
		}
		podNodePorts = append(podNodePorts, podNodePort)
	}
	return pt.PodPortRules.AddAllRules(podNodePorts)
}

// RestoreRules should be called on startup to restore a set of NPL rules. It is non-blocking but
// takes as a parameter a channel, synced, which will be closed when the necessary rules have been
// restored successfully. No other operations should be performed on the PortTable until the channel
// is closed.
func (pt *PortTable) RestoreRules(allNPLPorts []rules.PodNodePort, synced chan<- struct{}) error {
pt.tableLock.Lock()
defer pt.tableLock.Unlock()
for _, nplPort := range allNPLPorts {
Expand All @@ -176,9 +195,22 @@ func (pt *PortTable) SyncRules(allNPLPorts []rules.PodNodePort) error {
socket: socket,
}
pt.Table[nplPort.NodePort] = data
validNPLPorts = append(validNPLPorts, nplPort)
}
return pt.PodPortRules.AddAllRules(validNPLPorts)
// retry mechanism as iptables-restore can fail if other components (in Antrea or other
// software) are accessing iptables.
go func() {
defer close(synced)
var backoffTime = 2 * time.Second
for {
if err := pt.syncRules(); err != nil {
klog.ErrorS(err, "Failed to restore iptables rules", "backoff", backoffTime)
time.Sleep(backoffTime)
continue
}
break
}
}()
return nil
}

// openLocalPort binds to the provided port.
Expand Down