Skip to content

Commit

Permalink
Fix Egress IP not advertised in some cases (#5141)
Browse files Browse the repository at this point in the history
This patch fixes two cases, in which an Egress IP is not advertised:

- When userspace ARP responder is not running (arp_ignore is 0),
  assigning a new Egress IP to a Node would not advertise the IP.
- When an Egress IP is obtained by multiple Nodes in some situations,
  e.g. split brain, the eventual winner would not advertise the IP after
  they are back to normal, because the IP is already assigned to the
  Node.

The patch moves IP advertising out of responders. Responders will only
be responsible for answering neighbor queries. The IP assigner will
advertise an IP when the IP is newly assigned to the Node, or the IP's
Node recorded in Egress API is updated from another Node to this Node.

Signed-off-by: Quan Tian <qtian@vmware.com>
  • Loading branch information
tnqn authored Jun 19, 2023
1 parent 5deb5f9 commit 53354ae
Show file tree
Hide file tree
Showing 15 changed files with 148 additions and 178 deletions.
6 changes: 4 additions & 2 deletions pkg/agent/controller/egress/egress_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,8 +658,10 @@ func (c *EgressController) syncEgress(egressName string) error {
}

if desiredNode == c.nodeName {
// Ensure the Egress IP is assigned to the system.
if err := c.ipAssigner.AssignIP(desiredEgressIP); err != nil {
// Ensure the Egress IP is assigned to the system. Force advertising the IP if it was previously assigned to
// another Node in the Egress API. This could force refreshing other peers' neighbor cache when the Egress IP is
// obtained by this Node and another Node at the same time in some situations, e.g. split brain.
if err := c.ipAssigner.AssignIP(desiredEgressIP, egress.Status.EgressNode != c.nodeName); err != nil {
return err
}
} else {
Expand Down
8 changes: 5 additions & 3 deletions pkg/agent/controller/egress/egress_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,9 @@ func TestSyncEgress(t *testing.T) {
mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1))
mockOFClient.EXPECT().InstallPodSNATFlows(uint32(2), net.ParseIP(fakeLocalEgressIP1), uint32(1))
mockRouteClient.EXPECT().AddSNATRule(net.ParseIP(fakeLocalEgressIP1), uint32(1))
mockIPAssigner.EXPECT().AssignIP(fakeLocalEgressIP2).Times(2)
mockIPAssigner.EXPECT().AssignIP(fakeLocalEgressIP2, true)
// forceAdvertise depends on how fast the Egress status update is reflected in the informer cache, which doesn't really matter.
mockIPAssigner.EXPECT().AssignIP(fakeLocalEgressIP2, gomock.Any())
mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP2), uint32(2))
mockOFClient.EXPECT().InstallPodSNATFlows(uint32(3), net.ParseIP(fakeLocalEgressIP2), uint32(2))
mockRouteClient.EXPECT().AddSNATRule(net.ParseIP(fakeLocalEgressIP2), uint32(2))
Expand Down Expand Up @@ -631,7 +633,7 @@ func TestSyncEgress(t *testing.T) {
},
},
expectedCalls: func(mockOFClient *openflowtest.MockClient, mockRouteClient *routetest.MockInterface, mockIPAssigner *ipassignertest.MockIPAssigner) {
mockIPAssigner.EXPECT().AssignIP(fakeLocalEgressIP1)
mockIPAssigner.EXPECT().AssignIP(fakeLocalEgressIP1, true)
mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1))
mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1))
mockOFClient.EXPECT().InstallPodSNATFlows(uint32(2), net.ParseIP(fakeLocalEgressIP1), uint32(1))
Expand Down Expand Up @@ -673,7 +675,7 @@ func TestSyncEgress(t *testing.T) {
},
},
expectedCalls: func(mockOFClient *openflowtest.MockClient, mockRouteClient *routetest.MockInterface, mockIPAssigner *ipassignertest.MockIPAssigner) {
mockIPAssigner.EXPECT().AssignIP(fakeLocalEgressIP1)
mockIPAssigner.EXPECT().AssignIP(fakeLocalEgressIP1, true)
mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1))
mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1))
mockOFClient.EXPECT().InstallPodSNATFlows(uint32(2), net.ParseIP(fakeLocalEgressIP1), uint32(1))
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/controller/serviceexternalip/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func (c *ServiceExternalIPController) assignIP(ip string, service apimachineryty
c.assignedIPsMutex.Lock()
defer c.assignedIPsMutex.Unlock()
if _, ok := c.assignedIPs[ip]; !ok {
if err := c.ipAssigner.AssignIP(ip); err != nil {
if err := c.ipAssigner.AssignIP(ip, true); err != nil {
return err
}
c.assignedIPs[ip] = sets.New[string](service.String())
Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/controller/serviceexternalip/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func TestCreateService(t *testing.T) {
serviceToCreate: servicePolicyCluster,
healthyNodes: []string{fakeNode1, fakeNode2},
expectedCalls: func(mockIPAssigner *ipassignertest.MockIPAssigner) {
mockIPAssigner.EXPECT().AssignIP(fakeServiceExternalIP1)
mockIPAssigner.EXPECT().AssignIP(fakeServiceExternalIP1, true)
},
expectedExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{
keyFor(servicePolicyCluster): {
Expand Down Expand Up @@ -269,7 +269,7 @@ func TestCreateService(t *testing.T) {
serviceToCreate: servicePolicyLocal,
healthyNodes: []string{fakeNode1, fakeNode2},
expectedCalls: func(mockIPAssigner *ipassignertest.MockIPAssigner) {
mockIPAssigner.EXPECT().AssignIP(fakeServiceExternalIP1)
mockIPAssigner.EXPECT().AssignIP(fakeServiceExternalIP1, true)
},
expectedExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{
keyFor(servicePolicyLocal): {
Expand Down Expand Up @@ -454,7 +454,7 @@ func TestUpdateService(t *testing.T) {
healthyNodes: []string{fakeNode1, fakeNode2},
expectedCalls: func(mockIPAssigner *ipassignertest.MockIPAssigner) {
mockIPAssigner.EXPECT().UnassignIP(fakeServiceExternalIP1)
mockIPAssigner.EXPECT().AssignIP(fakeServiceExternalIP2)
mockIPAssigner.EXPECT().AssignIP(fakeServiceExternalIP2, true)
},
expectError: false,
},
Expand All @@ -472,7 +472,7 @@ func TestUpdateService(t *testing.T) {
},
healthyNodes: []string{fakeNode1, fakeNode2},
expectedCalls: func(mockIPAssigner *ipassignertest.MockIPAssigner) {
mockIPAssigner.EXPECT().AssignIP(fakeServiceExternalIP2)
mockIPAssigner.EXPECT().AssignIP(fakeServiceExternalIP2, true)
},
expectError: false,
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/ipassigner/ip_assigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import "k8s.io/apimachinery/pkg/util/sets"
// IPAssigner provides methods to assign or unassign IP.
type IPAssigner interface {
// AssignIP ensures the provided IP is assigned to the system.
AssignIP(ip string) error
AssignIP(ip string, forceAdvertise bool) error
// UnassignIP ensures the provided IP is not assigned to the system.
UnassignIP(ip string) error
// AssignedIPs return the IPs that are assigned to the system by this IPAssigner.
Expand Down
29 changes: 25 additions & 4 deletions pkg/agent/ipassigner/ip_assigner_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (

"antrea.io/antrea/pkg/agent/ipassigner/responder"
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/agent/util/arping"
"antrea.io/antrea/pkg/agent/util/ndp"
"antrea.io/antrea/pkg/agent/util/sysctl"
)

Expand Down Expand Up @@ -71,11 +73,11 @@ func NewIPAssigner(nodeTransportInterface string, dummyDeviceName string) (IPAss
return nil, err
}
if dummyDeviceName == "" || arpIgnore > 0 {
arpResonder, err := responder.NewARPResponder(externalInterface)
arpResponder, err := responder.NewARPResponder(externalInterface)
if err != nil {
return nil, fmt.Errorf("failed to create ARP responder for link %s: %v", externalInterface.Name, err)
}
a.arpResponder = arpResonder
a.arpResponder = arpResponder
}
}
if ipv6 != nil {
Expand Down Expand Up @@ -141,7 +143,7 @@ func (a *ipAssigner) loadIPAddresses() (sets.Set[string], error) {
}

// AssignIP ensures the provided IP is assigned to the dummy device and the ARP/NDP responders.
func (a *ipAssigner) AssignIP(ip string) error {
func (a *ipAssigner) AssignIP(ip string, forceAdvertise bool) error {
parsedIP := net.ParseIP(ip)
if parsedIP == nil {
return fmt.Errorf("invalid IP %s", ip)
Expand All @@ -151,6 +153,9 @@ func (a *ipAssigner) AssignIP(ip string) error {

if a.assignedIPs.Has(ip) {
klog.V(2).InfoS("The IP is already assigned", "ip", ip)
if forceAdvertise {
a.advertise(parsedIP)
}
return nil
}

Expand All @@ -177,11 +182,26 @@ func (a *ipAssigner) AssignIP(ip string) error {
return fmt.Errorf("failed to assign IP %v to NDP responder: %v", ip, err)
}
}

// Always advertise the IP when the IP is newly assigned to this Node.
a.advertise(parsedIP)
a.assignedIPs.Insert(ip)
return nil
}

func (a *ipAssigner) advertise(ip net.IP) {
if utilnet.IsIPv4(ip) {
klog.V(2).InfoS("Sending gratuitous ARP", "ip", ip)
if err := arping.GratuitousARPOverIface(ip, a.externalInterface); err != nil {
klog.ErrorS(err, "Failed to send gratuitous ARP", "ip", ip)
}
} else {
klog.V(2).InfoS("Sending neighbor advertisement", "ip", ip)
if err := ndp.NeighborAdvertisement(ip, a.externalInterface); err != nil {
klog.ErrorS(err, "Failed to send neighbor advertisement", "ip", ip)
}
}
}

// UnassignIP ensures the provided IP is not assigned to the dummy device.
func (a *ipAssigner) UnassignIP(ip string) error {
parsedIP := net.ParseIP(ip)
Expand Down Expand Up @@ -271,6 +291,7 @@ func (a *ipAssigner) InitIPs(ips sets.Set[string]) error {
if err != nil {
return err
}
a.advertise(ip)
}
a.assignedIPs = ips.Union(nil)
return nil
Expand Down
14 changes: 0 additions & 14 deletions pkg/agent/ipassigner/responder/arp_responder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"sync"

"github.com/mdlayher/arp"
"github.com/mdlayher/ethernet"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
utilnet "k8s.io/utils/net"
Expand All @@ -47,15 +46,6 @@ func NewARPResponder(iface *net.Interface) (*arpResponder, error) {
}, nil
}

// advertise sends an gratuitous ARP packet for the IP.
func (r *arpResponder) advertise(ip net.IP) error {
pkt, err := arp.NewPacket(arp.OperationRequest, r.iface.HardwareAddr, ip, ethernet.Broadcast, ip)
if err != nil {
return err
}
return r.conn.WriteTo(pkt, ethernet.Broadcast)
}

func (r *arpResponder) InterfaceName() string {
return r.iface.Name
}
Expand All @@ -66,10 +56,6 @@ func (r *arpResponder) AddIP(ip net.IP) error {
}
if r.addIP(ip) {
klog.InfoS("Assigned IP to ARP responder", "ip", ip, "interface", r.iface.Name)
err := r.advertise(ip)
if err != nil {
klog.ErrorS(err, "Failed to advertise", "ip", ip, "interface", r.iface.Name)
}
}
return nil
}
Expand Down
58 changes: 0 additions & 58 deletions pkg/agent/ipassigner/responder/arp_responder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,64 +77,6 @@ func newFakeNetworkInterface() *net.Interface {
}
}

func TestARPResponder_Advertise(t *testing.T) {
tests := []struct {
name string
iface *net.Interface
ip net.IP
expectError bool
expectedBytes []byte
}{
{
name: "GratuitousARP for IPv4",
iface: newFakeNetworkInterface(),
ip: net.ParseIP("192.168.10.1").To4(),
expectError: false,
expectedBytes: []byte{
// ethernet header (16 bytes)
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, // 6 bytes: destination hardware address
0x00, 0x11, 0x22, 0x33, 0x44, 0x55, // 6 bytes: source hardware address
0x08, 0x06, // 2 bytes: ethernet type
// arp payload (46 bytes)
0x00, 0x01, // 2 bytes: hardware type
0x08, 0x00, // 2 bytes: protocol type
0x06, // 1 byte : hardware address length
0x04, // 1 byte : protocol length
0x00, 0x01, // 2 bytes: operation
0x00, 0x11, 0x22, 0x33, 0x44, 0x55, // 6 bytes: source hardware address
0xc0, 0xa8, 0x0a, 0x01, // 4 bytes: source protocol address
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, // 6 bytes: target hardware address
0xc0, 0xa8, 0x0a, 0x01, // 4 bytes: target protocol address
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // 18 bytes: padding
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
conn := &fakePacketConn{
buffer: bytes.NewBuffer(nil),
addr: packet.Addr{
HardwareAddr: tt.iface.HardwareAddr,
},
}
fakeARPClient, err := newFakeARPClient(tt.iface, conn)
require.NoError(t, err)

r := arpResponder{
iface: tt.iface,
conn: fakeARPClient,
}
err = r.advertise(tt.ip)
if tt.expectError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, tt.expectedBytes, conn.buffer.Bytes())
}
})
}
}

func TestARPResponder_HandleARPRequest(t *testing.T) {
tests := []struct {
name string
Expand Down
18 changes: 0 additions & 18 deletions pkg/agent/ipassigner/responder/ndp_responder.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,6 @@ func (r *ndpResponder) InterfaceName() string {
return r.iface.Name
}

// advertise sends Neighbor Advertisement for the IP.
func (r *ndpResponder) advertise(ip net.IP) error {
na := &ndp.NeighborAdvertisement{
Override: true,
TargetAddress: ip,
Options: []ndp.Option{
&ndp.LinkLayerAddress{
Direction: ndp.Target,
Addr: r.iface.HardwareAddr,
},
},
}
return r.conn.WriteTo(na, nil, net.IPv6linklocalallnodes)
}

func (r *ndpResponder) handleNeighborSolicitation() error {
pkt, _, srcIP, err := r.conn.ReadFrom()
if err != nil {
Expand Down Expand Up @@ -173,9 +158,6 @@ func (r *ndpResponder) AddIP(ip net.IP) error {
}(); err != nil {
return err
}
if err := r.advertise(ip); err != nil {
klog.ErrorS(err, "Failed to advertise", "ip", ip, "interface", r.iface.Name)
}
return nil
}

Expand Down
54 changes: 0 additions & 54 deletions pkg/agent/ipassigner/responder/ndp_responder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,60 +52,6 @@ func (c *fakeNDPConn) LeaveGroup(ip net.IP) error {
return c.leaveGroup(ip)
}

func TestNDPResponder_Advertise(t *testing.T) {
buffer := bytes.NewBuffer(nil)
fakeConn := &fakeNDPConn{
writeTo: func(msg ndp.Message, _ *ipv6.ControlMessage, _ net.IP) error {
bs, err := ndp.MarshalMessage(msg)
assert.NoError(t, err)
buffer.Write(bs)
return nil
},
}
responder := &ndpResponder{
iface: newFakeNetworkInterface(),
conn: fakeConn,
}
err := responder.advertise(net.ParseIP("fe80::250:56ff:fea7:e29d"))
assert.NoError(t, err)
// Neighbor Advertisement Message Format - RFC 4861 Section 4.4.
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | Type | Code | Checksum |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// |R|S|O| Reserved |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | |
// + +
// | |
// + Target Address +
// | |
// + +
// | |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | Options ...
// +-+-+-+-+-+-+-+-+-+-+-+-
//
// Options formats - Source/Target Link-layer Address. RFC 4861 Section 4.6.1.
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | Type | Length | Link-Layer Address ...
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
expectedBytes := []byte{
0x88, // type - 136 for Neighbor Advertisement
0x00, // code
0x00, 0x00, // checksum
0x20, 0x00, 0x00, 0x00, // flags and reserved bits. Override bit is set.
0xfe, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x50, 0x56, 0xff, 0xfe, 0xa7, 0xe2, 0x9d, // IPv6 address
0x02, // option - 2 for Target Link-layer Address
0x01, // length (units of 8 octets including type and length fields)
0x00, 0x11, 0x22, 0x33, 0x44, 0x55, // hardware address
}
assert.Equal(t, expectedBytes, buffer.Bytes())
}

func TestNDPResponder_handleNeighborSolicitation(t *testing.T) {
tests := []struct {
name string
Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/ipassigner/testing/mock_ipassigner.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 53354ae

Please sign in to comment.