diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 0e585b65c23..cfd438f1643 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -257,8 +257,6 @@ func run(o *Options) error { stopCh, o.nodeType, o.config.ExternalNode.ExternalNodeNamespace, - features.DefaultFeatureGate.Enabled(features.AntreaProxy), - o.config.AntreaProxy.ProxyAll, connectUplinkToBridge, l7NetworkPolicyEnabled) err = agentInitializer.Initialize() diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 0dfafb53933..da32d2f3be1 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -110,11 +110,9 @@ type Initializer struct { serviceConfig *config.ServiceConfig l7NetworkPolicyConfig *config.L7NetworkPolicyConfig enableL7NetworkPolicy bool - enableProxy bool connectUplinkToBridge bool // networkReadyCh should be closed once the Node's network is ready. // The CNI server will wait for it before handling any CNI Add requests. - proxyAll bool networkReadyCh chan<- struct{} stopCh <-chan struct{} nodeType config.NodeType @@ -140,8 +138,6 @@ func NewInitializer( stopCh <-chan struct{}, nodeType config.NodeType, externalNodeNamespace string, - enableProxy bool, - proxyAll bool, connectUplinkToBridge bool, enableL7NetworkPolicy bool, ) *Initializer { @@ -165,8 +161,6 @@ func NewInitializer( stopCh: stopCh, nodeType: nodeType, externalNodeNamespace: externalNodeNamespace, - enableProxy: enableProxy, - proxyAll: proxyAll, connectUplinkToBridge: connectUplinkToBridge, enableL7NetworkPolicy: enableL7NetworkPolicy, } @@ -177,7 +171,7 @@ func (i *Initializer) GetNodeConfig() *config.NodeConfig { return i.nodeConfig } -// GetNodeConfig returns the NodeConfig. +// GetWireGuardClient returns the Wireguard client. func (i *Initializer) GetWireGuardClient() wireguard.Interface { return i.wireGuardClient } diff --git a/pkg/agent/controller/noderoute/node_route_controller.go b/pkg/agent/controller/noderoute/node_route_controller.go index 0bfcd801bcd..76a87960e16 100644 --- a/pkg/agent/controller/noderoute/node_route_controller.go +++ b/pkg/agent/controller/noderoute/node_route_controller.go @@ -204,8 +204,6 @@ func (c *Controller) removeStaleGatewayRoutes() error { } // routeClient will remove orphaned routes whose destinations are not in desiredPodCIDRs. - // If proxyAll enabled, it will also remove routes that are for Windows ClusterIP Services - // which no longer exist. if err := c.routeClient.Reconcile(desiredPodCIDRs); err != nil { return err } diff --git a/pkg/agent/nodeportlocal/rules/netnat_rule.go b/pkg/agent/nodeportlocal/rules/netnat_rule.go index e5c36effffc..6f7ee202f7b 100644 --- a/pkg/agent/nodeportlocal/rules/netnat_rule.go +++ b/pkg/agent/nodeportlocal/rules/netnat_rule.go @@ -19,11 +19,13 @@ package rules import ( "fmt" + "net" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/route" "antrea.io/antrea/pkg/agent/util" + binding "antrea.io/antrea/pkg/ovs/openflow" ) // Use antrea-nat netnatstaticmapping rules as NPL implementation @@ -68,13 +70,18 @@ func (nn *netnatRules) initRules() error { // AddRule appends a NetNatStaticMapping rule. func (nn *netnatRules) AddRule(nodePort int, podIP string, podPort int, protocol string) error { - nodePort16 := util.PortToUint16(nodePort) - podPort16 := util.PortToUint16(podPort) - podAddr := fmt.Sprintf("%s:%d", podIP, podPort16) - if err := util.ReplaceNetNatStaticMapping(antreaNatNPL, "0.0.0.0", nodePort16, podIP, podPort16, protocol); err != nil { + netNatStaticMapping := &util.NetNatStaticMapping{ + Name: antreaNatNPL, + ExternalIP: net.ParseIP("0.0.0.0"), + ExternalPort: util.PortToUint16(nodePort), + InternalIP: net.ParseIP(podIP), + InternalPort: util.PortToUint16(podPort), + Protocol: binding.Protocol(protocol), + } + if err := util.ReplaceNetNatStaticMapping(netNatStaticMapping); err != nil { return err } - klog.InfoS("Successfully added NetNat rule", "podAddr", podAddr, "nodePort", nodePort16, "protocol", protocol) + klog.InfoS("Successfully added NetNatStaticMapping", "NetNatStaticMapping", netNatStaticMapping) return nil } @@ -90,13 +97,18 @@ func (nn *netnatRules) AddAllRules(nplList []PodNodePort) error { // DeleteRule deletes a specific NPL rule from NetNatStaticMapping table func (nn *netnatRules) DeleteRule(nodePort int, podIP string, podPort int, protocol string) error { - nodePort16 := util.PortToUint16(nodePort) - podPort16 := util.PortToUint16(podPort) - podAddr := fmt.Sprintf("%s:%d", podIP, podPort16) - if err := util.RemoveNetNatStaticMappingByNPLTuples(antreaNatNPL, "0.0.0.0", nodePort16, podIP, podPort16, protocol); err != nil { + netNatStaticMapping := &util.NetNatStaticMapping{ + Name: antreaNatNPL, + ExternalIP: net.ParseIP("0.0.0.0"), + ExternalPort: util.PortToUint16(nodePort), + InternalIP: net.ParseIP(podIP), + InternalPort: util.PortToUint16(podPort), + Protocol: binding.Protocol(protocol), + } + if err := util.RemoveNetNatStaticMappingByNPLTuples(netNatStaticMapping); err != nil { return err } - klog.InfoS("Successfully deleted NetNatStaticMapping rule", "podAddr", podAddr, "nodePort", nodePort16, "protocol", protocol) + klog.InfoS("Successfully deleted NetNatStaticMapping", "NetNatStaticMapping", netNatStaticMapping) return nil } diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 4bb3efcbaf4..c72ce1d4afe 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -177,12 +177,6 @@ func (p *proxier) removeStaleServices() { continue } } - if svcInfo.ClusterIP() != nil { - if err := p.deleteRouteForServiceIP(svcInfoStr, svcInfo.ClusterIP(), p.routeClient.DeleteClusterIPRoute); err != nil { - klog.ErrorS(err, "Failed to remove ClusterIP Service routes", "Service", svcPortName) - continue - } - } } // Remove LoadBalancer flows and configurations. if p.proxyLoadBalancerIPs && len(svcInfo.LoadBalancerIPStrings()) > 0 { @@ -599,13 +593,6 @@ func (p *proxier) installServices() { continue } } - // If previous Service which has ClusterIP should be removed, remove ClusterIP routes. - if pSvcInfo.ClusterIP() != nil { - if err := p.deleteRouteForServiceIP(pSvcInfo.String(), pSvcInfo.ClusterIP(), p.routeClient.DeleteClusterIPRoute); err != nil { - klog.ErrorS(err, "Error when uninstalling ClusterIP route for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) - continue - } - } } } @@ -628,15 +615,6 @@ func (p *proxier) installServices() { } if p.proxyAll { - // Install ClusterIP route on Node so that ClusterIP can be accessed on Node. Every time a new ClusterIP - // is created, the routing target IP block will be recalculated for expansion to be able to route the new - // created ClusterIP. Deleting a ClusterIP will not shrink the target routing IP block. The Service CIDR - // can be finally calculated after creating enough ClusterIPs. - if err := p.addRouteForServiceIP(svcInfo.String(), svcInfo.ClusterIP(), p.routeClient.AddClusterIPRoute); err != nil { - klog.ErrorS(err, "Error when installing ClusterIP route for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) - continue - } - // If previous Service is nil or NodePort flows and configurations of previous Service have been removed, // install NodePort flows and configurations for current Service. if svcInfo.NodePort() > 0 && (pSvcInfo == nil || needRemoval) { diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 16cb81cb541..972ec347503 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -416,7 +416,6 @@ func testClusterIPAdd(t *testing.T, mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.InAnyOrder(expectedAllEps)).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.InAnyOrder(expectedAllEps)).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeClusterIP, true).Times(1) - mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -531,7 +530,6 @@ func testLoadBalancerAdd(t *testing.T, groupID = fp.groupCounter.AllocateIfNotExist(svcPortName, !nodeLocalVal) mockOFClient.EXPECT().UninstallServiceGroup(groupID).Times(1) } - mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) if proxyLoadBalancerIPs { mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) } @@ -640,7 +638,6 @@ func testNodePortAdd(t *testing.T, groupID = fp.groupCounter.AllocateIfNotExist(svcPortName, !nodeLocalVal) mockOFClient.EXPECT().UninstallServiceGroup(groupID).Times(1) } - mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) fp.syncProxyRules() @@ -879,8 +876,6 @@ func TestLoadBalancerServiceWithMultiplePorts(t *testing.T) { mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), agentconfig.VirtualNodePortDNATIPv4, uint16(port30001Int32), binding.ProtocolTCP, uint16(0), true, corev1.ServiceTypeNodePort, false).Times(1) mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), loadBalancerIPv4, uint16(port80Int32), binding.ProtocolTCP, uint16(0), true, corev1.ServiceTypeLoadBalancer, false).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(port30001Int32), binding.ProtocolTCP).Times(1) - // The route for the ClusterIP and the LoadBalancer IP should only be installed once. - mockRouteClient.EXPECT().AddClusterIPRoute(svc1IPv4).Times(1) mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIPv4).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCP, gomock.InAnyOrder([]k8sproxy.Endpoint{localEndpointForPort443, remoteEndpointForPort443})).Times(1) @@ -911,7 +906,6 @@ func TestLoadBalancerServiceWithMultiplePorts(t *testing.T) { mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIPv4, uint16(port443Int32), binding.ProtocolTCP) mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(port30002Int32), binding.ProtocolTCP) // The route for the ClusterIP and the LoadBalancer IP should only be uninstalled once. - mockRouteClient.EXPECT().DeleteClusterIPRoute(svc1IPv4) mockRouteClient.EXPECT().DeleteLoadBalancer(loadBalancerIPv4) fp.syncProxyRules() @@ -1097,13 +1091,11 @@ func testClusterIPRemove(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool, n mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) } mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeClusterIP, true).Times(1) - mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) if !nodeLocalInternal { mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) } mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(1) - mockRouteClient.EXPECT().DeleteClusterIPRoute(svcIP).Times(1) fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -1169,14 +1161,12 @@ func testNodePortRemove(t *testing.T, nodePortAddresses []net.IP, svcIP net.IP, mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), true, corev1.ServiceTypeClusterIP, false).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, corev1.ServiceTypeNodePort, false).Times(1) - mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2) - mockRouteClient.EXPECT().DeleteClusterIPRoute(svcIP).Times(1) mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) fp.syncProxyRules() @@ -1247,7 +1237,6 @@ func testLoadBalancerRemove(t *testing.T, nodePortAddresses []net.IP, svcIP net. mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, corev1.ServiceTypeNodePort, false).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), true, corev1.ServiceTypeLoadBalancer, false).Times(1) - mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) @@ -1256,7 +1245,6 @@ func testLoadBalancerRemove(t *testing.T, nodePortAddresses []net.IP, svcIP net. mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2) - mockRouteClient.EXPECT().DeleteClusterIPRoute(svcIP).Times(1) mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) mockRouteClient.EXPECT().DeleteLoadBalancer(loadBalancerIP).Times(1) fp.syncProxyRules() @@ -1422,17 +1410,14 @@ func testNodePortNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP net. mockOFClient.EXPECT().InstallServiceGroup(groupIDLocal, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupIDCluster, svcIP, uint16(svcPort), gomock.Any(), uint16(0), true, corev1.ServiceTypeClusterIP, false).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, vIP, uint16(svcNodePort), gomock.Any(), uint16(0), true, corev1.ServiceTypeNodePort, false).Times(1) - mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) fp.syncProxyRules() mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), gomock.Any()).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), gomock.Any()).Times(1) - mockRouteClient.EXPECT().DeleteClusterIPRoute(svcIP).Times(1) mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupIDCluster, svcIP, uint16(svcPort+1), gomock.Any(), uint16(0), true, corev1.ServiceTypeClusterIP, false).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, vIP, uint16(svcNodePort), gomock.Any(), uint16(0), true, corev1.ServiceTypeNodePort, false).Times(1) - mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) fp.serviceChanges.OnServiceUpdate(svc, updatedSvc) fp.syncProxyRules() @@ -1488,7 +1473,6 @@ func testLoadBalancerNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP mockOFClient.EXPECT().InstallServiceFlows(groupIDCluster, svcIP, uint16(svcPort), gomock.Any(), uint16(0), true, corev1.ServiceTypeClusterIP, false).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, vIP, uint16(svcNodePort), gomock.Any(), uint16(0), true, corev1.ServiceTypeNodePort, false).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, loadBalancerIP, uint16(svcPort), gomock.Any(), uint16(0), true, corev1.ServiceTypeLoadBalancer, false).Times(1) - mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) fp.syncProxyRules() @@ -1496,13 +1480,11 @@ func testLoadBalancerNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), gomock.Any()).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), gomock.Any()).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), gomock.Any()).Times(1) - mockRouteClient.EXPECT().DeleteClusterIPRoute(svcIP).Times(1) mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) mockRouteClient.EXPECT().DeleteLoadBalancer(loadBalancerIP).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupIDCluster, svcIP, uint16(svcPort+1), gomock.Any(), uint16(0), true, corev1.ServiceTypeClusterIP, false).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, vIP, uint16(svcNodePort), gomock.Any(), uint16(0), true, corev1.ServiceTypeNodePort, false).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, loadBalancerIP, uint16(svcPort+1), gomock.Any(), uint16(0), true, corev1.ServiceTypeLoadBalancer, false).Times(1) - mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) fp.serviceChanges.OnServiceUpdate(svc, updatedSvc) @@ -1778,13 +1760,10 @@ func testServiceClusterIPUpdate(t *testing.T, mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, expectedEps).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, expectedEps).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeClusterIP, false).Times(1) - mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) s1 := mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) s2 := mockOFClient.EXPECT().InstallServiceFlows(groupID, updatedSvcIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeClusterIP, false).Times(1) s2.After(s1) - mockRouteClient.EXPECT().DeleteClusterIPRoute(svcIP).Times(1) - mockRouteClient.EXPECT().AddClusterIPRoute(updatedSvcIP).Times(1) if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(groupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), false, corev1.ServiceTypeNodePort, false).Times(1) @@ -1883,15 +1862,11 @@ func testServicePortUpdate(t *testing.T, mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, expectedEps).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, expectedEps).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeClusterIP, false).Times(1) - mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) s1 := mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) s2 := mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort+1), bindingProtocol, uint16(0), false, corev1.ServiceTypeClusterIP, false).Times(1) s2.After(s1) - mockRouteClient.EXPECT().DeleteClusterIPRoute(svcIP).Times(1) - mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) - if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(groupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), false, corev1.ServiceTypeNodePort, false).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) @@ -1987,12 +1962,9 @@ func testServiceNodePortUpdate(t *testing.T, mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, expectedEps).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, expectedEps).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeClusterIP, false).Times(1) - mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeClusterIP, false).Times(1) - mockRouteClient.EXPECT().DeleteClusterIPRoute(svcIP).Times(1) - mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(groupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), false, corev1.ServiceTypeNodePort, false).Times(1) @@ -2090,7 +2062,6 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.InAnyOrder(expectedAllEps)).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.InAnyOrder(expectedAllEps)).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeClusterIP, false).Times(1) - mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(groupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), false, corev1.ServiceTypeNodePort, false).Times(1) @@ -2109,8 +2080,6 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), true, corev1.ServiceTypeClusterIP, false).Times(1) - mockRouteClient.EXPECT().DeleteClusterIPRoute(svcIP).Times(1) - mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.InAnyOrder(expectedAllEps)).Times(1) @@ -2196,7 +2165,6 @@ func testServiceInternalTrafficPolicyUpdate(t *testing.T, mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.InAnyOrder(expectedAllEps)).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.InAnyOrder(expectedAllEps)).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeClusterIP, false).Times(1) - mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) assert.Contains(t, fp.endpointsInstalledMap, svcPortName) @@ -2286,7 +2254,6 @@ func testServiceIngressIPsUpdate(t *testing.T, for _, ip := range loadBalancerIPs { mockOFClient.EXPECT().InstallServiceFlows(groupID, ip, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeLoadBalancer, false).Times(1) } - mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) for _, ip := range loadBalancerIPs { mockRouteClient.EXPECT().AddLoadBalancer(ip).Times(1) @@ -2377,7 +2344,6 @@ func testServiceStickyMaxAgeSecondsUpdate(t *testing.T, mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, expectedEps).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupID, true, expectedEps).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(affinitySeconds), false, corev1.ServiceTypeClusterIP, false).Times(1) - mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(updatedAffinitySeconds), false, corev1.ServiceTypeClusterIP, false).Times(1) @@ -2469,14 +2435,11 @@ func testServiceSessionAffinityTypeUpdate(t *testing.T, mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, expectedEps).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, expectedEps).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeClusterIP, false).Times(1) - mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, expectedEps).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupID, true, expectedEps).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(affinitySeconds), false, corev1.ServiceTypeClusterIP, false).Times(1) - mockRouteClient.EXPECT().DeleteClusterIPRoute(svcIP).Times(1) - mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(groupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), false, corev1.ServiceTypeNodePort, false).Times(1) @@ -2706,7 +2669,6 @@ func TestGetServiceFlowKeys(t *testing.T) { makeEndpointSliceMap(fp, eps) } if tc.svc != nil && tc.eps != nil && tc.serviceInstalled { - mockRouteClient.EXPECT().AddClusterIPRoute(svc1IPv4).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddressesIPv4, uint16(svcNodePort), binding.ProtocolTCP).Times(1) mockOFClient.EXPECT().InstallServiceGroup(gomock.Any(), gomock.Any(), gomock.Any()).Times(2) mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCP, gomock.Any()).Times(1) diff --git a/pkg/agent/route/interfaces.go b/pkg/agent/route/interfaces.go index c55e9f7ae5a..8975c98f6bd 100644 --- a/pkg/agent/route/interfaces.go +++ b/pkg/agent/route/interfaces.go @@ -16,18 +16,25 @@ package route import ( "net" + "time" "antrea.io/antrea/pkg/agent/config" binding "antrea.io/antrea/pkg/ovs/openflow" ) +var ( + // SyncInterval is exported so that sync interval can be configured for running integration test with + // smaller values. It is meant to be used internally by Run. + SyncInterval = 60 * time.Second +) + // Interface is the interface for routing container packets in host network. type Interface interface { // Initialize should initialize all infrastructures required to route container packets in host network. // It should be idempotent and can be safely called on every startup. Initialize(nodeConfig *config.NodeConfig, done func()) error - // Reconcile should remove orphaned routes and related configuration based on the desired podCIDRs and Service IPs. + // Reconcile should remove orphaned routes and related configuration based on the desired podCIDRs. // If IPv6 is enabled in the cluster, Reconcile should also remove the orphaned IPv6 neighbors. Reconcile(podCIDRs []string) error @@ -58,13 +65,6 @@ type Interface interface { // DeleteNodePort deletes related configurations when a NodePort Service is deleted. DeleteNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error - // AddClusterIPRoute adds route on K8s node for Service ClusterIP. - AddClusterIPRoute(svcIP net.IP) error - - // DeleteClusterIPRoute deletes route for a Service IP when AntreaProxy is configured to handle - // ClusterIP Service traffic from host network. - DeleteClusterIPRoute(svcIP net.IP) error - // AddLoadBalancer adds configurations when a LoadBalancer IP is added. AddLoadBalancer(externalIP net.IP) error diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index 62853fce9de..51497536112 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -73,6 +73,9 @@ const ( antreaPostRoutingChain = "ANTREA-POSTROUTING" antreaOutputChain = "ANTREA-OUTPUT" antreaMangleChain = "ANTREA-MANGLE" + + serviceIPv4CIDRKey = "serviceIPv4CIDRKey" + serviceIPv6CIDRKey = "serviceIPv6CIDRKey" ) // Client implements Interface. @@ -82,9 +85,6 @@ var ( // globalVMAC is used in the IPv6 neighbor configuration to advertise ND solicitation for the IPv6 address of the // host gateway interface on other Nodes. globalVMAC, _ = net.ParseMAC("aa:bb:cc:dd:ee:ff") - // IPTablesSyncInterval is exported so that sync interval can be configured for running integration test with - // smaller values. It is meant to be used internally by Run. - IPTablesSyncInterval = 60 * time.Second ) // Client takes care of routing container packets in host network, coordinating ip route, ip rule, iptables and ipset. @@ -92,7 +92,7 @@ type Client struct { nodeConfig *config.NodeConfig networkConfig *config.NetworkConfig noSNAT bool - ipt iptables.Interface + iptables iptables.Interface ipset ipset.Interface netlink utilnetlink.Interface // nodeRoutes caches ip routes to remote Pods. It's a map of podCIDR to routes. @@ -115,10 +115,6 @@ type Client struct { nodePortsIPv4 sync.Map // nodePortsIPv6 caches all existing IPv6 NodePorts. nodePortsIPv6 sync.Map - // serviceIPv4CIDR stores the latest Service IPv4 CIDR from serviceCIDRProvider. - serviceIPv4CIDR *net.IPNet - // serviceIPv6CIDR stores the latest Service IPv6 CIDR from serviceCIDRProvider. - serviceIPv6CIDR *net.IPNet // clusterNodeIPs stores the IPv4 of all other Nodes in the cluster clusterNodeIPs sync.Map // clusterNodeIP6s stores the IPv6 of all other Nodes in the cluster @@ -154,7 +150,7 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error { return fmt.Errorf("failed to initialize ipset: %v", err) } - c.ipt, err = iptables.New(c.networkConfig.IPv4Enabled, c.networkConfig.IPv6Enabled) + c.iptables, err = iptables.New(c.networkConfig.IPv4Enabled, c.networkConfig.IPv6Enabled) if err != nil { return fmt.Errorf("error creating IPTables instance: %v", err) } @@ -207,28 +203,28 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error { // It will not return until stopCh is closed. func (c *Client) Run(stopCh <-chan struct{}) { <-c.iptablesInitialized - klog.Infof("Starting iptables sync, with sync interval %v", IPTablesSyncInterval) - wait.Until(c.syncIPInfra, IPTablesSyncInterval, stopCh) + klog.InfoS("Starting iptables, ipset and route sync", "interval", SyncInterval) + wait.Until(c.syncIPInfra, SyncInterval, stopCh) } // syncIPInfra is idempotent and can be safely called on every sync operation. func (c *Client) syncIPInfra() { // Sync ipset before syncing iptables rules if err := c.syncIPSet(); err != nil { - klog.Errorf("Failed to sync ipset: %v", err) + klog.ErrorS(err, "Failed to sync ipset") return } if err := c.syncIPTables(); err != nil { - klog.Errorf("Failed to sync iptables: %v", err) + klog.ErrorS(err, "Failed to sync iptables") return } - if err := c.syncRoutes(); err != nil { - klog.Errorf("Failed to sync routes: %v", err) + if err := c.syncRoute(); err != nil { + klog.ErrorS(err, "Failed to sync route") } - klog.V(3).Infof("Successfully synced node iptables and routes") + klog.V(3).Info("Successfully synced iptables, ipset and route") } -func (c *Client) syncRoutes() error { +func (c *Client) syncRoute() error { routeList, err := c.netlink.RouteList(nil, netlink.FAMILY_ALL) if err != nil { return err @@ -247,7 +243,7 @@ func (c *Client) syncRoutes() error { return true } if err := c.netlink.RouteReplace(route); err != nil { - klog.Errorf("Failed to add route to the gateway: %v", err) + klog.ErrorS(err, "Failed to sync route", "Route", route) return false } return true @@ -496,11 +492,11 @@ func (c *Client) syncIPTables() error { jumpRules = append(jumpRules, jumpRule{iptables.NATTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules"}) } for _, rule := range jumpRules { - if err := c.ipt.EnsureChain(iptables.ProtocolDual, rule.table, rule.dstChain); err != nil { + if err := c.iptables.EnsureChain(iptables.ProtocolDual, rule.table, rule.dstChain); err != nil { return err } ruleSpec := []string{"-j", rule.dstChain, "-m", "comment", "--comment", rule.comment} - if err := c.ipt.AppendRule(iptables.ProtocolDual, rule.table, rule.srcChain, ruleSpec); err != nil { + if err := c.iptables.AppendRule(iptables.ProtocolDual, rule.table, rule.srcChain, ruleSpec); err != nil { return err } } @@ -531,7 +527,7 @@ func (c *Client) syncIPTables() error { false) // Setting --noflush to keep the previous contents (i.e. non antrea managed chains) of the tables. - if err := c.ipt.Restore(iptablesData.String(), false, false); err != nil { + if err := c.iptables.Restore(iptablesData.String(), false, false); err != nil { return err } } @@ -548,7 +544,7 @@ func (c *Client) syncIPTables() error { snatMarkToIPv6, true) // Setting --noflush to keep the previous contents (i.e. non antrea managed chains) of the tables. - if err := c.ipt.Restore(iptablesData.String(), false, true); err != nil { + if err := c.iptables.Restore(iptablesData.String(), false, true); err != nil { return err } } @@ -851,7 +847,7 @@ func (c *Client) initServiceIPRoutes() error { } // Reconcile removes orphaned podCIDRs from ipset and removes routes to orphaned podCIDRs -// based on the desired podCIDRs. svcIPs are used for Windows only. +// based on the desired podCIDRs. func (c *Client) Reconcile(podCIDRs []string) error { desiredPodCIDRs := sets.NewString(podCIDRs...) // Get the peer IPv6 gateways from pod CIDRs @@ -937,9 +933,9 @@ func (c *Client) Reconcile(podCIDRs []string) error { } func (c *Client) isServiceRoute(route *netlink.Route) bool { - // If the destination IP or gateway IP is virtual Service IP , then it is a route which is added by AntreaProxy. - if route.Dst.IP.Equal(config.VirtualServiceIPv6) || route.Dst.IP.Equal(config.VirtualServiceIPv4) || - route.Gw.Equal(config.VirtualServiceIPv6) || route.Gw.Equal(config.VirtualServiceIPv4) { + // If the gateway IP or the destination IP is the virtual Service IP, then it is a route added by AntreaProxy. + if route.Dst != nil && (route.Dst.IP.Equal(config.VirtualServiceIPv6) || route.Dst.IP.Equal(config.VirtualServiceIPv4)) || + route.Gw != nil && (route.Gw.Equal(config.VirtualServiceIPv6) || route.Gw.Equal(config.VirtualServiceIPv4)) { return true } return false @@ -1133,10 +1129,6 @@ func (c *Client) DeleteRoutes(podCIDR *net.IPNet) error { return nil } -func (c *Client) DeleteClusterIPRoute(svcIP net.IP) error { - return nil -} - // Join all words with spaces, terminate with newline and write to buf. func writeLine(buf *bytes.Buffer, words ...string) { // We avoid strings.Join for performance reasons. @@ -1245,7 +1237,7 @@ func (c *Client) AddSNATRule(snatIP net.IP, mark uint32) error { protocol = iptables.ProtocolIPv6 } c.markToSNATIP.Store(mark, snatIP) - return c.ipt.InsertRule(protocol, iptables.NATTable, antreaPostRoutingChain, c.snatRuleSpec(snatIP, mark)) + return c.iptables.InsertRule(protocol, iptables.NATTable, antreaPostRoutingChain, c.snatRuleSpec(snatIP, mark)) } func (c *Client) DeleteSNATRule(mark uint32) error { @@ -1260,11 +1252,11 @@ func (c *Client) DeleteSNATRule(mark uint32) error { if snatIP.To4() == nil { protocol = iptables.ProtocolIPv6 } - return c.ipt.DeleteRule(protocol, iptables.NATTable, antreaPostRoutingChain, c.snatRuleSpec(snatIP, mark)) + return c.iptables.DeleteRule(protocol, iptables.NATTable, antreaPostRoutingChain, c.snatRuleSpec(snatIP, mark)) } -// addVirtualServiceIPRoute is used to add routing entry which is used to forward the packets whose destination IP is -// virtual Service IP back to Antrea gateway on host. +// addVirtualServiceIPRoute is used to add a route which is used to route the packets whose destination IP is a virtual +// IP to Antrea gateway. func (c *Client) addVirtualServiceIPRoute(isIPv6 bool) error { linkIndex := c.nodeConfig.GatewayConfig.LinkIndex svcIP := config.VirtualServiceIPv4 @@ -1307,6 +1299,7 @@ func (c *Client) AddNodePort(nodePortAddresses []net.IP, port uint16, protocol b } else { c.nodePortsIPv4.Store(ipSetEntry, struct{}{}) } + klog.V(4).InfoS("Added ipset for NodePort", "IP", nodePortAddresses[i], "Port", port, "Protocol", protocol) } return nil @@ -1333,41 +1326,33 @@ func (c *Client) DeleteNodePort(nodePortAddresses []net.IP, port uint16, protoco return nil } -// TODO: remove it after unifying Windows and Linux functions. -func (c *Client) AddClusterIPRoute(svcIP net.IP) error { - return nil -} - func (c *Client) addServiceCIDRRoute(serviceCIDR *net.IPNet) error { isIPv6 := utilnet.IsIPv6(serviceCIDR.IP) linkIndex := c.nodeConfig.GatewayConfig.LinkIndex scope := netlink.SCOPE_UNIVERSE - curServiceCIDR := c.serviceIPv4CIDR + serviceCIDRKey := serviceIPv4CIDRKey gw := config.VirtualServiceIPv4 if isIPv6 { - curServiceCIDR = c.serviceIPv6CIDR + serviceCIDRKey = serviceIPv6CIDRKey gw = config.VirtualServiceIPv6 } + oldServiceCIDRRoute, serviceCIDRRouteExists := c.serviceRoutes.Load(serviceCIDRKey) // Generate a route with the new Service CIDR and install it. serviceCIDRMask, _ := serviceCIDR.Mask.Size() route := generateRoute(serviceCIDR.IP, serviceCIDRMask, gw, linkIndex, scope) if err := c.netlink.RouteReplace(route); err != nil { return fmt.Errorf("failed to install a new Service CIDR route: %w", err) } + // Store the new Service CIDR. - if isIPv6 { - c.serviceIPv6CIDR = serviceCIDR - } else { - c.serviceIPv4CIDR = serviceCIDR - } + c.serviceRoutes.Store(serviceCIDRKey, route) // Collect stale routes. var staleRoutes []*netlink.Route - if curServiceCIDR != nil { + if serviceCIDRRouteExists { // If current destination CIDR is not nil, the route with current destination CIDR should be uninstalled. - route.Dst = curServiceCIDR - staleRoutes = []*netlink.Route{route} + staleRoutes = []*netlink.Route{oldServiceCIDRRoute.(*netlink.Route)} } else { // If current destination CIDR is nil, which means that Antrea Agent has just started, then all existing routes // whose destination CIDR contains the first ClusterIP should be uninstalled, except the newly installed route. @@ -1390,10 +1375,10 @@ func (c *Client) addServiceCIDRRoute(serviceCIDR *net.IPNet) error { if err.Error() == "no such process" { klog.InfoS("Failed to delete stale Service CIDR route since the route has been deleted", "route", rt) } else { - return fmt.Errorf("failed to uninstall stale Service CIDR route %s: %w", rt.String(), err) + return fmt.Errorf("failed to delete stale Service CIDR route %s: %w", rt.String(), err) } } else { - klog.V(4).InfoS("Uninstalled stale Service CIDR route successfully", "route", rt) + klog.V(4).InfoS("Deleted stale Service CIDR route successfully", "route", rt) } } @@ -1412,9 +1397,9 @@ func (c *Client) addVirtualNodePortDNATIPRoute(isIPv6 bool) error { } route := generateRoute(vIP, mask, gw, linkIndex, netlink.SCOPE_UNIVERSE) if err := c.netlink.RouteReplace(route); err != nil { - return fmt.Errorf("failed to install routing entry for virtual NodePort DNAT IP %s: %w", vIP.String(), err) + return fmt.Errorf("failed to install route for NodePort DNAT IP %s: %w", vIP.String(), err) } - klog.V(4).InfoS("Added virtual NodePort DNAT IP route", "route", route) + klog.V(4).InfoS("Added NodePort DNAT IP route", "route", route) c.serviceRoutes.Store(vIP.String(), route) return nil @@ -1437,7 +1422,7 @@ func (c *Client) AddLoadBalancer(svcIP net.IP) error { route := generateRoute(svcIP, mask, gw, linkIndex, netlink.SCOPE_UNIVERSE) if err := c.netlink.RouteReplace(route); err != nil { - return fmt.Errorf("failed to install routing entry for LoadBalancer ingress IP %s: %w", svcIP.String(), err) + return fmt.Errorf("failed to install route for LoadBalancer ingress IP %s: %w", svcIP.String(), err) } klog.V(4).InfoS("Added LoadBalancer ingress IP route", "route", route) c.serviceRoutes.Store(svcIP.String(), route) @@ -1448,28 +1433,21 @@ func (c *Client) AddLoadBalancer(svcIP net.IP) error { // DeleteLoadBalancer is used to delete routing entry which is used to route LoadBalancer ingress IP to Antrea // gateway on host. func (c *Client) DeleteLoadBalancer(svcIP net.IP) error { - linkIndex := c.nodeConfig.GatewayConfig.LinkIndex - isIPv6 := utilnet.IsIPv6(svcIP) - var gw net.IP - var mask int - if !isIPv6 { - gw = config.VirtualServiceIPv4 - mask = net.IPv4len * 8 - } else { - gw = config.VirtualServiceIPv6 - mask = net.IPv6len * 8 + svcIPStr := svcIP.String() + route, found := c.serviceRoutes.Load(svcIPStr) + if !found { + klog.V(2).InfoS("Didn't find route for LoadBalancer ingress IP", "ingressIP", svcIPStr) + return nil } - - route := generateRoute(svcIP, mask, gw, linkIndex, netlink.SCOPE_UNIVERSE) - if err := c.netlink.RouteDel(route); err != nil { + if err := c.netlink.RouteDel(route.(*netlink.Route)); err != nil { if err.Error() == "no such process" { - klog.InfoS("Failed to delete LoadBalancer ingress IP route since the route has been deleted", "route", route) + klog.InfoS("Failed to delete route for LoadBalancer ingress IP since it doesn't exist", "route", route) } else { - return fmt.Errorf("failed to delete routing entry for LoadBalancer ingress IP %s: %w", svcIP.String(), err) + return fmt.Errorf("failed to delete route for LoadBalancer ingress IP %s: %w", svcIPStr, err) } } klog.V(4).InfoS("Deleted LoadBalancer ingress IP route", "route", route) - c.serviceRoutes.Delete(svcIP.String()) + c.serviceRoutes.Delete(svcIPStr) return nil } diff --git a/pkg/agent/route/route_linux_test.go b/pkg/agent/route/route_linux_test.go index 831fcce1214..6dab7a20e85 100644 --- a/pkg/agent/route/route_linux_test.go +++ b/pkg/agent/route/route_linux_test.go @@ -37,6 +37,20 @@ import ( "antrea.io/antrea/pkg/util/ip" ) +var ( + nodeConfig = &config.NodeConfig{GatewayConfig: &config.GatewayConfig{LinkIndex: 10}} + + externalIPv4Addr1 = "1.1.1.1" + externalIPv4Addr2 = "1.1.1.2" + externalIPv6Addr1 = "fd00:1234:5678:dead:beaf::1" + externalIPv6Addr2 = "fd00:1234:5678:dead:beaf::a" + + ipv4Route1 = generateRoute(net.ParseIP(externalIPv4Addr1), 32, config.VirtualServiceIPv4, 10, netlink.SCOPE_UNIVERSE) + ipv4Route2 = generateRoute(net.ParseIP(externalIPv4Addr2), 32, config.VirtualServiceIPv4, 10, netlink.SCOPE_UNIVERSE) + ipv6Route1 = generateRoute(net.ParseIP(externalIPv6Addr1), 128, config.VirtualServiceIPv6, 10, netlink.SCOPE_UNIVERSE) + ipv6Route2 = generateRoute(net.ParseIP(externalIPv6Addr2), 128, config.VirtualServiceIPv6, 10, netlink.SCOPE_UNIVERSE) +) + func TestSyncRoutes(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -78,7 +92,7 @@ func TestSyncRoutes(t *testing.T) { c.serviceRoutes.Store("169.254.0.253/32", serviceRoute1) c.serviceRoutes.Store("169.254.0.252/32", serviceRoute2) - assert.NoError(t, c.syncRoutes()) + assert.NoError(t, c.syncRoute()) } func TestSyncIPSet(t *testing.T) { @@ -468,7 +482,7 @@ COMMIT ctrl := gomock.NewController(t) defer ctrl.Finish() mockIPTables := iptablestest.NewMockInterface(ctrl) - c := &Client{ipt: mockIPTables, + c := &Client{iptables: mockIPTables, networkConfig: tt.networkConfig, nodeConfig: tt.nodeConfig, proxyAll: tt.proxyAll, @@ -1124,7 +1138,7 @@ func TestAddSNATRule(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockIPTables := iptablestest.NewMockInterface(ctrl) - c := &Client{ipt: mockIPTables, + c := &Client{iptables: mockIPTables, nodeConfig: tt.nodeConfig, } tt.expectedCalls(mockIPTables.EXPECT()) @@ -1191,7 +1205,7 @@ func TestDeleteSNATRule(t *testing.T) { defer ctrl.Finish() mockIPTables := iptablestest.NewMockInterface(ctrl) c := &Client{ - ipt: mockIPTables, + iptables: mockIPTables, nodeConfig: tt.nodeConfig, markToSNATIP: sync.Map{}, } @@ -1397,13 +1411,28 @@ func TestAddServiceCIDRRoute(t *testing.T) { defer ctrl.Finish() mockNetlink := netlinktest.NewMockInterface(ctrl) c := &Client{ - netlink: mockNetlink, - nodeConfig: nodeConfig, - serviceIPv4CIDR: tt.curServiceIPv4CIDR, - serviceIPv6CIDR: tt.curServiceIPv6CIDR, + netlink: mockNetlink, + nodeConfig: nodeConfig, } tt.expectedCalls(mockNetlink.EXPECT()) + if tt.curServiceIPv4CIDR != nil { + c.serviceRoutes.Store(serviceIPv4CIDRKey, &netlink.Route{ + Dst: &net.IPNet{IP: net.ParseIP("10.96.0.1").To4(), Mask: net.CIDRMask(32, 32)}, + Gw: config.VirtualServiceIPv4, + Scope: netlink.SCOPE_UNIVERSE, + LinkIndex: 10, + }) + } + if tt.curServiceIPv6CIDR != nil { + c.serviceRoutes.Store(serviceIPv6CIDRKey, &netlink.Route{ + Dst: &net.IPNet{IP: net.ParseIP("fd00:1234:5678:dead:beaf::1"), Mask: net.CIDRMask(128, 128)}, + Gw: config.VirtualServiceIPv6, + Scope: netlink.SCOPE_UNIVERSE, + LinkIndex: 10, + }) + } + if tt.newServiceIPv4CIDR != nil { assert.NoError(t, c.addServiceCIDRRoute(tt.newServiceIPv4CIDR)) } @@ -1415,37 +1444,34 @@ func TestAddServiceCIDRRoute(t *testing.T) { } func TestAddLoadBalancer(t *testing.T) { - nodeConfig := &config.NodeConfig{GatewayConfig: &config.GatewayConfig{LinkIndex: 10}} tests := []struct { name string - externalIP string + externalIPs []string + serviceRoutes map[string]*netlink.Route expectedCalls func(mockNetlink *netlinktest.MockInterfaceMockRecorder) }{ { - name: "IPv4", - externalIP: "1.1.1.1", + name: "IPv4", + serviceRoutes: map[string]*netlink.Route{ + externalIPv4Addr1: ipv4Route1, + externalIPv4Addr2: ipv4Route2, + }, + externalIPs: []string{externalIPv4Addr1, externalIPv4Addr2}, expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) { - mockNetlink.RouteReplace(&netlink.Route{ - Dst: &net.IPNet{ - IP: net.ParseIP("1.1.1.1"), - Mask: net.CIDRMask(32, 32), - }, - Gw: config.VirtualServiceIPv4, - Scope: netlink.SCOPE_UNIVERSE, - LinkIndex: 10, - }) + mockNetlink.RouteReplace(ipv4Route1) + mockNetlink.RouteReplace(ipv4Route2) }, }, { - name: "IPv6", - externalIP: "fd00:1234:5678:dead:beaf::1", + name: "IPv6", + serviceRoutes: map[string]*netlink.Route{ + externalIPv6Addr1: ipv6Route1, + externalIPv6Addr2: ipv6Route2, + }, + externalIPs: []string{externalIPv6Addr1, externalIPv6Addr2}, expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) { - mockNetlink.RouteReplace(&netlink.Route{ - Dst: &net.IPNet{IP: net.ParseIP("fd00:1234:5678:dead:beaf::1"), Mask: net.CIDRMask(128, 128)}, - Gw: config.VirtualServiceIPv6, - Scope: netlink.SCOPE_UNIVERSE, - LinkIndex: 10, - }) + mockNetlink.RouteReplace(ipv6Route1) + mockNetlink.RouteReplace(ipv6Route2) }, }, } @@ -1460,43 +1486,42 @@ func TestAddLoadBalancer(t *testing.T) { } tt.expectedCalls(mockNetlink.EXPECT()) - assert.NoError(t, c.AddLoadBalancer(net.ParseIP(tt.externalIP))) + for _, externalIP := range tt.externalIPs { + assert.NoError(t, c.AddLoadBalancer(net.ParseIP(externalIP))) + } }) } } func TestDeleteLoadBalancer(t *testing.T) { - nodeConfig := &config.NodeConfig{GatewayConfig: &config.GatewayConfig{LinkIndex: 10}} tests := []struct { name string - externalIP string + serviceRoutes map[string]*netlink.Route + externalIPs []string expectedCalls func(mockNetlink *netlinktest.MockInterfaceMockRecorder) }{ { - name: "IPv4", - externalIP: "1.1.1.1", + name: "IPv4", + serviceRoutes: map[string]*netlink.Route{ + externalIPv4Addr1: ipv4Route1, + externalIPv4Addr2: ipv4Route2, + }, + externalIPs: []string{externalIPv4Addr1, externalIPv4Addr2}, expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) { - mockNetlink.RouteDel(&netlink.Route{ - Dst: &net.IPNet{ - IP: net.ParseIP("1.1.1.1"), - Mask: net.CIDRMask(32, 32), - }, - Gw: config.VirtualServiceIPv4, - Scope: netlink.SCOPE_UNIVERSE, - LinkIndex: 10, - }) + mockNetlink.RouteDel(ipv4Route1) + mockNetlink.RouteDel(ipv4Route2) }, }, { - name: "IPv6", - externalIP: "fd00:1234:5678:dead:beaf::1", + name: "IPv6", + serviceRoutes: map[string]*netlink.Route{ + externalIPv6Addr1: ipv6Route1, + externalIPv6Addr2: ipv6Route2, + }, + externalIPs: []string{externalIPv6Addr1, externalIPv6Addr2}, expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) { - mockNetlink.RouteDel(&netlink.Route{ - Dst: &net.IPNet{IP: net.ParseIP("fd00:1234:5678:dead:beaf::1"), Mask: net.CIDRMask(128, 128)}, - Gw: config.VirtualServiceIPv6, - Scope: netlink.SCOPE_UNIVERSE, - LinkIndex: 10, - }) + mockNetlink.RouteDel(ipv6Route1) + mockNetlink.RouteDel(ipv6Route2) }, }, } @@ -1506,12 +1531,18 @@ func TestDeleteLoadBalancer(t *testing.T) { defer ctrl.Finish() mockNetlink := netlinktest.NewMockInterface(ctrl) c := &Client{ - netlink: mockNetlink, - nodeConfig: nodeConfig, + netlink: mockNetlink, + nodeConfig: nodeConfig, + serviceRoutes: sync.Map{}, + } + for ipStr, route := range tt.serviceRoutes { + c.serviceRoutes.Store(ipStr, route) } tt.expectedCalls(mockNetlink.EXPECT()) - assert.NoError(t, c.DeleteLoadBalancer(net.ParseIP(tt.externalIP))) + for _, externalIP := range tt.externalIPs { + assert.NoError(t, c.DeleteLoadBalancer(net.ParseIP(externalIP))) + } }) } } diff --git a/pkg/agent/route/route_windows.go b/pkg/agent/route/route_windows.go index 006dd4f7415..82257b9a995 100644 --- a/pkg/agent/route/route_windows.go +++ b/pkg/agent/route/route_windows.go @@ -21,9 +21,12 @@ import ( "errors" "fmt" "net" + "reflect" + "strings" "sync" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/config" @@ -40,6 +43,8 @@ const ( outboundFirewallRuleName = "Antrea: accept packets to local Pods" antreaNatNodePort = "antrea-nat-nodeport" + + serviceIPv4CIDRKey = "serviceIPv4CIDRKey" ) var ( @@ -50,23 +55,33 @@ var ( ) type Client struct { - nodeConfig *config.NodeConfig - networkConfig *config.NetworkConfig - hostRoutes *sync.Map - fwClient *winfirewall.Client - bridgeInfIndex int - noSNAT bool - proxyAll bool + nodeConfig *config.NodeConfig + networkConfig *config.NetworkConfig + // nodeRoutes caches ip routes to remote Pods. It's a map of podCIDR to routes. + nodeRoutes *sync.Map + // serviceRoutes caches ip routes about Services. + serviceRoutes *sync.Map + // netNatStaticMappings caches Windows NetNat for NodePort. + netNatStaticMappings *sync.Map + fwClient *winfirewall.Client + bridgeInfIndex int + noSNAT bool + proxyAll bool + // The latest calculated Service CIDRs can be got from serviceCIDRProvider. + serviceCIDRProvider servicecidr.Interface } // NewClient returns a route client. -func NewClient(networkConfig *config.NetworkConfig, noSNAT, proxyAll, connectUplinkToBridge, multicastEnabled bool, _ servicecidr.Interface) (*Client, error) { +func NewClient(networkConfig *config.NetworkConfig, noSNAT, proxyAll, connectUplinkToBridge, multicastEnabled bool, serviceCIDRProvider servicecidr.Interface) (*Client, error) { return &Client{ - networkConfig: networkConfig, - hostRoutes: &sync.Map{}, - fwClient: winfirewall.NewClient(), - noSNAT: noSNAT, - proxyAll: proxyAll, + networkConfig: networkConfig, + nodeRoutes: &sync.Map{}, + serviceRoutes: &sync.Map{}, + netNatStaticMappings: &sync.Map{}, + fwClient: winfirewall.NewClient(), + noSNAT: noSNAT, + proxyAll: proxyAll, + serviceCIDRProvider: serviceCIDRProvider, }, nil } @@ -103,6 +118,10 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error { if err := c.initServiceIPRoutes(); err != nil { return fmt.Errorf("failed to initialize Service IP routes: %v", err) } + // For NodePort Service, a NetNatStaticMapping is needed. + if err := util.NewNetNat(antreaNatNodePort, virtualNodePortDNATIPv4Net); err != nil { + return err + } } done() @@ -114,10 +133,20 @@ func (c *Client) initServiceIPRoutes() error { if err := c.addVirtualServiceIPRoute(false); err != nil { return err } + if err := c.addVirtualNodePortDNATIPRoute(false); err != nil { + return err + } } if c.networkConfig.IPv6Enabled { return fmt.Errorf("IPv6 is not supported on Windows") } + c.serviceCIDRProvider.AddEventHandler(func(serviceCIDRs []*net.IPNet) { + for _, serviceCIDR := range serviceCIDRs { + if err := c.addServiceCIDRRoute(serviceCIDR); err != nil { + klog.ErrorS(err, "Failed to install route for Service CIDR", "ServiceCIDR", serviceCIDR) + } + } + }) return nil } @@ -125,17 +154,19 @@ func (c *Client) initServiceIPRoutes() error { // the route entries on the host gateway interface are stored in the cache. func (c *Client) Reconcile(podCIDRs []string) error { desiredPodCIDRs := sets.NewString(podCIDRs...) - routes, err := c.listRoutes() + routes, err := c.listIPRoutesOnGW() if err != nil { return err } for dst, rt := range routes { + if reflect.DeepEqual(rt.DestinationSubnet, c.nodeConfig.PodIPv4CIDR) { + continue + } if desiredPodCIDRs.Has(dst) { - c.hostRoutes.Store(dst, rt) continue } - // Don't delete the routes which are added by AntreaProxy. - if c.isServiceRoute(rt) { + // Don't delete the routes which are added by AntreaProxy when proxyAll is enabled. + if c.proxyAll && c.isServiceRoute(rt) { continue } err := util.RemoveNetRoute(rt) @@ -149,7 +180,7 @@ func (c *Client) Reconcile(podCIDRs []string) error { // AddRoutes adds routes to the provided podCIDR. // It overrides the routes if they already exist, without error. func (c *Client) AddRoutes(podCIDR *net.IPNet, nodeName string, peerNodeIP, peerGwIP net.IP) error { - obj, found := c.hostRoutes.Load(podCIDR.String()) + obj, found := c.nodeRoutes.Load(podCIDR.String()) route := &util.Route{ DestinationSubnet: podCIDR, RouteMetric: util.MetricDefault, @@ -187,7 +218,7 @@ func (c *Client) AddRoutes(podCIDR *net.IPNet, nodeName string, peerNodeIP, peer return err } - c.hostRoutes.Store(podCIDR.String(), route) + c.nodeRoutes.Store(podCIDR.String(), route) klog.V(2).Infof("Added route with destination %s via %s on host gateway on %s (%s)", podCIDR.String(), peerGwIP.String(), nodeName, peerNodeIP) return nil } @@ -195,7 +226,7 @@ func (c *Client) AddRoutes(podCIDR *net.IPNet, nodeName string, peerNodeIP, peer // DeleteRoutes deletes routes to the provided podCIDR. // It does nothing if the routes don't exist, without error. func (c *Client) DeleteRoutes(podCIDR *net.IPNet) error { - obj, found := c.hostRoutes.Load(podCIDR.String()) + obj, found := c.nodeRoutes.Load(podCIDR.String()) if !found { klog.V(2).Infof("Route with destination %s not exists", podCIDR.String()) return nil @@ -205,118 +236,98 @@ func (c *Client) DeleteRoutes(podCIDR *net.IPNet) error { if err := util.RemoveNetRoute(rt); err != nil { return err } - c.hostRoutes.Delete(podCIDR.String()) + c.nodeRoutes.Delete(podCIDR.String()) klog.V(2).Infof("Deleted route with destination %s from host gateway", podCIDR.String()) return nil } -// addVirtualServiceIPRoute adds routes on a Windows Node for redirecting ClusterIP and NodePort -// Service traffic from host network to OVS via antrea-gw0. +// addVirtualServiceIPRoute is used to add a route for a virtual IP. The virtual IP is used as the next hop IP for ClusterIP, +// NodePort and LoadBalancer routes. Without this route, routes for Service cannot be installed on Windows host. func (c *Client) addVirtualServiceIPRoute(isIPv6 bool) error { linkIndex := c.nodeConfig.GatewayConfig.LinkIndex + svcIP := config.VirtualServiceIPv4 - // This route is for 2 purposes: - // - If for each ClusterIP Service, a route is installed to direct traffic to antrea-gw0, there will be too many - // neighbor cache entries. While with one virtual IP to antrea-gw0 and ClusterIP Services to the virtual IP, - // there will be only one neighbor cache entry. - // - For ClusterIP Service requests from host, reply traffic needs a route entry to route packet to VirtualServiceIPv4 - // via antrea-gw0. If the NextHop of a route is antrea-gw0, then set it as 0.0.0.0. As a result, on-link/0.0.0.0 - // is in PersistentStore. - // - For NodePort Service, it is the same. - vRoute := &util.Route{ - LinkIndex: linkIndex, - DestinationSubnet: virtualServiceIPv4Net, - GatewayAddress: net.IPv4zero, - RouteMetric: util.MetricHigh, - } - if err := util.ReplaceNetRoute(vRoute); err != nil { - return err - } - klog.InfoS("Added virtual Service IP route", "route", vRoute) - - // Service replies will be sent to OVS bridge via openflow.GlobalVirtualMAC. This NetNeighbor is for - // creating a neighbor cache entry to config.VirtualServiceIPv4. - vNeighbor := &util.Neighbor{ - LinkIndex: linkIndex, - IPAddress: config.VirtualServiceIPv4, - LinkLayerAddress: openflow.GlobalVirtualMAC, - State: "Permanent", - } - if err := util.ReplaceNetNeighbor(vNeighbor); err != nil { - return err + neigh := generateNeigh(svcIP, linkIndex) + if err := util.ReplaceNetNeighbor(neigh); err != nil { + return fmt.Errorf("failed to add new IP neighbour for %s: %w", svcIP, err) } - klog.InfoS("Added virtual Service IP neighbor", "neighbor", vNeighbor) + klog.InfoS("Added virtual Service IP neighbor", "neighbor", neigh) - if err := c.addServiceRoute(config.VirtualNodePortDNATIPv4); err != nil { - return err - } - // For NodePort Service, a new NetNat for NetNatStaticMapping is needed. - err := util.NewNetNat(antreaNatNodePort, virtualNodePortDNATIPv4Net) - if err != nil { - return err + route := generateRoute(virtualServiceIPv4Net, net.IPv4zero, linkIndex, util.MetricHigh) + if err := util.ReplaceNetRoute(route); err != nil { + return fmt.Errorf("failed to install route for virtual Service IP %s: %w", svcIP.String(), err) } + c.serviceRoutes.Store(svcIP.String(), route) + klog.InfoS("Added virtual Service IP route", "route", route) return nil } -// TODO: Follow the code style in Linux that maintains one Service CIDR. -func (c *Client) addServiceRoute(svcIP net.IP) error { - svcIPNet := util.NewIPNet(svcIP) - obj, found := c.hostRoutes.Load(svcIPNet.String()) +func (c *Client) addServiceCIDRRoute(serviceCIDR *net.IPNet) error { + linkIndex := c.nodeConfig.GatewayConfig.LinkIndex + gw := config.VirtualServiceIPv4 + metric := util.MetricHigh - // Route: Service IP -> VirtualServiceIPv4 (169.254.0.253) - route := &util.Route{ - LinkIndex: c.nodeConfig.GatewayConfig.LinkIndex, - DestinationSubnet: svcIPNet, - GatewayAddress: config.VirtualServiceIPv4, - RouteMetric: util.MetricHigh, + oldServiceCIDRRoute, serviceCIDRRouteExists := c.serviceRoutes.Load(serviceIPv4CIDRKey) + // Generate a route with the new ClusterIP CIDR and install it. + route := generateRoute(serviceCIDR, gw, linkIndex, metric) + if err := util.ReplaceNetRoute(route); err != nil { + return fmt.Errorf("failed to install a new Service CIDR route: %w", err) } - if found { - existingRoute := obj.(*util.Route) - if existingRoute.GatewayAddress.Equal(route.GatewayAddress) && existingRoute.RouteMetric == route.RouteMetric { - klog.V(2).InfoS("Service route already exists", "DestinationIP", route.DestinationSubnet, - "Gateway", route.GatewayAddress, "RouteMetric", route.RouteMetric) - return nil - } - // Remove the existing route if gateway or metric is not as expected. - if err := util.RemoveNetRoute(existingRoute); err != nil { - return fmt.Errorf("failed to delete existing Service route entry, DestinationIP: %s, Gateway: %s, RouteMetric: %d, err: %v", - existingRoute.DestinationSubnet, existingRoute.GatewayAddress, existingRoute.RouteMetric, err) + + // Store the new ClusterIP CIDR and the new generated route to serviceRoutes. Then the calculated route can be restored + // when it was deleted since members of serviceRoutes are synchronized periodically. + c.serviceRoutes.Store(serviceIPv4CIDRKey, route) + + // Collect stale routes. + var staleRoutes []*util.Route + // If current destination CIDR is not nil, the route with current destination CIDR should be uninstalled since + // a new route with a newly calculated destination CIDR has been installed. + if serviceCIDRRouteExists { + staleRoutes = append(staleRoutes, oldServiceCIDRRoute.(*util.Route)) + } + routes, err := c.listIPRoutesOnGW() + if err != nil { + return fmt.Errorf("error listing ip routes: %w", err) + } + // Collect stale per-IP routes for ClusterIPs before this patch. + for _, rt := range routes { + ones, _ := rt.DestinationSubnet.Mask.Size() + if ones == net.IPv4len*8 && serviceCIDR.Contains(rt.DestinationSubnet.IP) { + staleRoutes = append(staleRoutes, rt) } } - if err := util.ReplaceNetRoute(route); err != nil { - return err + // Remove stale routes. + for _, rt := range staleRoutes { + if err := util.RemoveNetRoute(rt); err != nil { + if err.Error() == "No matching MSFT_NetRoute objects" { + klog.InfoS("Failed to delete stale Service CIDR route since the route has been deleted", "route", rt) + } else { + return fmt.Errorf("failed to delete stale Service CIDR route %s: %w", rt.String(), err) + } + } else { + klog.V(4).InfoS("Deleted stale Service CIDR route successfully", "route", rt) + } } - c.hostRoutes.Store(svcIPNet.String(), route) - klog.V(2).InfoS("Added Service route", "ServiceIP", route.DestinationSubnet, "GatewayIP", route.GatewayAddress) return nil } -func (c *Client) deleteServiceRoute(svcIP net.IP) error { - svcIPNet := util.NewIPNet(svcIP) - obj, found := c.hostRoutes.Load(svcIPNet.String()) - if !found { - klog.V(2).InfoS("Service route does not exist", "DestinationIP", svcIP) - return nil - } +// addVirtualNodePortDNATIPRoute is used to add a route which is used to route DNATed NodePort traffic to Antrea gateway. +func (c *Client) addVirtualNodePortDNATIPRoute(isIPv6 bool) error { + linkIndex := c.nodeConfig.GatewayConfig.LinkIndex + vIP := config.VirtualNodePortDNATIPv4 + gw := config.VirtualServiceIPv4 - rt := obj.(*util.Route) - if err := util.RemoveNetRoute(rt); err != nil { - return err + route := generateRoute(virtualNodePortDNATIPv4Net, gw, linkIndex, util.MetricHigh) + if err := util.ReplaceNetRoute(route); err != nil { + return fmt.Errorf("failed to install route for NodePort DNAT IP %s: %w", vIP.String(), err) } - c.hostRoutes.Delete(svcIPNet.String()) - klog.V(2).InfoS("Deleted Service route from host gateway", "DestinationIP", svcIP) - return nil -} - -func (c *Client) AddClusterIPRoute(svcIP net.IP) error { - return c.addServiceRoute(svcIP) -} + c.serviceRoutes.Store(vIP.String(), route) + klog.InfoS("Added NodePort DNAT IP route", "route", route) -func (c *Client) DeleteClusterIPRoute(svcIP net.IP) error { - return c.deleteServiceRoute(svcIP) + return nil } // MigrateRoutesToGw is not supported on Windows. @@ -329,12 +340,76 @@ func (c *Client) UnMigrateRoutesFromGw(route *net.IPNet, linkName string) error return errors.New("UnMigrateRoutesFromGw is unsupported on Windows") } -// Run is not supported on Windows and returns immediately. +// Run periodically syncs netNatStaticMapping and route. It will not return until stopCh is closed. func (c *Client) Run(stopCh <-chan struct{}) { + klog.InfoS("Starting netNatStaticMapping and route sync", "interval", SyncInterval) + wait.Until(c.syncIPInfra, SyncInterval, stopCh) +} + +// syncIPInfra is idempotent and can be safely called on every sync operation. +func (c *Client) syncIPInfra() { + if err := c.syncRoute(); err != nil { + klog.ErrorS(err, "Failed to sync route") + } + + if c.proxyAll { + if err := c.syncNetNatStaticMapping(); err != nil { + klog.ErrorS(err, "Failed to sync netNatStaticMapping") + } + } + klog.V(3).Info("Successfully synced netNatStaticMapping and route") +} + +func (c *Client) syncRoute() error { + restoreRoute := func(route *util.Route) bool { + if err := util.ReplaceNetRoute(route); err != nil { + klog.ErrorS(err, "Failed to sync route", "Route", route) + return false + } + return true + } + c.nodeRoutes.Range(func(_, v interface{}) bool { + route := v.(*util.Route) + return restoreRoute(route) + }) + if c.proxyAll { + c.serviceRoutes.Range(func(_, v interface{}) bool { + route := v.(*util.Route) + return restoreRoute(route) + }) + } + // The route is installed automatically by the kernel when the address is configured on the interface. If the route + // is deleted manually by mistake, we restore it. + gwAutoconfRoute := &util.Route{ + LinkIndex: c.nodeConfig.GatewayConfig.LinkIndex, + DestinationSubnet: c.nodeConfig.PodIPv4CIDR, + GatewayAddress: c.nodeConfig.GatewayConfig.IPv4, + RouteMetric: util.MetricDefault, + } + restoreRoute(gwAutoconfRoute) + + return nil +} + +func (c *Client) syncNetNatStaticMapping() error { + if err := util.NewNetNat(antreaNatNodePort, virtualNodePortDNATIPv4Net); err != nil { + return err + } + + c.netNatStaticMappings.Range(func(_, v interface{}) bool { + mapping := v.(*util.NetNatStaticMapping) + if err := util.ReplaceNetNatStaticMapping(mapping); err != nil { + klog.ErrorS(err, "Failed to add netNatStaticMapping", "netNatStaticMapping", mapping) + return false + } + return true + }) + + return nil } func (c *Client) isServiceRoute(route *util.Route) bool { - // If the destination IP or gateway IP is the virtual Service IP, then it is the Service route added by AntreaProxy. + // If the gateway IP or the destination IP is the virtual Service IP, then it is a route added by AntreaProxy. if route.DestinationSubnet != nil && route.DestinationSubnet.IP.Equal(config.VirtualServiceIPv4) || route.GatewayAddress != nil && route.GatewayAddress.Equal(config.VirtualServiceIPv4) { return true @@ -342,7 +417,7 @@ func (c *Client) isServiceRoute(route *util.Route) bool { return false } -func (c *Client) listRoutes() (map[string]*util.Route, error) { +func (c *Client) listIPRoutesOnGW() (map[string]*util.Route, error) { routes, err := util.GetNetRoutesAll() if err != nil { return nil, err @@ -364,8 +439,14 @@ func (c *Client) listRoutes() (map[string]*util.Route, error) { if !rt.DestinationSubnet.IP.IsGlobalUnicast() { continue } - // Windows adds an active route entry for the local broadcast address automatically when a new IP address - // is configured on the interface. This route entry should be ignored in the list. + // When configuring an IP address to an interface on Windows, three route entries will be automatically added. + // For example, if the IP address is 10.10.0.1/24, the following three routes will be created: + // Network Destination Netmask Gateway Interface Metric + // 10.10.0.1 255.255.255.255 On-link 10.10.0.1 281 + // 10.10.0.0 255.255.255.0 On-link 10.10.0.1 281 + // 10.10.0.255 255.255.255.255 On-link 10.10.0.1 281 + // The host (10.10.0.1) and broadcast (10.10.0.255) routes should be ignored since they are not supposed to be + // managed by Antrea. We can ignore them by comparing them to the calculated broadcast IP. if !rt.GatewayAddress.Equal(config.VirtualServiceIPv4) && rt.DestinationSubnet.IP.Equal(iputil.GetLocalBroadcastIP(rt.DestinationSubnet)) { continue } @@ -397,19 +478,73 @@ func (c *Client) DeleteSNATRule(mark uint32) error { // TODO: nodePortAddresses is not supported currently. func (c *Client) AddNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error { - return util.ReplaceNetNatStaticMapping(antreaNatNodePort, "0.0.0.0", port, config.VirtualNodePortDNATIPv4.String(), port, string(protocol)) + netNatStaticMapping := &util.NetNatStaticMapping{ + Name: antreaNatNodePort, + ExternalIP: net.ParseIP("0.0.0.0"), + ExternalPort: port, + InternalIP: config.VirtualNodePortDNATIPv4, + InternalPort: port, + Protocol: protocol, + } + if err := util.ReplaceNetNatStaticMapping(netNatStaticMapping); err != nil { + return err + } + c.netNatStaticMappings.Store(fmt.Sprintf("%d-%s", port, protocol), netNatStaticMapping) + klog.V(4).InfoS("Added NetNatStaticMapping for NodePort", "NetNatStaticMapping", netNatStaticMapping) + return nil } func (c *Client) DeleteNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error { - return util.RemoveNetNatStaticMapping(antreaNatNodePort, "0.0.0.0", port, string(protocol)) + key := fmt.Sprintf("%d-%s", port, protocol) + obj, found := c.netNatStaticMappings.Load(key) + if !found { + klog.V(2).InfoS("Didn't find corresponding NetNatStaticMapping for NodePort", "port", port, "protocol", protocol) + return nil + } + netNatStaticMapping := obj.(*util.NetNatStaticMapping) + if err := util.RemoveNetNatStaticMapping(netNatStaticMapping); err != nil { + return err + } + c.netNatStaticMappings.Delete(key) + klog.V(4).InfoS("Deleted NetNatStaticMapping for NodePort", "NetNatStaticMapping", netNatStaticMapping) + return nil } func (c *Client) AddLoadBalancer(externalIP net.IP) error { - return c.addServiceRoute(externalIP) + svcIPStr := externalIP.String() + linkIndex := c.nodeConfig.GatewayConfig.LinkIndex + gw := config.VirtualServiceIPv4 + metric := util.MetricHigh + _, svcIPNet, _ := net.ParseCIDR(svcIPStr) + + route := generateRoute(svcIPNet, gw, linkIndex, metric) + if err := util.ReplaceNetRoute(route); err != nil { + return fmt.Errorf("failed to install route for LoadBalancer ingress IP %s: %w", svcIPStr, err) + } + klog.V(4).InfoS("Added LoadBalancer ingress IP route", "route", route) + c.serviceRoutes.Store(svcIPStr, route) + + return nil } func (c *Client) DeleteLoadBalancer(externalIP net.IP) error { - return c.deleteServiceRoute(externalIP) + svcIPStr := externalIP.String() + route, found := c.serviceRoutes.Load(svcIPStr) + if !found { + klog.V(2).InfoS("Didn't find route for LoadBalancer ingress IP", "ingressIP", svcIPStr) + return nil + } + if err := util.RemoveNetRoute(route.(*util.Route)); err != nil { + if strings.Contains(err.Error(), "No matching MSFT_NetRoute objects") { + klog.InfoS("Failed to delete route for LoadBalancer ingress IP since it doesn't exist", "route", route) + } else { + return fmt.Errorf("failed to delete route for LoadBalancer ingress IP %s: %w", svcIPStr, err) + } + } + klog.V(4).InfoS("Deleted LoadBalancer ingress IP route", "route", route) + c.serviceRoutes.Delete(svcIPStr) + + return nil } func (c *Client) AddLocalAntreaFlexibleIPAMPodRule(podAddresses []net.IP) error { @@ -419,3 +554,21 @@ func (c *Client) AddLocalAntreaFlexibleIPAMPodRule(podAddresses []net.IP) error func (c *Client) DeleteLocalAntreaFlexibleIPAMPodRule(podAddresses []net.IP) error { return nil } + +func generateRoute(ipNet *net.IPNet, gw net.IP, linkIndex int, metric int) *util.Route { + return &util.Route{ + DestinationSubnet: ipNet, + GatewayAddress: gw, + RouteMetric: metric, + LinkIndex: linkIndex, + } +} + +func generateNeigh(ip net.IP, linkIndex int) *util.Neighbor { + return &util.Neighbor{ + LinkIndex: linkIndex, + IPAddress: ip, + LinkLayerAddress: openflow.GlobalVirtualMAC, + State: "Permanent", + } +} diff --git a/pkg/agent/route/route_windows_test.go b/pkg/agent/route/route_windows_test.go index 3682301197a..90fb803dda4 100644 --- a/pkg/agent/route/route_windows_test.go +++ b/pkg/agent/route/route_windows_test.go @@ -51,12 +51,6 @@ func TestRouteOperation(t *testing.T) { _, destCIDR2, _ := net.ParseCIDR(dest2) client, err := NewClient(&config.NetworkConfig{}, true, false, false, false, nil) - svcStr1 := "1.1.0.10" - svcIP1 := net.ParseIP(svcStr1) - svcIPNet1 := util.NewIPNet(svcIP1) - svcStr2 := "1.1.0.11" - svcIP2 := net.ParseIP(svcStr2) - svcIPNet2 := util.NewIPNet(svcIP2) require.Nil(t, err) nodeConfig := &config.NodeConfig{ @@ -84,24 +78,6 @@ func TestRouteOperation(t *testing.T) { require.Nil(t, err) assert.Equal(t, 1, len(routes2)) - err = client.AddClusterIPRoute(svcIP1) - require.Nil(t, err) - route3, err := util.GetNetRoutes(gwLink, svcIPNet1) - require.Nil(t, err) - assert.Equal(t, 1, len(route3)) - obj, found := client.hostRoutes.Load(svcIPNet1.String()) - assert.True(t, found) - assert.EqualValues(t, route3[0], *obj.(*util.Route)) - - err = client.AddClusterIPRoute(svcIP2) - require.Nil(t, err) - route4, err := util.GetNetRoutes(gwLink, svcIPNet2) - require.Nil(t, err) - assert.Equal(t, 1, len(route4)) - obj, found = client.hostRoutes.Load(svcIPNet2.String()) - assert.True(t, found) - assert.EqualValues(t, route4[0], *obj.(*util.Route)) - err = client.Reconcile([]string{dest2}) require.Nil(t, err) @@ -109,21 +85,9 @@ func TestRouteOperation(t *testing.T) { require.Nil(t, err) assert.Equal(t, 0, len(routes5)) - routes6, err := util.GetNetRoutes(gwLink, svcIPNet2) - require.Nil(t, err) - assert.Equal(t, 1, len(routes6)) - err = client.DeleteRoutes(destCIDR2) require.Nil(t, err) routes7, err := util.GetNetRoutes(gwLink, destCIDR2) require.Nil(t, err) assert.Equal(t, 0, len(routes7)) - - err = client.DeleteClusterIPRoute(svcIP1) - require.Nil(t, err) - routes8, err := util.GetNetRoutes(gwLink, svcIPNet1) - require.Nil(t, err) - assert.Equal(t, 0, len(routes8)) - _, found = client.hostRoutes.Load(svcIPNet1.String()) - assert.False(t, found) } diff --git a/pkg/agent/route/testing/mock_route.go b/pkg/agent/route/testing/mock_route.go index ba8086898f5..8d5a3256530 100644 --- a/pkg/agent/route/testing/mock_route.go +++ b/pkg/agent/route/testing/mock_route.go @@ -50,20 +50,6 @@ func (m *MockInterface) EXPECT() *MockInterfaceMockRecorder { return m.recorder } -// AddClusterIPRoute mocks base method -func (m *MockInterface) AddClusterIPRoute(arg0 net.IP) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddClusterIPRoute", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// AddClusterIPRoute indicates an expected call of AddClusterIPRoute -func (mr *MockInterfaceMockRecorder) AddClusterIPRoute(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddClusterIPRoute", reflect.TypeOf((*MockInterface)(nil).AddClusterIPRoute), arg0) -} - // AddLoadBalancer mocks base method func (m *MockInterface) AddLoadBalancer(arg0 net.IP) error { m.ctrl.T.Helper() @@ -134,20 +120,6 @@ func (mr *MockInterfaceMockRecorder) AddSNATRule(arg0, arg1 interface{}) *gomock return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddSNATRule", reflect.TypeOf((*MockInterface)(nil).AddSNATRule), arg0, arg1) } -// DeleteClusterIPRoute mocks base method -func (m *MockInterface) DeleteClusterIPRoute(arg0 net.IP) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteClusterIPRoute", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// DeleteClusterIPRoute indicates an expected call of DeleteClusterIPRoute -func (mr *MockInterfaceMockRecorder) DeleteClusterIPRoute(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteClusterIPRoute", reflect.TypeOf((*MockInterface)(nil).DeleteClusterIPRoute), arg0) -} - // DeleteLoadBalancer mocks base method func (m *MockInterface) DeleteLoadBalancer(arg0 net.IP) error { m.ctrl.T.Helper() diff --git a/pkg/agent/util/net.go b/pkg/agent/util/net.go index b5deaef7ac8..507c89790d3 100644 --- a/pkg/agent/util/net.go +++ b/pkg/agent/util/net.go @@ -299,6 +299,9 @@ func GetIPWithFamily(ips []net.IP, addrFamily uint8) (net.IP, error) { // ExtendCIDRWithIP is used for extending an IPNet with an IP. func ExtendCIDRWithIP(ipNet *net.IPNet, ip net.IP) (*net.IPNet, error) { + if ipNet == nil { + return NewIPNet(ip), nil + } cpl := longestCommonPrefixLen(ipNet.IP, ip) if cpl == 0 { return nil, fmt.Errorf("invalid common prefix length") diff --git a/pkg/agent/util/net_windows.go b/pkg/agent/util/net_windows.go index 3b89c069818..db6464baedf 100644 --- a/pkg/agent/util/net_windows.go +++ b/pkg/agent/util/net_windows.go @@ -34,6 +34,7 @@ import ( "k8s.io/klog/v2" ps "antrea.io/antrea/pkg/agent/util/powershell" + binding "antrea.io/antrea/pkg/ovs/openflow" ) const ( @@ -74,6 +75,14 @@ func (r Route) String() string { r.LinkIndex, r.DestinationSubnet, r.GatewayAddress, r.RouteMetric) } +func (r Route) Equal(x Route) bool { + return x.LinkIndex == r.LinkIndex && + x.DestinationSubnet != nil && + r.DestinationSubnet != nil && + x.DestinationSubnet.IP.Equal(r.DestinationSubnet.IP) && + x.GatewayAddress.Equal(r.GatewayAddress) +} + type Neighbor struct { LinkIndex int IPAddress net.IP @@ -85,6 +94,19 @@ func (n Neighbor) String() string { return fmt.Sprintf("LinkIndex: %d, IPAddress: %s, LinkLayerAddress: %s", n.LinkIndex, n.IPAddress, n.LinkLayerAddress) } +type NetNatStaticMapping struct { + Name string + ExternalIP net.IP + ExternalPort uint16 + InternalIP net.IP + InternalPort uint16 + Protocol binding.Protocol +} + +func (n NetNatStaticMapping) String() string { + return fmt.Sprintf("Name: %s, ExternalIP %s, ExternalPort: %d, InternalIP: %s, InternalPort: %d, Protocol: %s", n.Name, n.ExternalIP, n.ExternalPort, n.InternalIP, n.InternalPort, n.Protocol) +} + func GetNSPath(containerNetNS string) (string, error) { return containerNetNS, nil } @@ -702,10 +724,10 @@ func NewNetNat(netNatName string, subnetCIDR *net.IPNet) error { } } else { if strings.Contains(internalNet, subnetCIDR.String()) { - klog.InfoS("existing netnat in CIDR", "name", internalNet, "subnetCIDR", subnetCIDR.String()) + klog.V(4).InfoS("The existing netnat matched the subnet CIDR", "name", internalNet, "subnetCIDR", subnetCIDR.String()) return nil } - klog.InfoS("Removing the existing netnat", "name", netNatName, "internalIPInterfaceAddressPrefix", internalNet) + klog.InfoS("Removing the existing NetNat", "name", netNatName, "internalIPInterfaceAddressPrefix", internalNet) cmd = fmt.Sprintf("Remove-NetNat -Name %s -Confirm:$false", netNatName) if _, err := runCommand(cmd); err != nil { klog.ErrorS(err, "Failed to remove the existing netnat", "name", netNatName, "internalIPInterfaceAddressPrefix", internalNet) @@ -721,15 +743,15 @@ func NewNetNat(netNatName string, subnetCIDR *net.IPNet) error { return nil } -func ReplaceNetNatStaticMapping(netNatName string, externalIPAddr string, externalPort uint16, internalIPAddr string, internalPort uint16, proto string) error { - staticMappingStr, err := GetNetNatStaticMapping(netNatName, externalIPAddr, externalPort, proto) +func ReplaceNetNatStaticMapping(mapping *NetNatStaticMapping) error { + staticMappingStr, err := GetNetNatStaticMapping(mapping) if err != nil { return err } parsed := parseGetNetCmdResult(staticMappingStr, 6) if len(parsed) > 0 { items := parsed[0] - if items[4] == internalIPAddr && items[5] == strconv.Itoa(int(internalPort)) { + if items[4] == mapping.InternalIP.String() && items[5] == strconv.Itoa(int(mapping.InternalPort)) { return nil } firstCol := strings.Split(items[0], ";") @@ -737,19 +759,19 @@ func ReplaceNetNatStaticMapping(netNatName string, externalIPAddr string, extern if err != nil { return err } - if err := RemoveNetNatStaticMappingByID(netNatName, id); err != nil { + if err := RemoveNetNatStaticMappingByID(mapping.Name, id); err != nil { return err } } - return AddNetNatStaticMapping(netNatName, externalIPAddr, externalPort, internalIPAddr, internalPort, proto) + return AddNetNatStaticMapping(mapping) } // GetNetNatStaticMapping checks if a NetNatStaticMapping exists. -func GetNetNatStaticMapping(netNatName string, externalIPAddr string, externalPort uint16, proto string) (string, error) { - cmd := fmt.Sprintf("Get-NetNatStaticMapping -NatName %s", netNatName) + - fmt.Sprintf("|? ExternalIPAddress -EQ %s", externalIPAddr) + - fmt.Sprintf("|? ExternalPort -EQ %d", externalPort) + - fmt.Sprintf("|? Protocol -EQ %s", proto) + +func GetNetNatStaticMapping(mapping *NetNatStaticMapping) (string, error) { + cmd := fmt.Sprintf("Get-NetNatStaticMapping -NatName %s", mapping.Name) + + fmt.Sprintf("|? ExternalIPAddress -EQ %s", mapping.ExternalIP) + + fmt.Sprintf("|? ExternalPort -EQ %d", mapping.ExternalPort) + + fmt.Sprintf("|? Protocol -EQ %s", mapping.Protocol) + "| Format-Table -HideTableHeaders" staticMappingStr, err := runCommand(cmd) if err != nil && !strings.Contains(err.Error(), "No MSFT_NetNatStaticMapping objects found") { @@ -759,15 +781,15 @@ func GetNetNatStaticMapping(netNatName string, externalIPAddr string, externalPo } // AddNetNatStaticMapping adds a static mapping to a NAT instance. -func AddNetNatStaticMapping(netNatName string, externalIPAddr string, externalPort uint16, internalIPAddr string, internalPort uint16, proto string) error { +func AddNetNatStaticMapping(mapping *NetNatStaticMapping) error { cmd := fmt.Sprintf("Add-NetNatStaticMapping -NatName %s -ExternalIPAddress %s -ExternalPort %d -InternalIPAddress %s -InternalPort %d -Protocol %s", - netNatName, externalIPAddr, externalPort, internalIPAddr, internalPort, proto) + mapping.Name, mapping.ExternalIP, mapping.ExternalPort, mapping.InternalIP, mapping.InternalPort, mapping.Protocol) _, err := runCommand(cmd) return err } -func RemoveNetNatStaticMapping(netNatName string, externalIPAddr string, externalPort uint16, proto string) error { - staticMappingStr, err := GetNetNatStaticMapping(netNatName, externalIPAddr, externalPort, proto) +func RemoveNetNatStaticMapping(mapping *NetNatStaticMapping) error { + staticMappingStr, err := GetNetNatStaticMapping(mapping) if err != nil { return err } @@ -781,24 +803,24 @@ func RemoveNetNatStaticMapping(netNatName string, externalIPAddr string, externa if err != nil { return err } - return RemoveNetNatStaticMappingByID(netNatName, id) + return RemoveNetNatStaticMappingByID(mapping.Name, id) } -func RemoveNetNatStaticMappingByNPLTuples(netNatName string, externalIPAddr string, externalPort uint16, internalIPAddr string, internalPort uint16, proto string) error { - staticMappingStr, err := GetNetNatStaticMapping(netNatName, externalIPAddr, externalPort, proto) +func RemoveNetNatStaticMappingByNPLTuples(mapping *NetNatStaticMapping) error { + staticMappingStr, err := GetNetNatStaticMapping(mapping) if err != nil { return err } parsed := parseGetNetCmdResult(staticMappingStr, 6) if len(parsed) > 0 { items := parsed[0] - if items[4] == internalIPAddr && items[5] == strconv.Itoa(int(internalPort)) { + if items[4] == mapping.InternalIP.String() && items[5] == strconv.Itoa(int(mapping.InternalPort)) { firstCol := strings.Split(items[0], ";") id, err := strconv.Atoi(firstCol[1]) if err != nil { return err } - if err := RemoveNetNatStaticMappingByID(netNatName, id); err != nil { + if err := RemoveNetNatStaticMappingByID(mapping.Name, id); err != nil { return err } return nil diff --git a/pkg/agent/util/net_windows_test.go b/pkg/agent/util/net_windows_test.go index b02d783df7d..8245ec94b48 100644 --- a/pkg/agent/util/net_windows_test.go +++ b/pkg/agent/util/net_windows_test.go @@ -26,6 +26,8 @@ import ( "github.com/Microsoft/hcsshim" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "antrea.io/antrea/pkg/ovs/openflow" ) func TestRouteString(t *testing.T) { @@ -642,16 +644,27 @@ func TestNewNetNat(t *testing.T) { func TestReplaceNetNatStaticMapping(t *testing.T) { notFoundErr := fmt.Errorf("received error No MSFT_NetNatStaticMapping objects found") - testNetNat, testExternalIPAddr, testInternalIPAddr, testProto := "test-nat", "a.b.c.0/24", "192.0.02.179", "tcp" + testNetNatName := "test-nat" testExternalPort, testInternalPort := (uint16)(80), (uint16)(8080) - getCmd := fmt.Sprintf("Get-NetNatStaticMapping -NatName %s", testNetNat) + + testExternalIPAddr, testInternalIPAddr := "10.10.0.1", "192.0.2.179" + testProto := openflow.ProtocolTCP + testNetNat := &NetNatStaticMapping{ + Name: testNetNatName, + ExternalIP: net.ParseIP(testExternalIPAddr), + ExternalPort: testExternalPort, + InternalIP: net.ParseIP(testInternalIPAddr), + InternalPort: testInternalPort, + Protocol: testProto, + } + + getCmd := fmt.Sprintf("Get-NetNatStaticMapping -NatName %s", testNetNatName) + fmt.Sprintf("|? ExternalIPAddress -EQ %s", testExternalIPAddr) + fmt.Sprintf("|? ExternalPort -EQ %d", testExternalPort) + fmt.Sprintf("|? Protocol -EQ %s", testProto) + "| Format-Table -HideTableHeaders" - removeCmd := fmt.Sprintf("Remove-NetNatStaticMapping -NatName %s -StaticMappingID %d -Confirm:$false", testNetNat, 1) + removeCmd := fmt.Sprintf("Remove-NetNatStaticMapping -NatName %s -StaticMappingID %d -Confirm:$false", testNetNatName, 1) addCmd := fmt.Sprintf("Add-NetNatStaticMapping -NatName %s -ExternalIPAddress %s -ExternalPort %d -InternalIPAddress %s -InternalPort %d -Protocol %s", - testNetNat, testExternalIPAddr, testExternalPort, testInternalIPAddr, testInternalPort, testProto) + testNetNatName, testExternalIPAddr, testExternalPort, testInternalIPAddr, testInternalPort, testProto) type testFormat struct { name string commandOut string @@ -690,22 +703,32 @@ func TestReplaceNetNatStaticMapping(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { defer mockRunCommand(t, tc.wantCmds, tc.commandOut, tc.commandErr, true)() - gotErr := ReplaceNetNatStaticMapping(testNetNat, testExternalIPAddr, testExternalPort, testInternalIPAddr, testInternalPort, testProto) + gotErr := ReplaceNetNatStaticMapping(testNetNat) assert.Equal(t, tc.wantErr, gotErr) }) } } func TestRemoveNetNatStaticMapping(t *testing.T) { - testNetNat, testExternalIPAddr, testProto := "test-nat", "a.b.c.0/24", "tcp" - testExternalPort := (uint16)(80) - getCmd := fmt.Sprintf("Get-NetNatStaticMapping -NatName %s", testNetNat) + + testNetNatName := "test-nat" + testExternalPort, testInternalPort := (uint16)(80), (uint16)(8080) + testExternalIPAddr, testInternalIPAddr := "10.10.0.1", "192.0.2.179" + testProto := openflow.ProtocolTCP + testNetNat := &NetNatStaticMapping{ + Name: testNetNatName, + ExternalIP: net.ParseIP(testExternalIPAddr), + ExternalPort: testExternalPort, + InternalIP: net.ParseIP(testInternalIPAddr), + InternalPort: testInternalPort, + Protocol: testProto, + } + getCmd := fmt.Sprintf("Get-NetNatStaticMapping -NatName %s", testNetNatName) + fmt.Sprintf("|? ExternalIPAddress -EQ %s", testExternalIPAddr) + fmt.Sprintf("|? ExternalPort -EQ %d", testExternalPort) + fmt.Sprintf("|? Protocol -EQ %s", testProto) + "| Format-Table -HideTableHeaders" - removeIDCmd := fmt.Sprintf("Remove-NetNatStaticMapping -NatName %s -StaticMappingID %d -Confirm:$false", testNetNat, 1) - removeCmd := fmt.Sprintf("Remove-NetNatStaticMapping -NatName %s -Confirm:$false", testNetNat) + removeIDCmd := fmt.Sprintf("Remove-NetNatStaticMapping -NatName %s -StaticMappingID %d -Confirm:$false", testNetNatName, 1) + removeCmd := fmt.Sprintf("Remove-NetNatStaticMapping -NatName %s -Confirm:$false", testNetNatName) tests := []struct { name string commandOut string @@ -729,11 +752,11 @@ func TestRemoveNetNatStaticMapping(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { defer mockRunCommand(t, tc.wantCmds, tc.commandOut, tc.commandErr, false)() - gotErr := RemoveNetNatStaticMapping(testNetNat, testExternalIPAddr, testExternalPort, testProto) + gotErr := RemoveNetNatStaticMapping(testNetNat) assert.Equal(t, tc.wantErr, gotErr) - gotErr = RemoveNetNatStaticMappingByNPLTuples(testNetNat, testExternalIPAddr, testExternalPort, "192.0.02.179", 8080, testProto) + gotErr = RemoveNetNatStaticMappingByNPLTuples(testNetNat) assert.Equal(t, tc.wantErr, gotErr) - gotErr = RemoveNetNatStaticMappingByNAME(testNetNat) + gotErr = RemoveNetNatStaticMappingByNAME(testNetNat.Name) assert.Equal(t, tc.wantErr, gotErr) }) } diff --git a/test/integration/agent/route_test.go b/test/integration/agent/route_test.go index 75e7b5ffd51..94bfa0e482a 100644 --- a/test/integration/agent/route_test.go +++ b/test/integration/agent/route_test.go @@ -284,9 +284,9 @@ func TestIpTablesSync(t *testing.T) { assert.Equal(t, "", string(actualData), "failed to remove iptables rule for %v", tc) } stopCh := make(chan struct{}) - route.IPTablesSyncInterval = 2 * time.Second + route.SyncInterval = 2 * time.Second go routeClient.Run(stopCh) - time.Sleep(route.IPTablesSyncInterval) // wait for one iteration of sync operation. + time.Sleep(route.SyncInterval) // wait for one iteration of sync operation. for _, tc := range tcs { saveCmd := fmt.Sprintf("iptables-save -t %s | grep -e '%s %s'", tc.Table, tc.Cmd, tc.Chain) // #nosec G204: ignore in test code @@ -443,9 +443,9 @@ func TestSyncRoutes(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - route.IPTablesSyncInterval = 2 * time.Second + route.SyncInterval = 2 * time.Second go routeClient.Run(stopCh) - time.Sleep(route.IPTablesSyncInterval) // wait for one iteration of sync operation. + time.Sleep(route.SyncInterval) // wait for one iteration of sync operation. output, err := exec.Command("bash", "-c", listCmd).Output() assert.NoError(t, err, "error executing ip route command: %s", listCmd) @@ -487,10 +487,10 @@ func TestSyncGatewayKernelRoute(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - route.IPTablesSyncInterval = 2 * time.Second + route.SyncInterval = 2 * time.Second go routeClient.Run(stopCh) - err = wait.Poll(1*time.Second, 2*route.IPTablesSyncInterval, func() (done bool, err error) { + err = wait.Poll(1*time.Second, 2*route.SyncInterval, func() (done bool, err error) { expOutput, err := exec.Command("bash", "-c", listCmd).Output() if err != nil { return false, err