From 8320b844ec55be79ff704c2ab862f2e6eb1103ef Mon Sep 17 00:00:00 2001 From: Pulkit Jain Date: Fri, 1 Dec 2023 18:14:52 +0530 Subject: [PATCH] Improve Egress API visibility Record event when EgressIP is assigned to the Node interface. Signed-off-by: Pulkit Jain --- .../antrea/templates/agent/clusterrole.yaml | 6 ++++ build/yamls/antrea-aks.yml | 6 ++++ build/yamls/antrea-eks.yml | 6 ++++ build/yamls/antrea-gke.yml | 6 ++++ build/yamls/antrea-ipsec.yml | 6 ++++ build/yamls/antrea.yml | 6 ++++ cmd/antrea-agent/agent.go | 2 +- .../controller/egress/egress_controller.go | 35 +++++++++++++++++-- .../serviceexternalip/controller.go | 2 +- pkg/agent/ipassigner/ip_assigner.go | 5 ++- pkg/agent/ipassigner/ip_assigner_linux.go | 16 ++++----- .../ipassigner/testing/mock_ipassigner.go | 7 ++-- test/e2e/egress_test.go | 6 ++++ .../agent/ip_assigner_linux_test.go | 4 +-- 14 files changed, 95 insertions(+), 18 deletions(-) diff --git a/build/charts/antrea/templates/agent/clusterrole.yaml b/build/charts/antrea/templates/agent/clusterrole.yaml index 7db11aebb8e..5b4f526af5f 100644 --- a/build/charts/antrea/templates/agent/clusterrole.yaml +++ b/build/charts/antrea/templates/agent/clusterrole.yaml @@ -219,3 +219,9 @@ rules: - get - list - watch + - apiGroups: + - "" + resources: + - events + verbs: + - create diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index f33bf08c677..ec62cc0682b 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -6256,6 +6256,12 @@ rules: - get - list - watch + - apiGroups: + - "" + resources: + - events + verbs: + - create --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index b01e8967873..ec1a33e04b7 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -6256,6 +6256,12 @@ rules: - get - list - watch + - apiGroups: + - "" + resources: + - events + verbs: + - create --- # Source: 
antrea/templates/antctl/clusterrole.yaml kind: ClusterRole diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 774d45b3570..48e5ce9fcf7 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -6256,6 +6256,12 @@ rules: - get - list - watch + - apiGroups: + - "" + resources: + - events + verbs: + - create --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 4a1dd0423b8..6ef947ab862 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -6269,6 +6269,12 @@ rules: - get - list - watch + - apiGroups: + - "" + resources: + - events + verbs: + - create --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index 28a9c10267d..452ef359835 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -6256,6 +6256,12 @@ rules: - get - list - watch + - apiGroups: + - "" + resources: + - events + verbs: + - create --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 44604492295..e41af823b0e 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -757,7 +757,7 @@ func run(o *Options) error { // Run them after AntreaProxy is ready. 
go networkPolicyController.Run(stopCh) if o.enableEgress { - go egressController.Run(stopCh) + go egressController.Run(k8sClient, stopCh) } var mcastController *multicast.Controller diff --git a/pkg/agent/controller/egress/egress_controller.go b/pkg/agent/controller/egress/egress_controller.go index 00d50d668af..db9bd499a4c 100644 --- a/pkg/agent/controller/egress/egress_controller.go +++ b/pkg/agent/controller/egress/egress_controller.go @@ -33,7 +33,10 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" coreinformers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes" + v1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" @@ -49,6 +52,7 @@ import ( cpv1b2 "antrea.io/antrea/pkg/apis/controlplane/v1beta2" crdv1b1 "antrea.io/antrea/pkg/apis/crd/v1beta1" clientsetversioned "antrea.io/antrea/pkg/client/clientset/versioned" + "antrea.io/antrea/pkg/client/clientset/versioned/scheme" crdinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1beta1" crdlisters "antrea.io/antrea/pkg/client/listers/crd/v1beta1" "antrea.io/antrea/pkg/controller/metrics" @@ -175,6 +179,8 @@ type EgressController struct { serviceCIDRUpdateRetryDelay time.Duration trafficShapingEnabled bool + + record record.EventRecorder } func NewEgressController( @@ -196,6 +202,13 @@ func NewEgressController( if trafficShapingEnabled && !openflow.OVSMetersAreSupported() { klog.Info("EgressTrafficShaping feature gate is enabled, but it is ignored because OVS meters are not supported.") } + + eventBroadcaster := record.NewBroadcaster() + recorder := eventBroadcaster.NewRecorder( + scheme.Scheme, + corev1.EventSource{Component: controllerName}, + ) + c := &EgressController{ ofClient: ofClient, routeClient: routeClient, @@ -220,6 +233,8 @@ func NewEgressController( serviceCIDRUpdateRetryDelay: 10 * time.Second, 
trafficShapingEnabled: openflow.OVSMetersAreSupported() && trafficShapingEnabled, + + record: recorder, } ipAssigner, err := newIPAssigner(nodeTransportInterface, egressDummyDevice) if err != nil { @@ -382,12 +397,23 @@ func (c *EgressController) onLocalIPUpdate(ip string, added bool) { // Run will create defaultWorkers workers (go routines) which will process the Egress events from the // workqueue. -func (c *EgressController) Run(stopCh <-chan struct{}) { +func (c *EgressController) Run(client kubernetes.Interface, stopCh <-chan struct{}) { defer c.queue.ShutDown() klog.Infof("Starting %s", controllerName) defer klog.Infof("Shutting down %s", controllerName) + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartStructuredLogging(0) + eventBroadcaster.StartRecordingToSink(&v1.EventSinkImpl{ + Interface: client.CoreV1().Events(""), + }) + c.record = eventBroadcaster.NewRecorder( + scheme.Scheme, + corev1.EventSource{Component: controllerName}, + ) + defer eventBroadcaster.Shutdown() + go c.localIPDetector.Run(stopCh) go c.egressIPScheduler.Run(stopCh) go c.ipAssigner.Run(stopCh) @@ -848,14 +874,19 @@ func (c *EgressController) syncEgress(egressName string) error { // Ensure the Egress IP is assigned to the system. Force advertising the IP if it was previously assigned to // another Node in the Egress API. This could force refreshing other peers' neighbor cache when the Egress IP is // obtained by this Node and another Node at the same time in some situations, e.g. split brain. 
- if err := c.ipAssigner.AssignIP(desiredEgressIP, egress.Status.EgressNode != c.nodeName); err != nil { + exists, err := c.ipAssigner.AssignIP(desiredEgressIP, egress.Status.EgressNode != c.nodeName) + if err != nil { return err } + if exists { + c.record.Eventf(egress, corev1.EventTypeNormal, "IPAssigned", "Assigned Egress %s with IP %s on Node %s", egress.Name, desiredEgressIP, desiredNode) + } } else { // Unassign the Egress IP from the local Node if it was assigned by the agent. if err := c.ipAssigner.UnassignIP(desiredEgressIP); err != nil { return err } + c.record.Eventf(egress, corev1.EventTypeNormal, "IPUnassigned", "Unassigned Egress %s with IP %s", egress.Name, egress.Spec.EgressIP) } // Realize the latest EgressIP and get the desired mark. diff --git a/pkg/agent/controller/serviceexternalip/controller.go b/pkg/agent/controller/serviceexternalip/controller.go index 31d108eb8fb..05804ea2d53 100644 --- a/pkg/agent/controller/serviceexternalip/controller.go +++ b/pkg/agent/controller/serviceexternalip/controller.go @@ -393,7 +393,7 @@ func (c *ServiceExternalIPController) assignIP(ip string, service apimachineryty c.assignedIPsMutex.Lock() defer c.assignedIPsMutex.Unlock() if _, ok := c.assignedIPs[ip]; !ok { - if err := c.ipAssigner.AssignIP(ip, true); err != nil { + if _, err := c.ipAssigner.AssignIP(ip, true); err != nil { return err } c.assignedIPs[ip] = sets.New[string](service.String()) diff --git a/pkg/agent/ipassigner/ip_assigner.go b/pkg/agent/ipassigner/ip_assigner.go index 8bfc681fd7b..462bf34edb6 100644 --- a/pkg/agent/ipassigner/ip_assigner.go +++ b/pkg/agent/ipassigner/ip_assigner.go @@ -19,7 +19,10 @@ import "k8s.io/apimachinery/pkg/util/sets" // IPAssigner provides methods to assign or unassign IP. type IPAssigner interface { // AssignIP ensures the provided IP is assigned to the system. 
- AssignIP(ip string, forceAdvertise bool) error + // It returns true only in the case when there is no error and the IP provided + // was not assigned to the interface before the operation; in all other cases it + // returns false. + AssignIP(ip string, forceAdvertise bool) (bool, error) // UnassignIP ensures the provided IP is not assigned to the system. UnassignIP(ip string) error // AssignedIPs return the IPs that are assigned to the system by this IPAssigner. diff --git a/pkg/agent/ipassigner/ip_assigner_linux.go b/pkg/agent/ipassigner/ip_assigner_linux.go index fa6946338a0..318ff4cea27 100644 --- a/pkg/agent/ipassigner/ip_assigner_linux.go +++ b/pkg/agent/ipassigner/ip_assigner_linux.go @@ -143,10 +143,10 @@ func (a *ipAssigner) loadIPAddresses() (sets.Set[string], error) { } // AssignIP ensures the provided IP is assigned to the dummy device and the ARP/NDP responders. -func (a *ipAssigner) AssignIP(ip string, forceAdvertise bool) error { +func (a *ipAssigner) AssignIP(ip string, forceAdvertise bool) (bool, error) { parsedIP := net.ParseIP(ip) if parsedIP == nil { - return fmt.Errorf("invalid IP %s", ip) + return false, fmt.Errorf("invalid IP %s", ip) } a.mutex.Lock() defer a.mutex.Unlock() @@ -156,14 +156,14 @@ func (a *ipAssigner) AssignIP(ip string, forceAdvertise bool) error { if forceAdvertise { a.advertise(parsedIP) } - return nil + return false, nil } if a.dummyDevice != nil { addr := util.NewIPNet(parsedIP) if err := netlink.AddrAdd(a.dummyDevice, &netlink.Addr{IPNet: addr}); err != nil { if !errors.Is(err, unix.EEXIST) { - return fmt.Errorf("failed to add IP %v to interface %s: %v", ip, a.dummyDevice.Attrs().Name, err) + return false, fmt.Errorf("failed to add IP %v to interface %s: %v", ip, a.dummyDevice.Attrs().Name, err) } else { klog.InfoS("IP was already assigned to interface", "ip", parsedIP, "interface", a.dummyDevice.Attrs().Name) } @@ -174,18 +174,18 @@ func (a *ipAssigner) AssignIP(ip string, forceAdvertise bool) error { if 
utilnet.IsIPv4(parsedIP) && a.arpResponder != nil { if err := a.arpResponder.AddIP(parsedIP); err != nil { - return fmt.Errorf("failed to assign IP %v to ARP responder: %v", ip, err) + return false, fmt.Errorf("failed to assign IP %v to ARP responder: %v", ip, err) } } if utilnet.IsIPv6(parsedIP) && a.ndpResponder != nil { if err := a.ndpResponder.AddIP(parsedIP); err != nil { - return fmt.Errorf("failed to assign IP %v to NDP responder: %v", ip, err) + return false, fmt.Errorf("failed to assign IP %v to NDP responder: %v", ip, err) } } // Always advertise the IP when the IP is newly assigned to this Node. a.advertise(parsedIP) a.assignedIPs.Insert(ip) - return nil + return true, nil } func (a *ipAssigner) advertise(ip net.IP) { @@ -213,7 +213,7 @@ func (a *ipAssigner) UnassignIP(ip string) error { if !a.assignedIPs.Has(ip) { klog.V(2).InfoS("The IP is not assigned", "ip", ip) - return nil + return fmt.Errorf("unassigned ip") } if a.dummyDevice != nil { diff --git a/pkg/agent/ipassigner/testing/mock_ipassigner.go b/pkg/agent/ipassigner/testing/mock_ipassigner.go index 57d42d7d8f8..3d4236ed887 100644 --- a/pkg/agent/ipassigner/testing/mock_ipassigner.go +++ b/pkg/agent/ipassigner/testing/mock_ipassigner.go @@ -54,11 +54,12 @@ func (m *MockIPAssigner) EXPECT() *MockIPAssignerMockRecorder { } // AssignIP mocks base method. -func (m *MockIPAssigner) AssignIP(arg0 string, arg1 bool) error { +func (m *MockIPAssigner) AssignIP(arg0 string, arg1 bool) (bool, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "AssignIP", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 } // AssignIP indicates an expected call of AssignIP. 
diff --git a/test/e2e/egress_test.go b/test/e2e/egress_test.go index 230013ddae5..ddb5634948e 100644 --- a/test/e2e/egress_test.go +++ b/test/e2e/egress_test.go @@ -35,6 +35,7 @@ import ( "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/apis/crd/v1beta1" + "antrea.io/antrea/pkg/client/clientset/versioned/scheme" "antrea.io/antrea/pkg/features" "antrea.io/antrea/pkg/util/k8s" ) @@ -406,6 +407,11 @@ func testEgressCRUD(t *testing.T, data *TestData) { exists, err := hasIP(data, egress.Status.EgressNode, egress.Spec.EgressIP) require.NoError(t, err, "Failed to check if IP exists on Node") assert.True(t, exists, "Didn't find desired IP on Node") + // Testing the events recorded during creation of an Egress resource. + expectedMessage := fmt.Sprintf("Assigned Egress %s with IP %s on Node %v", egress.Name, tt.expectedEgressIP, egress.Status.EgressNode) + events, err := data.clientset.CoreV1().Events("").Search(scheme.Scheme, egress) + require.NoError(t, err) + assert.Contains(t, events.Items[0].Message, expectedMessage) } checkEIPStatus := func(expectedUsed int) { diff --git a/test/integration/agent/ip_assigner_linux_test.go b/test/integration/agent/ip_assigner_linux_test.go index 77368e18130..90444a47897 100644 --- a/test/integration/agent/ip_assigner_linux_test.go +++ b/test/integration/agent/ip_assigner_linux_test.go @@ -40,7 +40,7 @@ func TestIPAssigner(t *testing.T) { require.NoError(t, err, "Failed to find the dummy device") defer netlink.LinkDel(dummyDevice) - err = ipAssigner.AssignIP("x", false) + _, err = ipAssigner.AssignIP("x", false) assert.Error(t, err, "Assigning an invalid IP should fail") ip1 := "10.10.10.10" @@ -49,7 +49,7 @@ func TestIPAssigner(t *testing.T) { desiredIPs := sets.New[string](ip1, ip2, ip3) for ip := range desiredIPs { - errAssign := ipAssigner.AssignIP(ip, false) + _, errAssign := ipAssigner.AssignIP(ip, false) cmd := exec.Command("ip", "addr") out, err := cmd.CombinedOutput() if err != nil {