From 3122cec6e3ac620475b5e18aa86253aa969cf134 Mon Sep 17 00:00:00 2001 From: Hongliang Liu Date: Mon, 13 Jun 2022 12:36:36 +0800 Subject: [PATCH] Unify some functions in pkg/agent/route This PR unifies the functions in route_linux.go and route_windows.go: - AddClusterIPRoute - AddLoadBalancer - DeleteLoadBalancer Removed - Interface DeleteClusterIPRoute and corresponding implementation Signed-off-by: Hongliang Liu --- .../noderoute/node_route_controller.go | 22 +- pkg/agent/nodeportlocal/rules/netnat_rule.go | 32 +- pkg/agent/proxy/proxier.go | 7 - pkg/agent/route/interfaces.go | 8 +- pkg/agent/route/route_linux.go | 82 ++-- pkg/agent/route/route_windows.go | 412 +++++++++++++----- pkg/agent/route/route_windows_test.go | 31 +- pkg/agent/route/testing/mock_route.go | 22 +- pkg/agent/util/net.go | 3 + pkg/agent/util/net_windows.go | 58 ++- test/integration/agent/route_test.go | 18 +- 11 files changed, 440 insertions(+), 255 deletions(-) diff --git a/pkg/agent/controller/noderoute/node_route_controller.go b/pkg/agent/controller/noderoute/node_route_controller.go index 61105539235..d66330cb2c2 100644 --- a/pkg/agent/controller/noderoute/node_route_controller.go +++ b/pkg/agent/controller/noderoute/node_route_controller.go @@ -42,7 +42,6 @@ import ( "antrea.io/antrea/pkg/ovs/ovsconfig" utilip "antrea.io/antrea/pkg/util/ip" "antrea.io/antrea/pkg/util/k8s" - "antrea.io/antrea/pkg/util/runtime" ) const ( @@ -203,27 +202,8 @@ func (c *Controller) removeStaleGatewayRoutes() error { desiredPodCIDRs = append(desiredPodCIDRs, podCIDRs...) } - // TODO: This is not the best place to keep the ClusterIP Service routes. - desiredClusterIPSvcIPs := map[string]bool{} - if c.proxyAll && runtime.IsWindowsPlatform() { - // The route for virtual IP -> antrea-gw0 should be always kept. - desiredClusterIPSvcIPs[config.VirtualServiceIPv4.String()] = true - - svcs, err := c.svcLister.List(labels.Everything()) - for _, svc := range svcs { - for _, ip := range svc.Spec.ClusterIPs { - desiredClusterIPSvcIPs[ip] = true - } - } - if err != nil { - return fmt.Errorf("error when listing ClusterIP Service IPs: %v", err) - } - } - // routeClient will remove orphaned routes whose destinations are not in desiredPodCIDRs. - // If proxyAll enabled, it will also remove routes that are for Windows ClusterIP Services - // which no longer exist. - if err := c.routeClient.Reconcile(desiredPodCIDRs, desiredClusterIPSvcIPs); err != nil { + if err := c.routeClient.Reconcile(desiredPodCIDRs); err != nil { return err } return nil diff --git a/pkg/agent/nodeportlocal/rules/netnat_rule.go b/pkg/agent/nodeportlocal/rules/netnat_rule.go index e5c36effffc..6f7ee202f7b 100644 --- a/pkg/agent/nodeportlocal/rules/netnat_rule.go +++ b/pkg/agent/nodeportlocal/rules/netnat_rule.go @@ -19,11 +19,13 @@ package rules import ( "fmt" + "net" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/route" "antrea.io/antrea/pkg/agent/util" + binding "antrea.io/antrea/pkg/ovs/openflow" ) // Use antrea-nat netnatstaticmapping rules as NPL implementation @@ -68,13 +70,18 @@ func (nn *netnatRules) initRules() error { // AddRule appends a NetNatStaticMapping rule. func (nn *netnatRules) AddRule(nodePort int, podIP string, podPort int, protocol string) error { - nodePort16 := util.PortToUint16(nodePort) - podPort16 := util.PortToUint16(podPort) - podAddr := fmt.Sprintf("%s:%d", podIP, podPort16) - if err := util.ReplaceNetNatStaticMapping(antreaNatNPL, "0.0.0.0", nodePort16, podIP, podPort16, protocol); err != nil { + netNatStaticMapping := &util.NetNatStaticMapping{ + Name: antreaNatNPL, + ExternalIP: net.ParseIP("0.0.0.0"), + ExternalPort: util.PortToUint16(nodePort), + InternalIP: net.ParseIP(podIP), + InternalPort: util.PortToUint16(podPort), + Protocol: binding.Protocol(protocol), + } + if err := util.ReplaceNetNatStaticMapping(netNatStaticMapping); err != nil { return err } - klog.InfoS("Successfully added NetNat rule", "podAddr", podAddr, "nodePort", nodePort16, "protocol", protocol) + klog.InfoS("Successfully added NetNatStaticMapping", "NetNatStaticMapping", netNatStaticMapping) return nil } @@ -90,13 +97,18 @@ func (nn *netnatRules) AddAllRules(nplList []PodNodePort) error { // DeleteRule deletes a specific NPL rule from NetNatStaticMapping table func (nn *netnatRules) DeleteRule(nodePort int, podIP string, podPort int, protocol string) error { - nodePort16 := util.PortToUint16(nodePort) - podPort16 := util.PortToUint16(podPort) - podAddr := fmt.Sprintf("%s:%d", podIP, podPort16) - if err := util.RemoveNetNatStaticMappingByNPLTuples(antreaNatNPL, "0.0.0.0", nodePort16, podIP, podPort16, protocol); err != nil { + netNatStaticMapping := &util.NetNatStaticMapping{ + Name: antreaNatNPL, + ExternalIP: net.ParseIP("0.0.0.0"), + ExternalPort: util.PortToUint16(nodePort), + InternalIP: net.ParseIP(podIP), + InternalPort: util.PortToUint16(podPort), + Protocol: binding.Protocol(protocol), + } + if err := util.RemoveNetNatStaticMappingByNPLTuples(netNatStaticMapping); err != nil { return err } - klog.InfoS("Successfully deleted NetNatStaticMapping rule", "podAddr", podAddr, "nodePort", nodePort16, "protocol", protocol) + klog.InfoS("Successfully deleted NetNatStaticMapping", "NetNatStaticMapping", netNatStaticMapping) return nil } diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index d891f03570f..53be1c56480 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -590,13 +590,6 @@ func (p *proxier) installServices() { continue } } - // If previous Service which has ClusterIP should be removed, remove ClusterIP routes. - if svcInfo.ClusterIP() != nil { - if err := p.routeClient.DeleteClusterIPRoute(pSvcInfo.ClusterIP()); err != nil { - klog.ErrorS(err, "Failed to remove ClusterIP Service routes", "Service", svcPortName) - continue - } - } } } diff --git a/pkg/agent/route/interfaces.go b/pkg/agent/route/interfaces.go index f0e1fc04a57..db90594222e 100644 --- a/pkg/agent/route/interfaces.go +++ b/pkg/agent/route/interfaces.go @@ -27,9 +27,9 @@ type Interface interface { // It should be idempotent and can be safely called on every startup. Initialize(nodeConfig *config.NodeConfig, done func()) error - // Reconcile should remove orphaned routes and related configuration based on the desired podCIDRs and Service IPs. + // Reconcile should remove orphaned routes and related configuration based on the desired podCIDRs. // If IPv6 is enabled in the cluster, Reconcile should also remove the orphaned IPv6 neighbors. - Reconcile(podCIDRs []string, svcIPs map[string]bool) error + Reconcile(podCIDRs []string) error // AddRoutes should add routes to the provided podCIDR. // It should override the routes if they already exist, without error. @@ -61,10 +61,6 @@ type Interface interface { // AddClusterIPRoute adds route on K8s node for Service ClusterIP. AddClusterIPRoute(svcIP net.IP) error - // DeleteClusterIPRoute deletes route for a Service IP when AntreaProxy is configured to handle - // ClusterIP Service traffic from host network. - DeleteClusterIPRoute(svcIP net.IP) error - // AddLoadBalancer adds configurations when a LoadBalancer Service is created. AddLoadBalancer(externalIPs []string) error diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index 8ab871534c1..d119c76720a 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -80,9 +80,9 @@ var ( // globalVMAC is used in the IPv6 neighbor configuration to advertise ND solicitation for the IPv6 address of the // host gateway interface on other Nodes. globalVMAC, _ = net.ParseMAC("aa:bb:cc:dd:ee:ff") - // IPTablesSyncInterval is exported so that sync interval can be configured for running integration test with + // SyncInterval is exported so that sync interval can be configured for running integration test with // smaller values. It is meant to be used internally by Run. - IPTablesSyncInterval = 60 * time.Second + SyncInterval = 60 * time.Second ) // Client takes care of routing container packets in host network, coordinating ip route, ip rule, iptables and ipset. @@ -186,8 +186,8 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error { // It will not return until stopCh is closed. func (c *Client) Run(stopCh <-chan struct{}) { <-c.iptablesInitialized - klog.Infof("Starting iptables sync, with sync interval %v", IPTablesSyncInterval) - wait.Until(c.syncIPInfra, IPTablesSyncInterval, stopCh) + klog.Infof("Starting iptables, ipset and route sync, with sync interval %v", SyncInterval) + wait.Until(c.syncIPInfra, SyncInterval, stopCh) } // syncIPInfra is idempotent and can be safely called on every sync operation. @@ -201,13 +201,13 @@ func (c *Client) syncIPInfra() { klog.Errorf("Failed to sync iptables: %v", err) return } - if err := c.syncRoutes(); err != nil { + if err := c.syncRoute(); err != nil { klog.Errorf("Failed to sync routes: %v", err) } - klog.V(3).Infof("Successfully synced node iptables and routes") + klog.V(3).Infof("Successfully synced Node iptables, ipset and route") } -func (c *Client) syncRoutes() error { +func (c *Client) syncRoute() error { routeList, err := netlink.RouteList(nil, netlink.FAMILY_ALL) if err != nil { return err @@ -728,8 +728,8 @@ func (c *Client) initServiceIPRoutes() error { } // Reconcile removes orphaned podCIDRs from ipset and removes routes to orphaned podCIDRs -// based on the desired podCIDRs. svcIPs are used for Windows only. -func (c *Client) Reconcile(podCIDRs []string, svcIPs map[string]bool) error { +// based on the desired podCIDRs. +func (c *Client) Reconcile(podCIDRs []string) error { desiredPodCIDRs := sets.NewString(podCIDRs...) // Get the peer IPv6 gateways from pod CIDRs desiredIPv6GWs := getIPv6Gateways(podCIDRs) @@ -1004,10 +1004,6 @@ func (c *Client) DeleteRoutes(podCIDR *net.IPNet) error { return nil } -func (c *Client) DeleteClusterIPRoute(svcIP net.IP) error { - return nil -} - // Join all words with spaces, terminate with newline and write to buf. func writeLine(buf *bytes.Buffer, words ...string) { // We avoid strings.Join for performance reasons. @@ -1201,11 +1197,9 @@ func (c *Client) AddClusterIPRoute(svcIP net.IP) error { linkIndex := c.nodeConfig.GatewayConfig.LinkIndex scope := netlink.SCOPE_UNIVERSE curClusterIPCIDR := c.clusterIPv4CIDR - mask := ipv4AddrLength gw := config.VirtualServiceIPv4 if isIPv6 { curClusterIPCIDR = c.clusterIPv6CIDR - mask = ipv6AddrLength gw = config.VirtualServiceIPv6 } @@ -1215,27 +1209,21 @@ func (c *Client) AddClusterIPRoute(svcIP net.IP) error { return nil } - var newClusterIPCIDR *net.IPNet - var err error - if curClusterIPCIDR != nil { - // If the route exists and its destination CIDR doesn't contain the ClusterIP, generate a new destination CIDR by - // enlarging the current destination CIDR with the ClusterIP. - if newClusterIPCIDR, err = util.ExtendCIDRWithIP(curClusterIPCIDR, svcIP); err != nil { - return fmt.Errorf("enlarge the destination CIDR with an error: %w", err) - } - } else { - // If the route doesn't exist, generate a new destination CIDR with the ClusterIP. Note that, this is the first - // ClusterIP since the route doesn't exist. - newClusterIPCIDR = &net.IPNet{IP: svcIP, Mask: net.CIDRMask(mask, mask)} + // Enlarge the ClusterIP CIDR with the ClusterIP. Note that, if the ClusterIP CIDR doesn't exist, just generate a CIDR + // with the ClusterIP. + newClusterIPCIDR, err := util.ExtendCIDRWithIP(curClusterIPCIDR, svcIP) + if err != nil { + return fmt.Errorf("enlarge the destination CIDR with an error: %w", err) } - // Generate a route with the new destination CIDR and install it. + // Generate a route with the new ClusterIP CIDR and install it. newClusterIPCIDRMask, _ := newClusterIPCIDR.Mask.Size() route := generateRoute(newClusterIPCIDR.IP, newClusterIPCIDRMask, gw, linkIndex, scope) if err = netlink.RouteReplace(route); err != nil { return fmt.Errorf("failed to install new ClusterIP route: %w", err) } - // Store the new destination CIDR. + + // Store the new ClusterIP CIDR. if isIPv6 { c.clusterIPv6CIDR = route.Dst } else { @@ -1246,21 +1234,29 @@ func (c *Client) AddClusterIPRoute(svcIP net.IP) error { // Collect stale routes. var staleRoutes []*netlink.Route if curClusterIPCIDR != nil { - // If current destination CIDR is not nil, the route with current destination CIDR should be uninstalled. + // If the current ClusterIP CIDR exists, the route with the current ClusterIP CIDR should be uninstalled since + // a new route with a newly calculated ClusterIP CIDR has been installed. route.Dst = curClusterIPCIDR staleRoutes = []*netlink.Route{route} } else { - // If current destination CIDR is nil, which means that Antrea Agent has just started, then all existing routes - // whose destination CIDR contains the first ClusterIP should be uninstalled, except the newly installed route. - // Note that, there may be multiple stale routes prior to this commit. When upgrading, all stale routes will be - // collected. After this commit, there will be only one stale route after Antrea Agent started. routes, err := c.listIPRoutesOnGW() if err != nil { return fmt.Errorf("error listing ip routes: %w", err) } + // If current ClusterIP CIDR doesn't exist, which means that Antrea Agent has just started, and the route for + // ClusterIP generated in the last round should be deleted. for i := 0; i < len(routes); i++ { - if routes[i].Gw.Equal(gw) && !routes[i].Dst.IP.Equal(svcIP) && routes[i].Dst.Contains(svcIP) { + // Skip the route whose destination CIDR only has one address. + ones, _ := routes[i].Dst.Mask.Size() + if ones == ipv4AddrLength || ones == ipv6AddrLength { + continue + } + // Find stale route. + if routes[i].Dst.Contains(svcIP) && routes[i].Gw.Equal(gw) { staleRoutes = append(staleRoutes, &routes[i]) + // TODO: add a break here. Some old versions of Antrea have issue https://github.com/antrea-io/antrea/issues/3131. + // There might be multiple stale routes and they should be removed too. Without the issue, there should + // be only one stale route, and a break should be added here to avoid unnecessary iteration. } } } @@ -1268,7 +1264,11 @@ func (c *Client) AddClusterIPRoute(svcIP net.IP) error { // Remove stale routes. for _, rt := range staleRoutes { if err = netlink.RouteDel(rt); err != nil { - return fmt.Errorf("failed to uninstall stale ClusterIP route %s: %w", rt.String(), err) + if err.Error() == "no such process" { + klog.InfoS("Failed to delete stale ClusterIP route since the route doesn't exist", "route", route) + } else { + return fmt.Errorf("failed to delete routing entry for ClusterIP %s: %w", svcIP.String(), err) + } } klog.V(4).InfoS("Uninstalled stale ClusterIP route successfully", "stale route", rt) } @@ -1314,10 +1314,10 @@ func (c *Client) addLoadBalancerIngressIPRoute(svcIPStr string) error { route := generateRoute(svcIP, mask, gw, linkIndex, netlink.SCOPE_UNIVERSE) if err := netlink.RouteReplace(route); err != nil { - return fmt.Errorf("failed to install routing entry for LoadBalancer ingress IP %s: %w", svcIP.String(), err) + return fmt.Errorf("failed to install routing entry for LoadBalancer ingress IP %s: %w", svcIPStr, err) } klog.V(4).InfoS("Added LoadBalancer ingress IP route", "route", route) - c.serviceRoutes.Store(svcIP.String(), route) + c.serviceRoutes.Store(svcIPStr, route) return nil } @@ -1341,13 +1341,13 @@ func (c *Client) deleteLoadBalancerIngressIPRoute(svcIPStr string) error { route := generateRoute(svcIP, mask, gw, linkIndex, netlink.SCOPE_UNIVERSE) if err := netlink.RouteDel(route); err != nil { if err.Error() == "no such process" { - klog.InfoS("Failed to delete LoadBalancer ingress IP route since the route has been deleted", "route", route) + klog.InfoS("Failed to delete LoadBalancer ingress IP route since the route doesn't exist", "route", route) } else { - return fmt.Errorf("failed to delete routing entry for LoadBalancer ingress IP %s: %w", svcIP.String(), err) + return fmt.Errorf("failed to delete routing entry for LoadBalancer ingress IP %s: %w", svcIPStr, err) } } klog.V(4).InfoS("Deleted LoadBalancer ingress IP route", "route", route) - c.serviceRoutes.Delete(svcIP.String()) + c.serviceRoutes.Delete(svcIPStr) return nil } diff --git a/pkg/agent/route/route_windows.go b/pkg/agent/route/route_windows.go index 9d2d250e44a..3b7d05f0bcd 100644 --- a/pkg/agent/route/route_windows.go +++ b/pkg/agent/route/route_windows.go @@ -21,9 +21,13 @@ import ( "errors" "fmt" "net" + "strconv" + "strings" "sync" + "time" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/config" @@ -39,6 +43,8 @@ const ( outboundFirewallRuleName = "Antrea: accept packets to local Pods" antreaNatNodePort = "antrea-nat-nodeport" + + ipv4AddrLength = 32 ) var ( @@ -46,23 +52,29 @@ var ( virtualServiceIPv4Net = util.NewIPNet(config.VirtualServiceIPv4) virtualNodePortDNATIPv4Net = util.NewIPNet(config.VirtualNodePortDNATIPv4) PodCIDRIPv4 *net.IPNet + syncInterval = 60 * time.Second ) type Client struct { - nodeConfig *config.NodeConfig - networkConfig *config.NetworkConfig - hostRoutes *sync.Map - fwClient *winfirewall.Client - bridgeInfIndex int - noSNAT bool - proxyAll bool + nodeConfig *config.NodeConfig + networkConfig *config.NetworkConfig + nodeRoutes *sync.Map + serviceRoutes *sync.Map + netNat *sync.Map + fwClient *winfirewall.Client + bridgeInfIndex int + noSNAT bool + proxyAll bool + clusterIPv4CIDR *net.IPNet } // NewClient returns a route client. func NewClient(networkConfig *config.NetworkConfig, noSNAT, proxyAll, connectUplinkToBridge, multicastEnabled bool) (*Client, error) { return &Client{ networkConfig: networkConfig, - hostRoutes: &sync.Map{}, + nodeRoutes: &sync.Map{}, + serviceRoutes: &sync.Map{}, + netNat: &sync.Map{}, fwClient: winfirewall.NewClient(), noSNAT: noSNAT, proxyAll: proxyAll, @@ -102,6 +114,10 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error { if err := c.initServiceIPRoutes(); err != nil { return fmt.Errorf("failed to initialize Service IP routes: %v", err) } + // For NodePort Service, a new NetNat for NetNatStaticMapping is needed. + if err := util.NewNetNat(antreaNatNodePort, virtualNodePortDNATIPv4Net); err != nil { + return err + } } done() @@ -113,6 +129,9 @@ func (c *Client) initServiceIPRoutes() error { if err := c.addVirtualServiceIPRoute(false); err != nil { return err } + if err := c.addVirtualNodePortDNATIPRoute(false); err != nil { + return err + } } if c.networkConfig.IPv6Enabled { return fmt.Errorf("IPv6 is not supported on Windows") @@ -122,19 +141,19 @@ func (c *Client) initServiceIPRoutes() error { // Reconcile removes the orphaned routes and related configuration based on the desired podCIDRs and Service IPs. Only // the route entries on the host gateway interface are stored in the cache. -func (c *Client) Reconcile(podCIDRs []string, svcIPs map[string]bool) error { +func (c *Client) Reconcile(podCIDRs []string) error { desiredPodCIDRs := sets.NewString(podCIDRs...) - routes, err := c.listRoutes() + routes, err := c.listIPRoutesOnGW() if err != nil { return err } for dst, rt := range routes { if desiredPodCIDRs.Has(dst) { - c.hostRoutes.Store(dst, rt) + c.nodeRoutes.Store(dst, rt) continue } - if _, ok := svcIPs[dst]; ok { - c.hostRoutes.Store(dst, rt) + // Don't delete the routes which are added by AntreaProxy when proxyAll is enabled. + if c.proxyAll && c.isServiceRoute(rt) { continue } err := util.RemoveNetRoute(rt) @@ -148,7 +167,7 @@ func (c *Client) Reconcile(podCIDRs []string, svcIPs map[string]bool) error { // AddRoutes adds routes to the provided podCIDR. // It overrides the routes if they already exist, without error. func (c *Client) AddRoutes(podCIDR *net.IPNet, nodeName string, peerNodeIP, peerGwIP net.IP) error { - obj, found := c.hostRoutes.Load(podCIDR.String()) + obj, found := c.nodeRoutes.Load(podCIDR.String()) route := &util.Route{ DestinationSubnet: podCIDR, RouteMetric: util.MetricDefault, @@ -186,7 +205,7 @@ func (c *Client) AddRoutes(podCIDR *net.IPNet, nodeName string, peerNodeIP, peer return err } - c.hostRoutes.Store(podCIDR.String(), route) + c.nodeRoutes.Store(podCIDR.String(), route) klog.V(2).Infof("Added route with destination %s via %s on host gateway on %s (%s)", podCIDR.String(), peerGwIP.String(), nodeName, peerNodeIP) return nil } @@ -194,7 +213,7 @@ func (c *Client) AddRoutes(podCIDR *net.IPNet, nodeName string, peerNodeIP, peer // DeleteRoutes deletes routes to the provided podCIDR. // It does nothing if the routes don't exist, without error. func (c *Client) DeleteRoutes(podCIDR *net.IPNet) error { - obj, found := c.hostRoutes.Load(podCIDR.String()) + obj, found := c.nodeRoutes.Load(podCIDR.String()) if !found { klog.V(2).Infof("Route with destination %s not exists", podCIDR.String()) return nil @@ -204,7 +223,7 @@ func (c *Client) DeleteRoutes(podCIDR *net.IPNet) error { if err := util.RemoveNetRoute(rt); err != nil { return err } - c.hostRoutes.Delete(podCIDR.String()) + c.nodeRoutes.Delete(podCIDR.String()) klog.V(2).Infof("Deleted route with destination %s from host gateway", podCIDR.String()) return nil } @@ -213,109 +232,120 @@ func (c *Client) DeleteRoutes(podCIDR *net.IPNet) error { // Service traffic from host network to OVS via antrea-gw0. func (c *Client) addVirtualServiceIPRoute(isIPv6 bool) error { linkIndex := c.nodeConfig.GatewayConfig.LinkIndex + svcIP := config.VirtualServiceIPv4 - // This route is for 2 purposes: - // - If for each ClusterIP Service, a route is installed to direct traffic to antrea-gw0, there will be too many - // neighbor cache entries. While with one virtual IP to antrea-gw0 and ClusterIP Services to the virtual IP, - // there will be only one neighbor cache entry. - // - For ClusterIP Service requests from host, reply traffic needs a route entry to route packet to VirtualServiceIPv4 - // via antrea-gw0. If the NextHop of a route is antrea-gw0, then set it as 0.0.0.0. As a result, on-link/0.0.0.0 - // is in PersistentStore. - // - For NodePort Service, it is the same. - vRoute := &util.Route{ - LinkIndex: linkIndex, - DestinationSubnet: virtualServiceIPv4Net, - GatewayAddress: net.IPv4zero, - RouteMetric: util.MetricHigh, - } - if err := util.ReplaceNetRoute(vRoute); err != nil { - return err + neigh := generateNeigh(svcIP, linkIndex) + if err := util.ReplaceNetNeighbor(neigh); err != nil { + return fmt.Errorf("failed to add new IP neighbour for %s: %w", svcIP, err) } - klog.InfoS("Added virtual Service IP route", "route", vRoute) + klog.InfoS("Added virtual Service IP neighbor", "neighbor", neigh) - // Service replies will be sent to OVS bridge via openflow.GlobalVirtualMAC. This NetNeighbor is for - // creating a neighbor cache entry to config.VirtualServiceIPv4. - vNeighbor := &util.Neighbor{ - LinkIndex: linkIndex, - IPAddress: config.VirtualServiceIPv4, - LinkLayerAddress: openflow.GlobalVirtualMAC, - State: "Permanent", - } - if err := util.ReplaceNetNeighbor(vNeighbor); err != nil { - return err + route := generateRoute(virtualServiceIPv4Net, net.IPv4zero, linkIndex, util.MetricHigh) + if err := util.ReplaceNetRoute(route); err != nil { + return fmt.Errorf("failed to install route for virtual Service IP %s: %w", svcIP.String(), err) } - klog.InfoS("Added virtual Service IP neighbor", "neighbor", vNeighbor) + c.serviceRoutes.Store(svcIP.String(), route) + klog.InfoS("Added virtual Service IP route", "route", route) - if err := c.addServiceRoute(config.VirtualNodePortDNATIPv4); err != nil { - return err + return nil +} + +func (c *Client) AddClusterIPRoute(svcIP net.IP) error { + linkIndex := c.nodeConfig.GatewayConfig.LinkIndex + gw := config.VirtualServiceIPv4 + metric := util.MetricHigh + curClusterIPCIDR := c.clusterIPv4CIDR + + // If the route exists and its destination CIDR contains the ClusterIP, there is no need to update the route. + if curClusterIPCIDR != nil && curClusterIPCIDR.Contains(svcIP) { + klog.V(4).InfoS("Route with current ClusterIP CIDR can route the ClusterIP to Antrea gateway", "ClusterIP CIDR", curClusterIPCIDR, "ClusterIP", svcIP) + return nil } - // For NodePort Service, a new NetNat for NetNatStaticMapping is needed. - err := util.NewNetNat(antreaNatNodePort, virtualNodePortDNATIPv4Net) + + // Enlarge the ClusterIP CIDR with the ClusterIP. Note that, if the ClusterIP CIDR doesn't exist, just generate a CIDR + // with the ClusterIP. + newClusterIPCIDR, err := util.ExtendCIDRWithIP(curClusterIPCIDR, svcIP) if err != nil { - return err + return fmt.Errorf("enlarge the destination CIDR with an error: %w", err) } - return nil -} + // Generate a route with the new ClusterIP CIDR and install it. + route := generateRoute(newClusterIPCIDR, gw, linkIndex, metric) + if err = util.ReplaceNetRoute(route); err != nil { + return fmt.Errorf("failed to install new ClusterIP route: %w", err) + } -// TODO: Follow the code style in Linux that maintains one Service CIDR. -func (c *Client) addServiceRoute(svcIP net.IP) error { - obj, found := c.hostRoutes.Load(svcIP.String()) - svcIPNet := util.NewIPNet(svcIP) + // Store the new ClusterIP CIDR. + c.clusterIPv4CIDR = newClusterIPCIDR + klog.V(4).InfoS("Created a route to route the ClusterIP to Antrea gateway", "route", route, "ClusterIP", svcIP) - // Route: Service IP -> VirtualServiceIPv4 (169.254.0.253) - route := &util.Route{ - LinkIndex: c.nodeConfig.GatewayConfig.LinkIndex, - DestinationSubnet: svcIPNet, - GatewayAddress: config.VirtualServiceIPv4, - RouteMetric: util.MetricHigh, - } - if found { - existingRoute := obj.(*util.Route) - if existingRoute.GatewayAddress.Equal(route.GatewayAddress) && existingRoute.RouteMetric == route.RouteMetric { - klog.V(2).InfoS("Service route already exists", "DestinationIP", route.DestinationSubnet, - "Gateway", route.GatewayAddress, "RouteMetric", route.RouteMetric) - return nil + // Collect stale routes. + routes, err := c.listIPRoutesOnGW() + if err != nil { + return fmt.Errorf("error listing ip routes: %w", err) + } + + var staleRoutes []*util.Route + if curClusterIPCIDR != nil { + // If current destination CIDR is not nil, the route with current destination CIDR should be uninstalled since + // a new route with a newly calculated destination CIDR has been installed. + route.DestinationSubnet = curClusterIPCIDR + staleRoutes = []*util.Route{route} + + // TODO: delete the code below in the future. The code below is used to delete stale route for the ClusterIP before + // this commit. For Antrea which is not upgraded from the version before this commit, the code below is unnecessary. + for _, rt := range routes { + ones, _ := rt.DestinationSubnet.Mask.Size() + if ones == ipv4AddrLength && rt.DestinationSubnet.IP.Equal(svcIP) { + staleRoutes = append(staleRoutes, rt) + } } - // Remove the existing route if gateway or metric is not as expected. - if err := util.RemoveNetRoute(existingRoute); err != nil { - return fmt.Errorf("failed to delete existing Service route entry, DestinationIP: %s, Gateway: %s, RouteMetric: %d, err: %v", - existingRoute.DestinationSubnet, existingRoute.GatewayAddress, existingRoute.RouteMetric, err) + + } else { + // If current ClusterIP CIDR doesn't exist, which means that Antrea Agent has just started, and the route for + // ClusterIP generated in the last round should be deleted. + for _, rt := range routes { + // Skip the route whose destination CIDR only has one address. + ones, _ := rt.DestinationSubnet.Mask.Size() + if ones == ipv4AddrLength { + continue + } + // Find stale route. + if rt.DestinationSubnet.Contains(svcIP) && rt.GatewayAddress.Equal(gw) { + staleRoutes = append(staleRoutes, rt) + break + } } } - if err := util.ReplaceNetRoute(route); err != nil { - return err + // Remove stale routes. + for _, rt := range staleRoutes { + if err = util.RemoveNetRoute(rt); err != nil { + if strings.Contains(err.Error(), "No matching MSFT_NetRoute objects") { + klog.InfoS("Failed to delete stale ClusterIP route since the route doesn't exist", "route", route) + } else { + return fmt.Errorf("failed to delete routing entry for ClusterIP %s: %w", svcIP.String(), err) + } + } + klog.V(4).InfoS("Uninstalled stale ClusterIP route successfully", "stale route", rt) } - c.hostRoutes.Store(route.DestinationSubnet.String(), route) - klog.V(2).InfoS("Added Service route", "ServiceIP", route.DestinationSubnet, "GatewayIP", route.GatewayAddress) return nil } -func (c *Client) deleteServiceRoute(svcIP net.IP) error { - svcIPNet := util.NewIPNet(svcIP) - obj, found := c.hostRoutes.Load(svcIPNet.String()) - if !found { - klog.V(2).InfoS("Service route does not exist", "DestinationIP", svcIP) - return nil - } +func (c *Client) addVirtualNodePortDNATIPRoute(isIPv6 bool) error { + linkIndex := c.nodeConfig.GatewayConfig.LinkIndex + vIP := config.VirtualNodePortDNATIPv4 + gw := config.VirtualServiceIPv4 - rt := obj.(*util.Route) - if err := util.RemoveNetRoute(rt); err != nil { - return err + route := generateRoute(virtualNodePortDNATIPv4Net, gw, linkIndex, util.MetricHigh) + if err := util.ReplaceNetRoute(route); err != nil { + return fmt.Errorf("failed to install route for virtual Service IP %s: %w", vIP.String(), err) } - c.hostRoutes.Delete(svcIP.String()) - klog.V(2).InfoS("Deleted Service route from host gateway", "DestinationIP", svcIP) - return nil -} - -func (c *Client) AddClusterIPRoute(svcIP net.IP) error { - return c.addServiceRoute(svcIP) -} + c.serviceRoutes.Store(vIP.String(), route) + klog.InfoS("Added virtual Service IP route", "route", route) -func (c *Client) DeleteClusterIPRoute(svcIP net.IP) error { - return c.deleteServiceRoute(svcIP) + return nil } // MigrateRoutesToGw is not supported on Windows. @@ -330,9 +360,105 @@ func (c *Client) UnMigrateRoutesFromGw(route *net.IPNet, linkName string) error // Run is not supported on Windows and returns immediately. func (c *Client) Run(stopCh <-chan struct{}) { + klog.Infof("Starting NetNat and route sync, with sync interval %v", syncInterval) + wait.Until(c.syncIPInfra, syncInterval, stopCh) +} + +// syncIPInfra is idempotent and can be safely called on every sync operation. +func (c *Client) syncIPInfra() { + if err := c.syncRoute(); err != nil { + klog.Errorf("Failed to sync route: %v", err) + } + + if c.proxyAll { + if err := c.syncNetNat(); err != nil { + klog.Errorf("Failed to sync netNat: %v", err) + } + } + + klog.V(3).Infof("Successfully synced Node netNat and route") } -func (c *Client) listRoutes() (map[string]*util.Route, error) { +func (c *Client) syncRoute() error { + routeMap, err := c.listIPRoutesOnGW() + if err != nil { + return err + } + + restoreRoute := func(route *util.Route) bool { + r, ok := routeMap[route.DestinationSubnet.String()] + if ok && route.Equal(*r) { + return true + } + if err := util.ReplaceNetRoute(route); err != nil { + klog.Errorf("Failed to add route to the gateway: %v", err) + return false + } + return true + } + c.nodeRoutes.Range(func(_, v interface{}) bool { + route := v.(*util.Route) + return restoreRoute(route) + }) + if c.proxyAll { + c.serviceRoutes.Range(func(_, v interface{}) bool { + route := v.(*util.Route) + return restoreRoute(route) + }) + } + // These routes are installed automatically by the kernel when the address is configured on + // the interface. If these routes are deleted manually by mistake, we restore them. + gwAutoconfRoutes := []*util.Route{ + { + LinkIndex: c.nodeConfig.GatewayConfig.LinkIndex, + DestinationSubnet: util.NewIPNet(c.nodeConfig.GatewayConfig.IPv4), + GatewayAddress: c.nodeConfig.GatewayConfig.IPv4, + RouteMetric: util.MetricDefault, + }, + { + LinkIndex: c.nodeConfig.GatewayConfig.LinkIndex, + DestinationSubnet: c.nodeConfig.PodIPv4CIDR, + GatewayAddress: c.nodeConfig.GatewayConfig.IPv4, + RouteMetric: util.MetricDefault, + }, + } + + for _, route := range gwAutoconfRoutes { + restoreRoute(route) + } + return nil +} + +func (c *Client) syncNetNat() error { + if err := util.NewNetNat(antreaNatNodePort, virtualNodePortDNATIPv4Net); err != nil { + return err + } + + restoreNetNatStaticMapping := func(m *util.NetNatStaticMapping) bool { + if err := util.ReplaceNetNatStaticMapping(m); err != nil { + klog.Errorf("Failed to add netNat static mapping: %v", err) + return false + } + return true + } + + c.netNat.Range(func(_, v interface{}) bool { + mapping := v.(*util.NetNatStaticMapping) + return restoreNetNatStaticMapping(mapping) + }) + + return nil +} + +func (c *Client) isServiceRoute(route *util.Route) bool { + // If the gateway IP is the virtual Service IP , then it is a route which is added by AntreaProxy. + if route.GatewayAddress.Equal(config.VirtualServiceIPv4) { + return true + } + return false +} + +func (c *Client) listIPRoutesOnGW() (map[string]*util.Route, error) { routes, err := util.GetNetRoutesAll() if err != nil { return nil, err @@ -387,16 +513,80 @@ func (c *Client) DeleteSNATRule(mark uint32) error { // TODO: nodePortAddresses is not supported currently. func (c *Client) AddNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error { - return util.ReplaceNetNatStaticMapping(antreaNatNodePort, "0.0.0.0", port, config.VirtualServiceIPv4.String(), port, string(protocol)) + netNatStaticMapping := &util.NetNatStaticMapping{ + Name: antreaNatNodePort, + ExternalIP: net.ParseIP("0.0.0.0"), + ExternalPort: port, + InternalIP: config.VirtualNodePortDNATIPv4, + InternalPort: port, + Protocol: protocol, + } + if err := util.ReplaceNetNatStaticMapping(netNatStaticMapping); err != nil { + return err + } + c.netNat.Store(fmt.Sprintf("%d-%s", port, protocol), netNatStaticMapping) + klog.V(4).InfoS("Added NodePort NetNatStaticMapping", "NetNatStaticMapping", netNatStaticMapping) + return nil } func (c *Client) DeleteNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error { - return util.RemoveNetNatStaticMapping(antreaNatNodePort, "0.0.0.0", port, string(protocol)) + obj, found := c.netNat.Load(fmt.Sprintf("%d-%s", port, protocol)) + if !found { + klog.V(2).Infof("NetNatStaticMapping with port %d, protocol %s not exists", port, protocol) + return nil + } + netNatStaticMapping := obj.(*util.NetNatStaticMapping) + if err := util.RemoveNetNatStaticMapping(netNatStaticMapping); err != nil { + return err + } + c.netNat.Delete(strconv.Itoa(int(port))) + klog.V(4).InfoS("Deleted NodePort NetNatStaticMapping", "NetNatStaticMapping", netNatStaticMapping) + return nil +} + +// addLoadBalancerIngressIPRoute is used to add routing entry which is used to route LoadBalancer ingress IP to Antrea +// gateway on host. +func (c *Client) addLoadBalancerIngressIPRoute(svcIPStr string) error { + linkIndex := c.nodeConfig.GatewayConfig.LinkIndex + gw := config.VirtualServiceIPv4 + metric := util.MetricHigh + _, svcIPNet, _ := net.ParseCIDR(svcIPStr) + + route := generateRoute(svcIPNet, gw, linkIndex, metric) + if err := util.ReplaceNetRoute(route); err != nil { + return fmt.Errorf("failed to install routing entry for LoadBalancer ingress IP %s: %w", svcIPStr, err) + } + klog.V(4).InfoS("Added LoadBalancer ingress IP route", "route", route) + c.serviceRoutes.Store(svcIPStr, route) + + return nil +} + +// deleteLoadBalancerIngressIPRoute is used to delete routing entry which is used to route LoadBalancer ingress IP to Antrea +// gateway on host. +func (c *Client) deleteLoadBalancerIngressIPRoute(svcIPStr string) error { + linkIndex := c.nodeConfig.GatewayConfig.LinkIndex + gw := config.VirtualServiceIPv4 + metric := util.MetricHigh + _, svcIPNet, _ := net.ParseCIDR(svcIPStr) + + route := generateRoute(svcIPNet, gw, linkIndex, metric) + if err := util.RemoveNetRoute(route); err != nil { + if strings.Contains(err.Error(), "No matching MSFT_NetRoute objects") { + klog.InfoS("Failed to delete LoadBalancer ingress IP route since the route doesn't exist", "route", route) + } else { + return fmt.Errorf("failed to delete routing entry for LoadBalancer ingress IP %s: %w", svcIPStr, err) + } + } + klog.V(4).InfoS("Uninstalled stale ClusterIP route successfully", "stale route", route) + c.serviceRoutes.Delete(svcIPStr) + + return nil } func (c *Client) AddLoadBalancer(externalIPs []string) error { for _, svcIPStr := range externalIPs { - if err := c.addServiceRoute(net.ParseIP(svcIPStr)); err != nil { + if err := c.addLoadBalancerIngressIPRoute(svcIPStr); err != nil { return err } } @@ -405,7 +595,7 @@ func (c *Client) AddLoadBalancer(externalIPs []string) error { func (c *Client) DeleteLoadBalancer(externalIPs []string) error { for _, svcIPStr := range externalIPs { - if err := c.deleteServiceRoute(net.ParseIP(svcIPStr)); err != nil { + if err := c.deleteLoadBalancerIngressIPRoute(svcIPStr); err != nil { return err } } @@ -419,3 +609,21 @@ func (c *Client) AddLocalAntreaFlexibleIPAMPodRule(podAddresses []net.IP) error func (c *Client) DeleteLocalAntreaFlexibleIPAMPodRule(podAddresses []net.IP) error { return nil } + +func generateRoute(ipNet *net.IPNet, gw net.IP, linkIndex int, metric int) *util.Route { + return &util.Route{ + DestinationSubnet: ipNet, + GatewayAddress: gw, + RouteMetric: metric, + LinkIndex: linkIndex, + } +} + +func generateNeigh(ip net.IP, linkIndex int) *util.Neighbor { + return &util.Neighbor{ + LinkIndex: linkIndex, + IPAddress: ip, + LinkLayerAddress: openflow.GlobalVirtualMAC, + State: "Permanent", + } +} diff --git a/pkg/agent/route/route_windows_test.go b/pkg/agent/route/route_windows_test.go index bfec5a310f7..04acd6b0f6e 100644 --- a/pkg/agent/route/route_windows_test.go +++ b/pkg/agent/route/route_windows_test.go @@ -51,12 +51,10 @@ func TestRouteOperation(t *testing.T) { _, destCIDR2, _ := net.ParseCIDR(dest2) client, err := NewClient(&config.NetworkConfig{}, true, false, false, false) - svcStr1 := "1.1.0.10" - svcIP1 := net.ParseIP(svcStr1) - svcIPNet1 := util.NewIPNet(svcIP1) - svcStr2 := "1.1.0.11" - svcIP2 := net.ParseIP(svcStr2) - svcIPNet2 := util.NewIPNet(svcIP2) + svcIP1 := net.ParseIP("1.1.0.1") + _, expectedClusterIPNet1, _ := net.ParseCIDR("1.1.0.1/32") + svcIP2 := net.ParseIP("1.1.0.7") + _, expectedClusterIPNet2, _ := net.ParseCIDR("1.1.0.0/29") require.Nil(t, err) nodeConfig := &config.NodeConfig{ @@ -86,36 +84,29 @@ func TestRouteOperation(t *testing.T) { err = client.AddClusterIPRoute(svcIP1) require.Nil(t, err) - route3, err := util.GetNetRoutes(gwLink, svcIPNet1) + route3, err := util.GetNetRoutes(gwLink, expectedClusterIPNet1) require.Nil(t, err) assert.Equal(t, 1, len(route3)) err = client.AddClusterIPRoute(svcIP2) require.Nil(t, err) - route4, err := util.GetNetRoutes(gwLink, svcIPNet2) + route4, err := util.GetNetRoutes(gwLink, expectedClusterIPNet1) + require.Nil(t, err) + assert.Equal(t, 0, len(route4)) // Old ClusterIP route is expected to be deleted. + require.Nil(t, err) + route4, err = util.GetNetRoutes(gwLink, expectedClusterIPNet2) // New ClusterIP route is expected to be installed. require.Nil(t, err) assert.Equal(t, 1, len(route4)) - err = client.Reconcile([]string{dest2}, map[string]bool{svcIPNet1.String(): true}) + err = client.Reconcile([]string{dest2}) require.Nil(t, err) - routes5, err := util.GetNetRoutes(gwLink, destCIDR1) require.Nil(t, err) assert.Equal(t, 0, len(routes5)) - routes6, err := util.GetNetRoutes(gwLink, svcIPNet2) - require.Nil(t, err) - assert.Equal(t, 0, len(routes6)) - err = client.DeleteRoutes(destCIDR2) require.Nil(t, err) routes7, err := util.GetNetRoutes(gwLink, destCIDR2) require.Nil(t, err) assert.Equal(t, 0, len(routes7)) - - err = client.DeleteClusterIPRoute(svcIP1) - require.Nil(t, err) - routes8, err := util.GetNetRoutes(gwLink, svcIPNet1) - require.Nil(t, err) - assert.Equal(t, 0, len(routes8)) } diff --git a/pkg/agent/route/testing/mock_route.go b/pkg/agent/route/testing/mock_route.go index 67ee9270b63..1698b457cd3 100644 --- a/pkg/agent/route/testing/mock_route.go +++ b/pkg/agent/route/testing/mock_route.go @@ -134,20 +134,6 @@ func (mr *MockInterfaceMockRecorder) AddSNATRule(arg0, arg1 interface{}) *gomock return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddSNATRule", reflect.TypeOf((*MockInterface)(nil).AddSNATRule), arg0, arg1) } -// DeleteClusterIPRoute mocks base method -func (m *MockInterface) DeleteClusterIPRoute(arg0 net.IP) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteClusterIPRoute", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// DeleteClusterIPRoute indicates an expected call of DeleteClusterIPRoute -func (mr *MockInterfaceMockRecorder) DeleteClusterIPRoute(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteClusterIPRoute", reflect.TypeOf((*MockInterface)(nil).DeleteClusterIPRoute), arg0) -} - // DeleteLoadBalancer mocks base method func (m *MockInterface) DeleteLoadBalancer(arg0 []string) error { m.ctrl.T.Helper() @@ -247,17 +233,17 @@ func (mr *MockInterfaceMockRecorder) MigrateRoutesToGw(arg0 interface{}) *gomock } // Reconcile mocks base method -func (m *MockInterface) Reconcile(arg0 []string, arg1 map[string]bool) error { +func (m *MockInterface) Reconcile(arg0 []string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Reconcile", arg0, arg1) + ret := m.ctrl.Call(m, "Reconcile", arg0) ret0, _ := ret[0].(error) return ret0 } // Reconcile indicates an expected call of Reconcile -func (mr *MockInterfaceMockRecorder) Reconcile(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockInterfaceMockRecorder) Reconcile(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reconcile", reflect.TypeOf((*MockInterface)(nil).Reconcile), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reconcile", reflect.TypeOf((*MockInterface)(nil).Reconcile), arg0) } // Run mocks base method diff --git a/pkg/agent/util/net.go b/pkg/agent/util/net.go index 3b3304c9ded..447832508ab 100644 --- a/pkg/agent/util/net.go +++ b/pkg/agent/util/net.go @@ -269,6 +269,9 @@ func GetIPWithFamily(ips []net.IP, addrFamily uint8) (net.IP, error) { // ExtendCIDRWithIP is used for extending an IPNet with an IP. func ExtendCIDRWithIP(ipNet *net.IPNet, ip net.IP) (*net.IPNet, error) { + if ipNet == nil { + return NewIPNet(ip), nil + } cpl := commonPrefixLen(ipNet.IP, ip) if cpl == 0 { return nil, fmt.Errorf("invalid common prefix length") diff --git a/pkg/agent/util/net_windows.go b/pkg/agent/util/net_windows.go index ac5abc8c7a5..7a0cf9ed47d 100644 --- a/pkg/agent/util/net_windows.go +++ b/pkg/agent/util/net_windows.go @@ -34,6 +34,7 @@ import ( "k8s.io/klog/v2" ps "antrea.io/antrea/pkg/agent/util/powershell" + binding "antrea.io/antrea/pkg/ovs/openflow" ) const ( @@ -63,6 +64,12 @@ func (r Route) String() string { r.LinkIndex, r.DestinationSubnet, r.GatewayAddress, r.RouteMetric) } +func (r Route) Equal(x Route) bool { + return x.LinkIndex == r.LinkIndex && + x.DestinationSubnet.IP.Equal(r.DestinationSubnet.IP) && + x.GatewayAddress.Equal(r.GatewayAddress) +} + type Neighbor struct { LinkIndex int IPAddress net.IP @@ -74,6 +81,19 @@ func (n Neighbor) String() string { return fmt.Sprintf("LinkIndex: %d, IPAddress: %s, LinkLayerAddress: %s", n.LinkIndex, n.IPAddress, n.LinkLayerAddress) } +type NetNatStaticMapping struct { + Name string + ExternalIP net.IP + ExternalPort uint16 + InternalIP net.IP + InternalPort uint16 + Protocol binding.Protocol +} + +func (n NetNatStaticMapping) String() string { + return fmt.Sprintf("Name: %s, ExternalIP %s, ExternalPort: %d, InternalIP: %s, InternalPort: %d, Protocol: %s", n.Name, n.ExternalIP, n.ExternalPort, n.InternalIP, n.InternalPort, n.Protocol) +} + func GetNSPath(containerNetNS string) (string, error) { return containerNetNS, nil } @@ -721,15 +741,15 @@ func NewNetNat(netNatName string, subnetCIDR *net.IPNet) error { return nil } -func ReplaceNetNatStaticMapping(netNatName string, externalIPAddr string, externalPort uint16, internalIPAddr string, internalPort uint16, proto string) error { - staticMappingStr, err := GetNetNatStaticMapping(netNatName, externalIPAddr, externalPort, proto) +func ReplaceNetNatStaticMapping(mapping *NetNatStaticMapping) error { + staticMappingStr, err := GetNetNatStaticMapping(mapping) if err != nil { return err } parsed := parseGetNetCmdResult(staticMappingStr, 6) if len(parsed) > 0 { items := parsed[0] - if items[4] == internalIPAddr && items[5] == strconv.Itoa(int(internalPort)) { + if items[4] == mapping.InternalIP.String() && items[5] == strconv.Itoa(int(mapping.ExternalPort)) { return nil } firstCol := strings.Split(items[0], ";") @@ -737,19 +757,19 @@ func ReplaceNetNatStaticMapping(netNatName string, externalIPAddr string, extern if err != nil { return err } - if err := RemoveNetNatStaticMappingByID(netNatName, id); err != nil { + if err := RemoveNetNatStaticMappingByID(mapping.Name, id); err != nil { return err } } - return AddNetNatStaticMapping(netNatName, externalIPAddr, externalPort, internalIPAddr, internalPort, proto) + return AddNetNatStaticMapping(mapping) } // GetNetNatStaticMapping checks if a NetNatStaticMapping exists. -func GetNetNatStaticMapping(netNatName string, externalIPAddr string, externalPort uint16, proto string) (string, error) { - cmd := fmt.Sprintf("Get-NetNatStaticMapping -NatName %s", netNatName) + - fmt.Sprintf("|? ExternalIPAddress -EQ %s", externalIPAddr) + - fmt.Sprintf("|? ExternalPort -EQ %d", externalPort) + - fmt.Sprintf("|? Protocol -EQ %s", proto) + +func GetNetNatStaticMapping(mapping *NetNatStaticMapping) (string, error) { + cmd := fmt.Sprintf("Get-NetNatStaticMapping -NatName %s", mapping.Name) + + fmt.Sprintf("|? ExternalIPAddress -EQ %s", mapping.ExternalIP) + + fmt.Sprintf("|? ExternalPort -EQ %d", mapping.ExternalPort) + + fmt.Sprintf("|? Protocol -EQ %s", mapping.Protocol) + "| Format-Table -HideTableHeaders" staticMappingStr, err := ps.RunCommand(cmd) if err != nil && !strings.Contains(err.Error(), "No MSFT_NetNatStaticMapping objects found") { @@ -759,15 +779,15 @@ func GetNetNatStaticMapping(netNatName string, externalIPAddr string, externalPo } // AddNetNatStaticMapping adds a static mapping to a NAT instance. -func AddNetNatStaticMapping(netNatName string, externalIPAddr string, externalPort uint16, internalIPAddr string, internalPort uint16, proto string) error { +func AddNetNatStaticMapping(mapping *NetNatStaticMapping) error { cmd := fmt.Sprintf("Add-NetNatStaticMapping -NatName %s -ExternalIPAddress %s -ExternalPort %d -InternalIPAddress %s -InternalPort %d -Protocol %s", - netNatName, externalIPAddr, externalPort, internalIPAddr, internalPort, proto) + mapping.Name, mapping.ExternalIP, mapping.ExternalPort, mapping.InternalIP, mapping.InternalPort, mapping.Protocol) _, err := ps.RunCommand(cmd) return err } -func RemoveNetNatStaticMapping(netNatName string, externalIPAddr string, externalPort uint16, proto string) error { - staticMappingStr, err := GetNetNatStaticMapping(netNatName, externalIPAddr, externalPort, proto) +func RemoveNetNatStaticMapping(mapping *NetNatStaticMapping) error { + staticMappingStr, err := GetNetNatStaticMapping(mapping) if err != nil { return err } @@ -781,24 +801,24 @@ func RemoveNetNatStaticMapping(netNatName string, externalIPAddr string, externa if err != nil { return err } - return RemoveNetNatStaticMappingByID(netNatName, id) + return RemoveNetNatStaticMappingByID(mapping.Name, id) } -func RemoveNetNatStaticMappingByNPLTuples(netNatName string, externalIPAddr string, externalPort uint16, internalIPAddr string, internalPort uint16, proto string) error { - staticMappingStr, err := GetNetNatStaticMapping(netNatName, externalIPAddr, externalPort, proto) +func RemoveNetNatStaticMappingByNPLTuples(mapping *NetNatStaticMapping) error { + staticMappingStr, err := GetNetNatStaticMapping(mapping) if err != nil { return err } parsed := parseGetNetCmdResult(staticMappingStr, 6) if len(parsed) > 0 { items := parsed[0] - if items[4] == internalIPAddr && items[5] == strconv.Itoa(int(internalPort)) { + if items[4] == mapping.InternalIP.String() && items[5] == strconv.Itoa(int(mapping.InternalPort)) { firstCol := strings.Split(items[0], ";") id, err := strconv.Atoi(firstCol[1]) if err != nil { return err } - if err := RemoveNetNatStaticMappingByID(netNatName, id); err != nil { + if err := RemoveNetNatStaticMappingByID(mapping.Name, id); err != nil { return err } return nil diff --git a/test/integration/agent/route_test.go b/test/integration/agent/route_test.go index d74f23d60aa..d849a551251 100644 --- a/test/integration/agent/route_test.go +++ b/test/integration/agent/route_test.go @@ -283,9 +283,9 @@ func TestIpTablesSync(t *testing.T) { assert.Equal(t, "", string(actualData), "failed to remove iptables rule for %v", tc) } stopCh := make(chan struct{}) - route.IPTablesSyncInterval = 2 * time.Second + route.SyncInterval = 2 * time.Second go routeClient.Run(stopCh) - time.Sleep(route.IPTablesSyncInterval) // wait for one iteration of sync operation. + time.Sleep(route.SyncInterval) // wait for one iteration of sync operation. for _, tc := range tcs { saveCmd := fmt.Sprintf("iptables-save -t %s | grep -e '%s %s'", tc.Table, tc.Cmd, tc.Chain) // #nosec G204: ignore in test code @@ -440,9 +440,9 @@ func TestSyncRoutes(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - route.IPTablesSyncInterval = 2 * time.Second + route.SyncInterval = 2 * time.Second go routeClient.Run(stopCh) - time.Sleep(route.IPTablesSyncInterval) // wait for one iteration of sync operation. + time.Sleep(route.SyncInterval) // wait for one iteration of sync operation. output, err := exec.Command("bash", "-c", listCmd).Output() assert.NoError(t, err, "error executing ip route command: %s", listCmd) @@ -484,10 +484,10 @@ func TestSyncGatewayKernelRoute(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - route.IPTablesSyncInterval = 2 * time.Second + route.SyncInterval = 2 * time.Second go routeClient.Run(stopCh) - err = wait.Poll(1*time.Second, 2*route.IPTablesSyncInterval, func() (done bool, err error) { + err = wait.Poll(1*time.Second, 2*route.SyncInterval, func() (done bool, err error) { expOutput, err := exec.Command("bash", "-c", listCmd).Output() if err != nil { return false, err @@ -514,7 +514,6 @@ func TestReconcile(t *testing.T) { addedRoutes []peer desiredPeerCIDRs []string desiredNodeIPs []string - desiredServices map[string]bool // expectations expRoutes map[string]netlink.Link }{ @@ -527,7 +526,6 @@ func TestReconcile(t *testing.T) { }, desiredPeerCIDRs: []string{"10.10.20.0/24"}, desiredNodeIPs: []string{remotePeerIP.String()}, - desiredServices: map[string]bool{"200.200.10.10": true}, expRoutes: map[string]netlink.Link{"10.10.20.0/24": gwLink, "10.10.30.0/24": nil}, }, { @@ -539,7 +537,6 @@ func TestReconcile(t *testing.T) { }, desiredPeerCIDRs: []string{"10.10.20.0/24"}, desiredNodeIPs: []string{localPeerIP.String()}, - desiredServices: map[string]bool{"200.200.10.10": true}, expRoutes: map[string]netlink.Link{"10.10.20.0/24": nodeLink, "10.10.30.0/24": nil}, }, { @@ -553,7 +550,6 @@ func TestReconcile(t *testing.T) { }, desiredPeerCIDRs: []string{"10.10.20.0/24", "10.10.40.0/24"}, desiredNodeIPs: []string{localPeerIP.String(), remotePeerIP.String()}, - desiredServices: map[string]bool{"200.200.10.10": true}, expRoutes: map[string]netlink.Link{"10.10.20.0/24": nodeLink, "10.10.30.0/24": nil, "10.10.40.0/24": gwLink, "10.10.50.0/24": nil}, }, } @@ -571,7 +567,7 @@ func TestReconcile(t *testing.T) { assert.NoError(t, routeClient.AddRoutes(peerNet, tc.nodeName, route.peerIP, peerGwIP), "adding routes failed") } - assert.NoError(t, routeClient.Reconcile(tc.desiredPeerCIDRs, tc.desiredServices), "reconcile failed") + assert.NoError(t, routeClient.Reconcile(tc.desiredPeerCIDRs), "reconcile failed") for dst, uplink := range tc.expRoutes { expNum := 0