Skip to content

Commit

Permalink
Unify some functions in pkg/agent/route
Browse files Browse the repository at this point in the history
This PR unifies the functions in route_linux.go and route_windows.go:
  - AddClusterIPRoute
  - AddLoadBalancer
  - DeleteLoadBalancer

Removed
  - Interface DeleteClusterIPRoute and corresponding implementation

Signed-off-by: Hongliang Liu <lhongliang@vmware.com>
  • Loading branch information
hongliangl committed Jun 23, 2022
1 parent b8279c9 commit 3122cec
Show file tree
Hide file tree
Showing 11 changed files with 440 additions and 255 deletions.
22 changes: 1 addition & 21 deletions pkg/agent/controller/noderoute/node_route_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"antrea.io/antrea/pkg/ovs/ovsconfig"
utilip "antrea.io/antrea/pkg/util/ip"
"antrea.io/antrea/pkg/util/k8s"
"antrea.io/antrea/pkg/util/runtime"
)

const (
Expand Down Expand Up @@ -203,27 +202,8 @@ func (c *Controller) removeStaleGatewayRoutes() error {
desiredPodCIDRs = append(desiredPodCIDRs, podCIDRs...)
}

// TODO: This is not the best place to keep the ClusterIP Service routes.
desiredClusterIPSvcIPs := map[string]bool{}
if c.proxyAll && runtime.IsWindowsPlatform() {
// The route for virtual IP -> antrea-gw0 should be always kept.
desiredClusterIPSvcIPs[config.VirtualServiceIPv4.String()] = true

svcs, err := c.svcLister.List(labels.Everything())
for _, svc := range svcs {
for _, ip := range svc.Spec.ClusterIPs {
desiredClusterIPSvcIPs[ip] = true
}
}
if err != nil {
return fmt.Errorf("error when listing ClusterIP Service IPs: %v", err)
}
}

// routeClient will remove orphaned routes whose destinations are not in desiredPodCIDRs.
// If proxyAll enabled, it will also remove routes that are for Windows ClusterIP Services
// which no longer exist.
if err := c.routeClient.Reconcile(desiredPodCIDRs, desiredClusterIPSvcIPs); err != nil {
if err := c.routeClient.Reconcile(desiredPodCIDRs); err != nil {
return err
}
return nil
Expand Down
32 changes: 22 additions & 10 deletions pkg/agent/nodeportlocal/rules/netnat_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ package rules

import (
"fmt"
"net"

"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/route"
"antrea.io/antrea/pkg/agent/util"
binding "antrea.io/antrea/pkg/ovs/openflow"
)

// Use antrea-nat netnatstaticmapping rules as NPL implementation
Expand Down Expand Up @@ -68,13 +70,18 @@ func (nn *netnatRules) initRules() error {

// AddRule appends a NetNatStaticMapping rule.
func (nn *netnatRules) AddRule(nodePort int, podIP string, podPort int, protocol string) error {
nodePort16 := util.PortToUint16(nodePort)
podPort16 := util.PortToUint16(podPort)
podAddr := fmt.Sprintf("%s:%d", podIP, podPort16)
if err := util.ReplaceNetNatStaticMapping(antreaNatNPL, "0.0.0.0", nodePort16, podIP, podPort16, protocol); err != nil {
netNatStaticMapping := &util.NetNatStaticMapping{
Name: antreaNatNPL,
ExternalIP: net.ParseIP("0.0.0.0"),
ExternalPort: util.PortToUint16(nodePort),
InternalIP: net.ParseIP(podIP),
InternalPort: util.PortToUint16(podPort),
Protocol: binding.Protocol(protocol),
}
if err := util.ReplaceNetNatStaticMapping(netNatStaticMapping); err != nil {
return err
}
klog.InfoS("Successfully added NetNat rule", "podAddr", podAddr, "nodePort", nodePort16, "protocol", protocol)
klog.InfoS("Successfully added NetNatStaticMapping", "NetNatStaticMapping", netNatStaticMapping)
return nil
}

Expand All @@ -90,13 +97,18 @@ func (nn *netnatRules) AddAllRules(nplList []PodNodePort) error {

// DeleteRule deletes a specific NPL rule from NetNatStaticMapping table
func (nn *netnatRules) DeleteRule(nodePort int, podIP string, podPort int, protocol string) error {
nodePort16 := util.PortToUint16(nodePort)
podPort16 := util.PortToUint16(podPort)
podAddr := fmt.Sprintf("%s:%d", podIP, podPort16)
if err := util.RemoveNetNatStaticMappingByNPLTuples(antreaNatNPL, "0.0.0.0", nodePort16, podIP, podPort16, protocol); err != nil {
netNatStaticMapping := &util.NetNatStaticMapping{
Name: antreaNatNPL,
ExternalIP: net.ParseIP("0.0.0.0"),
ExternalPort: util.PortToUint16(nodePort),
InternalIP: net.ParseIP(podIP),
InternalPort: util.PortToUint16(podPort),
Protocol: binding.Protocol(protocol),
}
if err := util.RemoveNetNatStaticMappingByNPLTuples(netNatStaticMapping); err != nil {
return err
}
klog.InfoS("Successfully deleted NetNatStaticMapping rule", "podAddr", podAddr, "nodePort", nodePort16, "protocol", protocol)
klog.InfoS("Successfully deleted NetNatStaticMapping", "NetNatStaticMapping", netNatStaticMapping)
return nil
}

Expand Down
7 changes: 0 additions & 7 deletions pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,13 +590,6 @@ func (p *proxier) installServices() {
continue
}
}
// If previous Service which has ClusterIP should be removed, remove ClusterIP routes.
if svcInfo.ClusterIP() != nil {
if err := p.routeClient.DeleteClusterIPRoute(pSvcInfo.ClusterIP()); err != nil {
klog.ErrorS(err, "Failed to remove ClusterIP Service routes", "Service", svcPortName)
continue
}
}
}
}

Expand Down
8 changes: 2 additions & 6 deletions pkg/agent/route/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ type Interface interface {
// It should be idempotent and can be safely called on every startup.
Initialize(nodeConfig *config.NodeConfig, done func()) error

// Reconcile should remove orphaned routes and related configuration based on the desired podCIDRs and Service IPs.
// Reconcile should remove orphaned routes and related configuration based on the desired podCIDRs.
// If IPv6 is enabled in the cluster, Reconcile should also remove the orphaned IPv6 neighbors.
Reconcile(podCIDRs []string, svcIPs map[string]bool) error
Reconcile(podCIDRs []string) error

// AddRoutes should add routes to the provided podCIDR.
// It should override the routes if they already exist, without error.
Expand Down Expand Up @@ -61,10 +61,6 @@ type Interface interface {
// AddClusterIPRoute adds route on K8s node for Service ClusterIP.
AddClusterIPRoute(svcIP net.IP) error

// DeleteClusterIPRoute deletes route for a Service IP when AntreaProxy is configured to handle
// ClusterIP Service traffic from host network.
DeleteClusterIPRoute(svcIP net.IP) error

// AddLoadBalancer adds configurations when a LoadBalancer Service is created.
AddLoadBalancer(externalIPs []string) error

Expand Down
82 changes: 41 additions & 41 deletions pkg/agent/route/route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ var (
// globalVMAC is used in the IPv6 neighbor configuration to advertise ND solicitation for the IPv6 address of the
// host gateway interface on other Nodes.
globalVMAC, _ = net.ParseMAC("aa:bb:cc:dd:ee:ff")
// IPTablesSyncInterval is exported so that sync interval can be configured for running integration test with
// SyncInterval is exported so that sync interval can be configured for running integration test with
// smaller values. It is meant to be used internally by Run.
IPTablesSyncInterval = 60 * time.Second
SyncInterval = 60 * time.Second
)

// Client takes care of routing container packets in host network, coordinating ip route, ip rule, iptables and ipset.
Expand Down Expand Up @@ -186,8 +186,8 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error {
// It will not return until stopCh is closed.
func (c *Client) Run(stopCh <-chan struct{}) {
<-c.iptablesInitialized
klog.Infof("Starting iptables sync, with sync interval %v", IPTablesSyncInterval)
wait.Until(c.syncIPInfra, IPTablesSyncInterval, stopCh)
klog.Infof("Starting iptables, ipset and route sync, with sync interval %v", SyncInterval)
wait.Until(c.syncIPInfra, SyncInterval, stopCh)
}

// syncIPInfra is idempotent and can be safely called on every sync operation.
Expand All @@ -201,13 +201,13 @@ func (c *Client) syncIPInfra() {
klog.Errorf("Failed to sync iptables: %v", err)
return
}
if err := c.syncRoutes(); err != nil {
if err := c.syncRoute(); err != nil {
klog.Errorf("Failed to sync routes: %v", err)
}
klog.V(3).Infof("Successfully synced node iptables and routes")
klog.V(3).Infof("Successfully synced Node iptables, ipset and route")
}

func (c *Client) syncRoutes() error {
func (c *Client) syncRoute() error {
routeList, err := netlink.RouteList(nil, netlink.FAMILY_ALL)
if err != nil {
return err
Expand Down Expand Up @@ -728,8 +728,8 @@ func (c *Client) initServiceIPRoutes() error {
}

// Reconcile removes orphaned podCIDRs from ipset and removes routes to orphaned podCIDRs
// based on the desired podCIDRs. svcIPs are used for Windows only.
func (c *Client) Reconcile(podCIDRs []string, svcIPs map[string]bool) error {
// based on the desired podCIDRs.
func (c *Client) Reconcile(podCIDRs []string) error {
desiredPodCIDRs := sets.NewString(podCIDRs...)
// Get the peer IPv6 gateways from pod CIDRs
desiredIPv6GWs := getIPv6Gateways(podCIDRs)
Expand Down Expand Up @@ -1004,10 +1004,6 @@ func (c *Client) DeleteRoutes(podCIDR *net.IPNet) error {
return nil
}

func (c *Client) DeleteClusterIPRoute(svcIP net.IP) error {
return nil
}

// Join all words with spaces, terminate with newline and write to buf.
func writeLine(buf *bytes.Buffer, words ...string) {
// We avoid strings.Join for performance reasons.
Expand Down Expand Up @@ -1201,11 +1197,9 @@ func (c *Client) AddClusterIPRoute(svcIP net.IP) error {
linkIndex := c.nodeConfig.GatewayConfig.LinkIndex
scope := netlink.SCOPE_UNIVERSE
curClusterIPCIDR := c.clusterIPv4CIDR
mask := ipv4AddrLength
gw := config.VirtualServiceIPv4
if isIPv6 {
curClusterIPCIDR = c.clusterIPv6CIDR
mask = ipv6AddrLength
gw = config.VirtualServiceIPv6
}

Expand All @@ -1215,27 +1209,21 @@ func (c *Client) AddClusterIPRoute(svcIP net.IP) error {
return nil
}

var newClusterIPCIDR *net.IPNet
var err error
if curClusterIPCIDR != nil {
// If the route exists and its destination CIDR doesn't contain the ClusterIP, generate a new destination CIDR by
// enlarging the current destination CIDR with the ClusterIP.
if newClusterIPCIDR, err = util.ExtendCIDRWithIP(curClusterIPCIDR, svcIP); err != nil {
return fmt.Errorf("enlarge the destination CIDR with an error: %w", err)
}
} else {
// If the route doesn't exist, generate a new destination CIDR with the ClusterIP. Note that, this is the first
// ClusterIP since the route doesn't exist.
newClusterIPCIDR = &net.IPNet{IP: svcIP, Mask: net.CIDRMask(mask, mask)}
// Enlarge the ClusterIP CIDR with the ClusterIP. Note that, if the ClusterIP CIDR doesn't exist, just generate a CIDR
// with the ClusterIP.
newClusterIPCIDR, err := util.ExtendCIDRWithIP(curClusterIPCIDR, svcIP)
if err != nil {
return fmt.Errorf("enlarge the destination CIDR with an error: %w", err)
}

// Generate a route with the new destination CIDR and install it.
// Generate a route with the new ClusterIP CIDR and install it.
newClusterIPCIDRMask, _ := newClusterIPCIDR.Mask.Size()
route := generateRoute(newClusterIPCIDR.IP, newClusterIPCIDRMask, gw, linkIndex, scope)
if err = netlink.RouteReplace(route); err != nil {
return fmt.Errorf("failed to install new ClusterIP route: %w", err)
}
// Store the new destination CIDR.

// Store the new ClusterIP CIDR.
if isIPv6 {
c.clusterIPv6CIDR = route.Dst
} else {
Expand All @@ -1246,29 +1234,41 @@ func (c *Client) AddClusterIPRoute(svcIP net.IP) error {
// Collect stale routes.
var staleRoutes []*netlink.Route
if curClusterIPCIDR != nil {
// If current destination CIDR is not nil, the route with current destination CIDR should be uninstalled.
// If the current ClusterIP CIDR exists, the route with the current ClusterIP CIDR should be uninstalled since
// a new route with a newly calculated ClusterIP CIDR has been installed.
route.Dst = curClusterIPCIDR
staleRoutes = []*netlink.Route{route}
} else {
// If current destination CIDR is nil, which means that Antrea Agent has just started, then all existing routes
// whose destination CIDR contains the first ClusterIP should be uninstalled, except the newly installed route.
// Note that, there may be multiple stale routes prior to this commit. When upgrading, all stale routes will be
// collected. After this commit, there will be only one stale route after Antrea Agent started.
routes, err := c.listIPRoutesOnGW()
if err != nil {
return fmt.Errorf("error listing ip routes: %w", err)
}
// If current ClusterIP CIDR doesn't exist, which means that Antrea Agent has just started, and the route for
// ClusterIP generated in the last round should be deleted.
for i := 0; i < len(routes); i++ {
if routes[i].Gw.Equal(gw) && !routes[i].Dst.IP.Equal(svcIP) && routes[i].Dst.Contains(svcIP) {
// Skip the route whose destination CIDR only has one address.
ones, _ := routes[i].Dst.Mask.Size()
if ones == ipv4AddrLength || ones == ipv6AddrLength {
continue
}
// Find stale route.
if routes[i].Dst.Contains(svcIP) && routes[i].Gw.Equal(gw) {
staleRoutes = append(staleRoutes, &routes[i])
// TODO: add a break here. Some old versions of Antrea have issue https://github.com/antrea-io/antrea/issues/3131.
// There might be multiple stale routes and they should be removed too. Without the issue, there should
// be only one stale route, and a break should be added here to avoid unnecessary iteration.
}
}
}

// Remove stale routes.
for _, rt := range staleRoutes {
if err = netlink.RouteDel(rt); err != nil {
return fmt.Errorf("failed to uninstall stale ClusterIP route %s: %w", rt.String(), err)
if err.Error() == "no such process" {
klog.InfoS("Failed to delete stale ClusterIP route since the route doesn't exist", "route", route)
} else {
return fmt.Errorf("failed to delete routing entry for ClusterIP %s: %w", svcIP.String(), err)
}
}
klog.V(4).InfoS("Uninstalled stale ClusterIP route successfully", "stale route", rt)
}
Expand Down Expand Up @@ -1314,10 +1314,10 @@ func (c *Client) addLoadBalancerIngressIPRoute(svcIPStr string) error {

route := generateRoute(svcIP, mask, gw, linkIndex, netlink.SCOPE_UNIVERSE)
if err := netlink.RouteReplace(route); err != nil {
return fmt.Errorf("failed to install routing entry for LoadBalancer ingress IP %s: %w", svcIP.String(), err)
return fmt.Errorf("failed to install routing entry for LoadBalancer ingress IP %s: %w", svcIPStr, err)
}
klog.V(4).InfoS("Added LoadBalancer ingress IP route", "route", route)
c.serviceRoutes.Store(svcIP.String(), route)
c.serviceRoutes.Store(svcIPStr, route)

return nil
}
Expand All @@ -1341,13 +1341,13 @@ func (c *Client) deleteLoadBalancerIngressIPRoute(svcIPStr string) error {
route := generateRoute(svcIP, mask, gw, linkIndex, netlink.SCOPE_UNIVERSE)
if err := netlink.RouteDel(route); err != nil {
if err.Error() == "no such process" {
klog.InfoS("Failed to delete LoadBalancer ingress IP route since the route has been deleted", "route", route)
klog.InfoS("Failed to delete LoadBalancer ingress IP route since the route doesn't exist", "route", route)
} else {
return fmt.Errorf("failed to delete routing entry for LoadBalancer ingress IP %s: %w", svcIP.String(), err)
return fmt.Errorf("failed to delete routing entry for LoadBalancer ingress IP %s: %w", svcIPStr, err)
}
}
klog.V(4).InfoS("Deleted LoadBalancer ingress IP route", "route", route)
c.serviceRoutes.Delete(svcIP.String())
c.serviceRoutes.Delete(svcIPStr)

return nil
}
Expand Down
Loading

0 comments on commit 3122cec

Please sign in to comment.