Handle iptables-restore failures correctly in NPL Controller
Add a retry mechanism in the Controller initialization, which will keep
trying to sync iptables rules until the operation is successful. On
success, the NPL Controller is notified through a channel and can start
its event handlers.

Fixes #2554

Signed-off-by: Antonin Bas <abas@vmware.com>
antoninbas committed Aug 6, 2021
1 parent bde2877 commit c08cb85
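
For context, a minimal standalone sketch of the retry-and-notify pattern described in the commit message: a background goroutine retries the sync with a fixed backoff and closes a channel once it succeeds, while the consumer blocks on that channel before starting its workers. Everything below (trySync, the backoff value, the messages) is an illustrative placeholder, not Antrea code.

package main

import (
    "errors"
    "fmt"
    "time"
)

// trySync stands in for the real iptables-restore call; it fails twice and
// then succeeds, so the retry loop is exercised.
func trySync(attempt int) error {
    if attempt < 3 {
        return errors.New("iptables is busy")
    }
    return nil
}

func main() {
    rulesInitialized := make(chan struct{})

    // Producer: keep retrying until the sync succeeds, then signal completion
    // by closing the channel.
    go func() {
        defer close(rulesInitialized)
        backoff := 100 * time.Millisecond // the commit uses a fixed 2 second backoff
        for attempt := 1; ; attempt++ {
            if err := trySync(attempt); err != nil {
                fmt.Printf("sync failed: %v, retrying in %v\n", err, backoff)
                time.Sleep(backoff)
                continue
            }
            break
        }
    }()

    // Consumer: block until the rules have been restored, then start work.
    <-rulesInitialized
    fmt.Println("rules initialized, starting event handlers")
}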
Showing 4 changed files with 128 additions and 69 deletions.
53 changes: 30 additions & 23 deletions pkg/agent/nodeportlocal/k8s/npl_controller.go
@@ -50,15 +50,16 @@ const (
)

type NPLController struct {
portTable *portcache.PortTable
kubeClient clientset.Interface
queue workqueue.RateLimitingInterface
podInformer cache.SharedIndexInformer
podLister corelisters.PodLister
svcInformer cache.SharedIndexInformer
podToIP map[string]string
nodeName string
podIPLock sync.RWMutex
portTable *portcache.PortTable
kubeClient clientset.Interface
queue workqueue.RateLimitingInterface
podInformer cache.SharedIndexInformer
podLister corelisters.PodLister
svcInformer cache.SharedIndexInformer
podToIP map[string]string
nodeName string
podIPLock sync.RWMutex
rulesInitialized chan struct{}
}

func NewNPLController(kubeClient clientset.Interface,
@@ -68,13 +69,14 @@ func NewNPLController(kubeClient clientset.Interface,
pt *portcache.PortTable,
nodeName string) *NPLController {
c := NPLController{
kubeClient: kubeClient,
portTable: pt,
podInformer: podInformer,
podLister: corelisters.NewPodLister(podInformer.GetIndexer()),
svcInformer: svcInformer,
podToIP: make(map[string]string),
nodeName: nodeName,
kubeClient: kubeClient,
portTable: pt,
podInformer: podInformer,
podLister: corelisters.NewPodLister(podInformer.GetIndexer()),
svcInformer: svcInformer,
podToIP: make(map[string]string),
nodeName: nodeName,
rulesInitialized: make(chan struct{}),
}

podInformer.AddEventHandlerWithResyncPeriod(
@@ -117,6 +119,14 @@ func podKeyFunc(pod *corev1.Pod) string {
return pod.Namespace + "/" + pod.Name
}

func (c *NPLController) Initialize() error {
klog.InfoS("Will fetch Pods and generate NodePortLocal rules for these Pods")
if err := c.GetPodsAndGenRules(); err != nil {
return fmt.Errorf("error when getting Pods and generating rules: %v", err)
}
return nil
}

// Run starts to watch and process Pod updates for the Node where Antrea Agent is running.
// It starts a queue and a fixed number of workers to process the objects from the queue.
func (c *NPLController) Run(stopCh <-chan struct{}) {
@@ -130,12 +140,9 @@ func (c *NPLController) Run(stopCh <-chan struct{}) {
if !cache.WaitForNamedCacheSync(controllerName, stopCh, c.podInformer.HasSynced, c.svcInformer.HasSynced) {
return
}
klog.Info("Will fetch Pods and generate NodePortLocal rules for these Pods")

if err := c.GetPodsAndGenRules(); err != nil {
klog.Errorf("Error in getting Pods and generating rules: %v", err)
return
}
klog.InfoS("Waiting for initialization of NodePortLocal rules to complete")
<-c.rulesInitialized
klog.InfoS("Initialization of NodePortLocal rules successful")

for i := 0; i < numWorkers; i++ {
go wait.Until(c.Worker, time.Second, stopCh)
@@ -610,7 +617,7 @@ func (c *NPLController) GetPodsAndGenRules() error {
}

func (c *NPLController) addRulesForNPLPorts(allNPLPorts []rules.PodNodePort) error {
return c.portTable.SyncRules(allNPLPorts)
return c.portTable.RestoreRules(allNPLPorts, c.rulesInitialized)
}

// cleanupNPLAnnotationForPod removes the NodePortLocal annotation from the Pod's annotations map entirely.
4 changes: 4 additions & 0 deletions pkg/agent/nodeportlocal/npl_agent_init.go
@@ -76,5 +76,9 @@ func InitController(kubeClient clientset.Interface, informerFactory informers.Sh
portTable,
nodeName)

if err := c.Initialize(); err != nil {
return nil, fmt.Errorf("error when initializing NodePortLocal Controller: %v", err)
}

return c, nil
}
100 changes: 58 additions & 42 deletions pkg/agent/nodeportlocal/npl_agent_test.go
@@ -154,12 +154,11 @@ func getTestSvcWithPortName(portName string) *corev1.Service {

type testData struct {
*testing.T
stopCh chan struct{}
ctrl *gomock.Controller
k8sClient *k8sfake.Clientset
portTable *portcache.PortTable
mockPortOpener *portcachetesting.MockLocalPortOpener
wg sync.WaitGroup
stopCh chan struct{}
ctrl *gomock.Controller
k8sClient *k8sfake.Clientset
portTable *portcache.PortTable
wg sync.WaitGroup
}

func (data *testData) runWrapper(c *nplk8s.NPLController) {
@@ -170,18 +169,25 @@
}()
}

type customizePortOpenerExpectations func(*portcachetesting.MockLocalPortOpener)
type customizePodPortRulesExpectations func(*rulestesting.MockPodPortRules)

type testConfig struct {
defaultPortOpenerExpectations bool
customPortOpenerExpectations customizePortOpenerExpectations
customPodPortRulesExpectations customizePodPortRulesExpectations
}

func newTestConfig() *testConfig {
return &testConfig{
defaultPortOpenerExpectations: true,
}
return &testConfig{}
}

func (tc *testConfig) withCustomPortOpenerExpectations(fn customizePortOpenerExpectations) *testConfig {
tc.customPortOpenerExpectations = fn
return tc
}

func (tc *testConfig) withDefaultPortOpenerExpectations(v bool) *testConfig {
tc.defaultPortOpenerExpectations = false
func (tc *testConfig) withCustomPodPortRulesExpectations(fn customizePodPortRulesExpectations) *testConfig {
tc.customPodPortRulesExpectations = fn
return tc
}

@@ -191,22 +197,27 @@ func setUp(t *testing.T, tc *testConfig, objects ...runtime.Object) *testData {
mockCtrl := gomock.NewController(t)

mockIPTables := rulestesting.NewMockPodPortRules(mockCtrl)
mockIPTables.EXPECT().AddRule(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
mockIPTables.EXPECT().DeleteRule(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
mockIPTables.EXPECT().AddAllRules(gomock.Any()).AnyTimes()
if tc.customPodPortRulesExpectations != nil {
tc.customPodPortRulesExpectations(mockIPTables)
} else {
mockIPTables.EXPECT().AddRule(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
mockIPTables.EXPECT().DeleteRule(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
mockIPTables.EXPECT().AddAllRules(gomock.Any()).AnyTimes()
}

mockPortOpener := portcachetesting.NewMockLocalPortOpener(mockCtrl)
if tc.defaultPortOpenerExpectations {
if tc.customPortOpenerExpectations != nil {
tc.customPortOpenerExpectations(mockPortOpener)
} else {
mockPortOpener.EXPECT().OpenLocalPort(gomock.Any()).AnyTimes().Return(&fakeSocket{}, nil)
}

data := &testData{
T: t,
stopCh: make(chan struct{}),
ctrl: mockCtrl,
k8sClient: k8sfake.NewSimpleClientset(objects...),
portTable: newPortTable(mockIPTables, mockPortOpener),
mockPortOpener: mockPortOpener,
T: t,
stopCh: make(chan struct{}),
ctrl: mockCtrl,
k8sClient: k8sfake.NewSimpleClientset(objects...),
portTable: newPortTable(mockIPTables, mockPortOpener),
}

// informerFactory is initialized and started from cmd/antrea-agent/agent.go
@@ -659,30 +670,35 @@ var (
// TestNodePortAlreadyBoundTo validates that when a port is already bound to, a different port will
// be selected for NPL.
func TestNodePortAlreadyBoundTo(t *testing.T) {
testSvc := getTestSvc()
testPod := getTestPod()
testConfig := newTestConfig().withDefaultPortOpenerExpectations(false)
testData := setUp(t, testConfig)
defer testData.tearDown()

var nodePort int
gomock.InOrder(
testData.mockPortOpener.EXPECT().OpenLocalPort(gomock.Any()).Return(nil, portTakenError),
testData.mockPortOpener.EXPECT().OpenLocalPort(gomock.Any()).DoAndReturn(func(port int) (portcache.Closeable, error) {
nodePort = port
return &fakeSocket{}, nil
}),
)

_, err := testData.k8sClient.CoreV1().Services(defaultNS).Create(context.TODO(), testSvc, metav1.CreateOptions{})
require.NoError(t, err, "Service creation failed")

_, err = testData.k8sClient.CoreV1().Pods(defaultNS).Create(context.TODO(), testPod, metav1.CreateOptions{})
require.NoError(t, err, "Pod creation failed")
testConfig := newTestConfig().withCustomPortOpenerExpectations(func(mockPortOpener *portcachetesting.MockLocalPortOpener) {
gomock.InOrder(
mockPortOpener.EXPECT().OpenLocalPort(gomock.Any()).Return(nil, portTakenError),
mockPortOpener.EXPECT().OpenLocalPort(gomock.Any()).DoAndReturn(func(port int) (portcache.Closeable, error) {
nodePort = port
return &fakeSocket{}, nil
}),
)
})
testData, _, testPod := setUpWithTestServiceAndPod(t, testConfig)
defer testData.tearDown()

value, err := testData.pollForPodAnnotation(testPod.Name, true)
require.NoError(t, err, "Poll for annotation check failed")
annotation := testData.checkAnnotationValue(value, defaultPort)[0] // length of slice is guaranteed to be correct at this stage
assert.Equal(t, nodePort, annotation.NodePort)
assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort))
}

func TestSyncRulesError(t *testing.T) {
testConfig := newTestConfig().withCustomPodPortRulesExpectations(func(mockIPTables *rulestesting.MockPodPortRules) {
mockIPTables.EXPECT().AddRule(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
mockIPTables.EXPECT().DeleteRule(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
gomock.InOrder(
mockIPTables.EXPECT().AddAllRules(gomock.Any()).Return(fmt.Errorf("iptables failure")),
mockIPTables.EXPECT().AddAllRules(gomock.Any()).Return(nil).AnyTimes(),
)
})

testData, _, _ := setUpWithTestServiceAndPod(t, testConfig)
defer testData.tearDown()
}
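
TestSyncRulesError above uses gomock.InOrder so that the first AddAllRules call returns an error and every later call succeeds, which exercises the new retry path during initialization. The same fail-once-then-succeed behavior can be illustrated with a hand-written fake; this is a standalone sketch with placeholder names and a simplified signature, not the Antrea test code.

package main

import (
    "errors"
    "fmt"
)

// flakyRules is a hand-written stand-in for the mocked PodPortRules in the
// test above: the first AddAllRules call fails, later calls succeed.
type flakyRules struct {
    calls int
}

func (f *flakyRules) AddAllRules(numRules int) error {
    f.calls++
    if f.calls == 1 {
        return errors.New("iptables failure")
    }
    return nil
}

func main() {
    r := &flakyRules{}
    // A simple retry loop, standing in for PortTable's background sync.
    for {
        if err := r.AddAllRules(1); err != nil {
            fmt.Println("AddAllRules failed, retrying:", err)
            continue
        }
        break
    }
    fmt.Printf("AddAllRules succeeded on call %d\n", r.calls)
}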
40 changes: 36 additions & 4 deletions pkg/agent/nodeportlocal/portcache/port_table.go
@@ -20,6 +20,7 @@ import (
"fmt"
"net"
"sync"
"time"

"k8s.io/klog/v2"

@@ -156,8 +157,26 @@ func (pt *PortTable) RuleExists(podIP string, podPort int) bool {
return false
}

func (pt *PortTable) SyncRules(allNPLPorts []rules.PodNodePort) error {
validNPLPorts := make([]rules.PodNodePort, 0, len(allNPLPorts))
// syncRules ensures that the contents of the port table match the iptables rules present on the Node.
func (pt *PortTable) syncRules() error {
pt.tableLock.Lock()
defer pt.tableLock.Unlock()
nplPorts := make([]rules.PodNodePort, 0, len(pt.Table))
for _, data := range pt.Table {
nplPorts = append(nplPorts, rules.PodNodePort{
NodePort: data.NodePort,
PodPort: data.PodPort,
PodIP: data.PodIP,
})
}
return pt.PodPortRules.AddAllRules(nplPorts)
}

// RestoreRules should be called on startup to restore a set of NPL rules. It is non-blocking but
// takes as a parameter a channel, synced, which will be closed when the necessary rules have been
// restored successfully. No other operations should be performed on the PortTable until the channel
// is closed.
func (pt *PortTable) RestoreRules(allNPLPorts []rules.PodNodePort, synced chan<- struct{}) error {
pt.tableLock.Lock()
defer pt.tableLock.Unlock()
for _, nplPort := range allNPLPorts {
@@ -176,9 +195,22 @@
socket: socket,
}
pt.Table[nplPort.NodePort] = data
validNPLPorts = append(validNPLPorts, nplPort)
}
return pt.PodPortRules.AddAllRules(validNPLPorts)
// retry mechanism as iptables-restore can fail if other components (in Antrea or other
// software) are accessing iptables.
go func() {
defer close(synced)
var backoffTime = 2 * time.Second
for {
if err := pt.syncRules(); err != nil {
klog.Errorf("Failed to initialize iptables: %v - will retry in %v", err, backoffTime)
time.Sleep(backoffTime)
continue
}
break
}
}()
return nil
}

// openLocalPort binds to the provided port.