Skip to content

Commit

Permalink
Improve Egress API visibility
Browse files Browse the repository at this point in the history
Record event when EgressIP is assigned to the Node interface.

Signed-off-by: Pulkit Jain <jainpu@vmware.com>
  • Loading branch information
Pulkit Jain committed Dec 22, 2023
1 parent d238ecd commit 8320b84
Show file tree
Hide file tree
Showing 14 changed files with 95 additions and 18 deletions.
6 changes: 6 additions & 0 deletions build/charts/antrea/templates/agent/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,9 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
6 changes: 6 additions & 0 deletions build/yamls/antrea-aks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6256,6 +6256,12 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
---
# Source: antrea/templates/antctl/clusterrole.yaml
kind: ClusterRole
Expand Down
6 changes: 6 additions & 0 deletions build/yamls/antrea-eks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6256,6 +6256,12 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
---
# Source: antrea/templates/antctl/clusterrole.yaml
kind: ClusterRole
Expand Down
6 changes: 6 additions & 0 deletions build/yamls/antrea-gke.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6256,6 +6256,12 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
---
# Source: antrea/templates/antctl/clusterrole.yaml
kind: ClusterRole
Expand Down
6 changes: 6 additions & 0 deletions build/yamls/antrea-ipsec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6269,6 +6269,12 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
---
# Source: antrea/templates/antctl/clusterrole.yaml
kind: ClusterRole
Expand Down
6 changes: 6 additions & 0 deletions build/yamls/antrea.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6256,6 +6256,12 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
---
# Source: antrea/templates/antctl/clusterrole.yaml
kind: ClusterRole
Expand Down
2 changes: 1 addition & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ func run(o *Options) error {
// Run them after AntreaProxy is ready.
go networkPolicyController.Run(stopCh)
if o.enableEgress {
go egressController.Run(stopCh)
go egressController.Run(k8sClient, stopCh)
}

var mcastController *multicast.Controller
Expand Down
35 changes: 33 additions & 2 deletions pkg/agent/controller/egress/egress_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
Expand All @@ -49,6 +52,7 @@ import (
cpv1b2 "antrea.io/antrea/pkg/apis/controlplane/v1beta2"
crdv1b1 "antrea.io/antrea/pkg/apis/crd/v1beta1"
clientsetversioned "antrea.io/antrea/pkg/client/clientset/versioned"
"antrea.io/antrea/pkg/client/clientset/versioned/scheme"
crdinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1beta1"
crdlisters "antrea.io/antrea/pkg/client/listers/crd/v1beta1"
"antrea.io/antrea/pkg/controller/metrics"
Expand Down Expand Up @@ -175,6 +179,8 @@ type EgressController struct {
serviceCIDRUpdateRetryDelay time.Duration

trafficShapingEnabled bool

record record.EventRecorder
}

func NewEgressController(
Expand All @@ -196,6 +202,13 @@ func NewEgressController(
if trafficShapingEnabled && !openflow.OVSMetersAreSupported() {
klog.Info("EgressTrafficShaping feature gate is enabled, but it is ignored because OVS meters are not supported.")
}

eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(
scheme.Scheme,
corev1.EventSource{Component: controllerName},
)

c := &EgressController{
ofClient: ofClient,
routeClient: routeClient,
Expand All @@ -220,6 +233,8 @@ func NewEgressController(
serviceCIDRUpdateRetryDelay: 10 * time.Second,

trafficShapingEnabled: openflow.OVSMetersAreSupported() && trafficShapingEnabled,

record: recorder,
}
ipAssigner, err := newIPAssigner(nodeTransportInterface, egressDummyDevice)
if err != nil {
Expand Down Expand Up @@ -382,12 +397,23 @@ func (c *EgressController) onLocalIPUpdate(ip string, added bool) {

// Run will create defaultWorkers workers (go routines) which will process the Egress events from the
// workqueue.
func (c *EgressController) Run(stopCh <-chan struct{}) {
func (c *EgressController) Run(client kubernetes.Interface, stopCh <-chan struct{}) {
defer c.queue.ShutDown()

klog.Infof("Starting %s", controllerName)
defer klog.Infof("Shutting down %s", controllerName)

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&v1.EventSinkImpl{
Interface: client.CoreV1().Events(""),
})
c.record = eventBroadcaster.NewRecorder(
scheme.Scheme,
corev1.EventSource{Component: controllerName},
)
defer eventBroadcaster.Shutdown()

go c.localIPDetector.Run(stopCh)
go c.egressIPScheduler.Run(stopCh)
go c.ipAssigner.Run(stopCh)
Expand Down Expand Up @@ -848,14 +874,19 @@ func (c *EgressController) syncEgress(egressName string) error {
// Ensure the Egress IP is assigned to the system. Force advertising the IP if it was previously assigned to
// another Node in the Egress API. This could force refreshing other peers' neighbor cache when the Egress IP is
// obtained by this Node and another Node at the same time in some situations, e.g. split brain.
if err := c.ipAssigner.AssignIP(desiredEgressIP, egress.Status.EgressNode != c.nodeName); err != nil {
exists, err := c.ipAssigner.AssignIP(desiredEgressIP, egress.Status.EgressNode != c.nodeName)
if err != nil {
return err
}
if exists {
c.record.Eventf(egress, corev1.EventTypeNormal, "IPAssigned", "Assigned Egress %s with IP %s on Node %s", egress.Name, desiredEgressIP, desiredNode)
}
} else {
// Unassign the Egress IP from the local Node if it was assigned by the agent.
if err := c.ipAssigner.UnassignIP(desiredEgressIP); err != nil {
return err
}
c.record.Eventf(egress, corev1.EventTypeNormal, "IPUnassigned", "Unassigned Egress %s with IP %s", egress.Name, egress.Spec.EgressIP)
}

// Realize the latest EgressIP and get the desired mark.
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/controller/serviceexternalip/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func (c *ServiceExternalIPController) assignIP(ip string, service apimachineryty
c.assignedIPsMutex.Lock()
defer c.assignedIPsMutex.Unlock()
if _, ok := c.assignedIPs[ip]; !ok {
if err := c.ipAssigner.AssignIP(ip, true); err != nil {
if _, err := c.ipAssigner.AssignIP(ip, true); err != nil {
return err
}
c.assignedIPs[ip] = sets.New[string](service.String())
Expand Down
5 changes: 4 additions & 1 deletion pkg/agent/ipassigner/ip_assigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ import "k8s.io/apimachinery/pkg/util/sets"
// IPAssigner provides methods to assign or unassign IP.
type IPAssigner interface {
// AssignIP ensures the provided IP is assigned to the system.
AssignIP(ip string, forceAdvertise bool) error
// It returns True only in the case when there is no error and the IP provided
// is not assigned to the inetrface before the operation, in all other cases it
// return false.
AssignIP(ip string, forceAdvertise bool) (bool, error)
// UnassignIP ensures the provided IP is not assigned to the system.
UnassignIP(ip string) error
// AssignedIPs return the IPs that are assigned to the system by this IPAssigner.
Expand Down
16 changes: 8 additions & 8 deletions pkg/agent/ipassigner/ip_assigner_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,10 @@ func (a *ipAssigner) loadIPAddresses() (sets.Set[string], error) {
}

// AssignIP ensures the provided IP is assigned to the dummy device and the ARP/NDP responders.
func (a *ipAssigner) AssignIP(ip string, forceAdvertise bool) error {
func (a *ipAssigner) AssignIP(ip string, forceAdvertise bool) (bool, error) {
parsedIP := net.ParseIP(ip)
if parsedIP == nil {
return fmt.Errorf("invalid IP %s", ip)
return false, fmt.Errorf("invalid IP %s", ip)
}
a.mutex.Lock()
defer a.mutex.Unlock()
Expand All @@ -156,14 +156,14 @@ func (a *ipAssigner) AssignIP(ip string, forceAdvertise bool) error {
if forceAdvertise {
a.advertise(parsedIP)
}
return nil
return false, nil
}

if a.dummyDevice != nil {
addr := util.NewIPNet(parsedIP)
if err := netlink.AddrAdd(a.dummyDevice, &netlink.Addr{IPNet: addr}); err != nil {
if !errors.Is(err, unix.EEXIST) {
return fmt.Errorf("failed to add IP %v to interface %s: %v", ip, a.dummyDevice.Attrs().Name, err)
return false, fmt.Errorf("failed to add IP %v to interface %s: %v", ip, a.dummyDevice.Attrs().Name, err)
} else {
klog.InfoS("IP was already assigned to interface", "ip", parsedIP, "interface", a.dummyDevice.Attrs().Name)
}
Expand All @@ -174,18 +174,18 @@ func (a *ipAssigner) AssignIP(ip string, forceAdvertise bool) error {

if utilnet.IsIPv4(parsedIP) && a.arpResponder != nil {
if err := a.arpResponder.AddIP(parsedIP); err != nil {
return fmt.Errorf("failed to assign IP %v to ARP responder: %v", ip, err)
return false, fmt.Errorf("failed to assign IP %v to ARP responder: %v", ip, err)
}
}
if utilnet.IsIPv6(parsedIP) && a.ndpResponder != nil {
if err := a.ndpResponder.AddIP(parsedIP); err != nil {
return fmt.Errorf("failed to assign IP %v to NDP responder: %v", ip, err)
return false, fmt.Errorf("failed to assign IP %v to NDP responder: %v", ip, err)
}
}
// Always advertise the IP when the IP is newly assigned to this Node.
a.advertise(parsedIP)
a.assignedIPs.Insert(ip)
return nil
return true, nil
}

func (a *ipAssigner) advertise(ip net.IP) {
Expand Down Expand Up @@ -213,7 +213,7 @@ func (a *ipAssigner) UnassignIP(ip string) error {

if !a.assignedIPs.Has(ip) {
klog.V(2).InfoS("The IP is not assigned", "ip", ip)
return nil
return fmt.Errorf("unassigned ip")
}

if a.dummyDevice != nil {
Expand Down
7 changes: 4 additions & 3 deletions pkg/agent/ipassigner/testing/mock_ipassigner.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions test/e2e/egress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/apis/crd/v1beta1"
"antrea.io/antrea/pkg/client/clientset/versioned/scheme"
"antrea.io/antrea/pkg/features"
"antrea.io/antrea/pkg/util/k8s"
)
Expand Down Expand Up @@ -406,6 +407,11 @@ func testEgressCRUD(t *testing.T, data *TestData) {
exists, err := hasIP(data, egress.Status.EgressNode, egress.Spec.EgressIP)
require.NoError(t, err, "Failed to check if IP exists on Node")
assert.True(t, exists, "Didn't find desired IP on Node")
// Testing the events recorded during creation of an Egress resource.
expectedMessage := fmt.Sprintf("Assigned Egress %s with IP %s on Node %v", egress.Name, tt.expectedEgressIP, egress.Status.EgressNode)
events, err := data.clientset.CoreV1().Events("").Search(scheme.Scheme, egress)
require.NoError(t, err)
assert.Contains(t, events.Items[0].Message, expectedMessage)
}

checkEIPStatus := func(expectedUsed int) {
Expand Down
4 changes: 2 additions & 2 deletions test/integration/agent/ip_assigner_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestIPAssigner(t *testing.T) {
require.NoError(t, err, "Failed to find the dummy device")
defer netlink.LinkDel(dummyDevice)

err = ipAssigner.AssignIP("x", false)
_, err = ipAssigner.AssignIP("x", false)
assert.Error(t, err, "Assigning an invalid IP should fail")

ip1 := "10.10.10.10"
Expand All @@ -49,7 +49,7 @@ func TestIPAssigner(t *testing.T) {
desiredIPs := sets.New[string](ip1, ip2, ip3)

for ip := range desiredIPs {
errAssign := ipAssigner.AssignIP(ip, false)
_, errAssign := ipAssigner.AssignIP(ip, false)
cmd := exec.Command("ip", "addr")
out, err := cmd.CombinedOutput()
if err != nil {
Expand Down

0 comments on commit 8320b84

Please sign in to comment.