diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 472db910f27..28dbaa346eb 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -232,6 +232,7 @@ func run(o *Options) error { connectUplinkToBridge, nodeNetworkPolicyEnabled, multicastEnabled, + o.config.KubeAPIServerOverride != "", serviceCIDRProvider) if err != nil { return fmt.Errorf("error creating route client: %v", err) diff --git a/pkg/agent/controller/networkpolicy/node_reconciler_linux.go b/pkg/agent/controller/networkpolicy/node_reconciler_linux.go index 9b88a77b7e6..c04790ef051 100644 --- a/pkg/agent/controller/networkpolicy/node_reconciler_linux.go +++ b/pkg/agent/controller/networkpolicy/node_reconciler_linux.go @@ -31,6 +31,7 @@ import ( "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/route" "antrea.io/antrea/pkg/agent/types" + ipsetutil "antrea.io/antrea/pkg/agent/util/ipset" "antrea.io/antrea/pkg/agent/util/iptables" "antrea.io/antrea/pkg/apis/controlplane/v1beta2" secv1beta1 "antrea.io/antrea/pkg/apis/crd/v1beta1" @@ -622,7 +623,7 @@ func buildCoreIPTRule(ipProtocol iptables.Protocol, builder := iptables.NewRuleBuilder(iptChain) if isIngress { if ipset != "" { - builder = builder.MatchIPSetSrc(ipset) + builder = builder.MatchIPSetSrc(ipset, ipsetutil.HashIP) } else if ipnet != "" { builder = builder.MatchCIDRSrc(ipnet) } else { @@ -631,7 +632,7 @@ func buildCoreIPTRule(ipProtocol iptables.Protocol, } } else { if ipset != "" { - builder = builder.MatchIPSetDst(ipset) + builder = builder.MatchIPSetDst(ipset, ipsetutil.HashIP) } else if ipnet != "" { builder = builder.MatchCIDRDst(ipnet) } else { @@ -648,8 +649,8 @@ func buildCoreIPTRule(ipProtocol iptables.Protocol, fallthrough case "sctp": builder = builder.MatchTransProtocol(transProtocol). - MatchSrcPort(service.SrcPort, service.SrcEndPort). - MatchDstPort(service.Port, service.EndPort) + MatchPortSrc(service.SrcPort, service.SrcEndPort). + MatchPortDst(service.Port, service.EndPort) case "icmp": builder = builder.MatchICMP(service.ICMPType, service.ICMPCode, ipProtocol) } @@ -673,8 +674,8 @@ func buildServiceIPTRules(ipProtocol iptables.Protocol, services []v1beta2.Servi fallthrough case "sctp": copiedBuilder = copiedBuilder.MatchTransProtocol(transProtocol). - MatchSrcPort(svc.SrcPort, svc.SrcEndPort). - MatchDstPort(svc.Port, svc.EndPort) + MatchPortSrc(svc.SrcPort, svc.SrcEndPort). + MatchPortDst(svc.Port, svc.EndPort) case "icmp": copiedBuilder = copiedBuilder.MatchICMP(svc.ICMPType, svc.ICMPCode, ipProtocol) } diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 54fecc1db33..35a20ed1afb 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -229,6 +229,7 @@ func (p *proxier) removeServiceFlows(svcInfo *types.ServiceInfo) bool { svcInfoStr := svcInfo.String() svcPort := uint16(svcInfo.Port()) svcProto := svcInfo.OFProtocol + loadBalancerMode := p.getLoadBalancerMode(svcInfo) // Remove ClusterIP flows. if err := p.ofClient.UninstallServiceFlows(svcInfo.ClusterIP(), svcPort, svcProto); err != nil { klog.ErrorS(err, "Error when uninstalling ClusterIP flows for Service", "ServiceInfo", svcInfoStr) @@ -242,14 +243,14 @@ func (p *proxier) removeServiceFlows(svcInfo *types.ServiceInfo) bool { return false } // Remove ExternalIP flows and configurations. - if err := p.uninstallExternalIPService(svcInfoStr, svcInfo.ExternalIPStrings(), svcPort, svcProto); err != nil { + if err := p.uninstallExternalIPService(svcInfoStr, svcInfo.ExternalIPStrings(), svcPort, svcProto, loadBalancerMode); err != nil { klog.ErrorS(err, "Error when uninstalling ExternalIP flows and configurations for Service", "ServiceInfo", svcInfoStr) return false } } // Remove LoadBalancer flows and configurations. if p.proxyLoadBalancerIPs { - if err := p.uninstallLoadBalancerService(svcInfoStr, svcInfo.LoadBalancerIPStrings(), svcPort, svcProto); err != nil { + if err := p.uninstallLoadBalancerService(svcInfoStr, svcInfo.LoadBalancerIPStrings(), svcPort, svcProto, loadBalancerMode); err != nil { klog.ErrorS(err, "Error when uninstalling LoadBalancer flows and configurations for Service", "ServiceInfo", svcInfoStr) return false } @@ -554,7 +555,7 @@ func (p *proxier) installNodePortService(localGroupID, clusterGroupID binding.Gr }); err != nil { return fmt.Errorf("failed to install NodePort load balancing flows: %w", err) } - if err := p.routeClient.AddNodePort(p.nodePortAddresses, svcPort, protocol); err != nil { + if err := p.routeClient.AddNodePortConf(p.nodePortAddresses, svcPort, protocol); err != nil { return fmt.Errorf("failed to install NodePort traffic redirecting rules: %w", err) } return nil @@ -571,7 +572,7 @@ func (p *proxier) uninstallNodePortService(svcPort uint16, protocol binding.Prot if err := p.ofClient.UninstallServiceFlows(svcIP, svcPort, protocol); err != nil { return fmt.Errorf("failed to remove NodePort load balancing flows: %w", err) } - if err := p.routeClient.DeleteNodePort(p.nodePortAddresses, svcPort, protocol); err != nil { + if err := p.routeClient.DeleteNodePortConf(p.nodePortAddresses, svcPort, protocol); err != nil { return fmt.Errorf("failed to remove NodePort traffic redirecting rules: %w", err) } return nil @@ -586,6 +587,8 @@ func (p *proxier) installExternalIPService(svcInfoStr string, trafficPolicyLocal bool, affinityTimeout uint16, loadBalancerMode agentconfig.LoadBalancerMode) error { + externalIPs := make([]net.IP, 0, len(externalIPStrings)) + isDSR := features.DefaultFeatureGate.Enabled(features.LoadBalancerModeDSR) && loadBalancerMode == agentconfig.LoadBalancerModeDSR for _, externalIP := range externalIPStrings { ip := net.ParseIP(externalIP) if err := p.ofClient.InstallServiceFlows(&agenttypes.ServiceConfig{ @@ -599,18 +602,29 @@ func (p *proxier) installExternalIPService(svcInfoStr string, IsExternal: true, IsNodePort: false, IsNested: false, // Unsupported for ExternalIP - IsDSR: features.DefaultFeatureGate.Enabled(features.LoadBalancerModeDSR) && loadBalancerMode == agentconfig.LoadBalancerModeDSR, + IsDSR: isDSR, }); err != nil { return fmt.Errorf("failed to install ExternalIP load balancing flows: %w", err) } if err := p.addRouteForServiceIP(svcInfoStr, ip, p.routeClient.AddExternalIPRoute); err != nil { return fmt.Errorf("failed to install ExternalIP traffic redirecting routes: %w", err) } + externalIPs = append(externalIPs, ip) + } + if len(externalIPs) != 0 { + if err := p.routeClient.AddExternalIPConf(externalIPs, svcPort, protocol, isDSR); err != nil { + return err + } } return nil } -func (p *proxier) uninstallExternalIPService(svcInfoStr string, externalIPStrings []string, svcPort uint16, protocol binding.Protocol) error { +func (p *proxier) uninstallExternalIPService(svcInfoStr string, + externalIPStrings []string, + svcPort uint16, + protocol binding.Protocol, + loadBalancerMode agentconfig.LoadBalancerMode) error { + externalIPs := make([]net.IP, 0, len(externalIPStrings)) for _, externalIP := range externalIPStrings { ip := net.ParseIP(externalIP) if err := p.ofClient.UninstallServiceFlows(ip, svcPort, protocol); err != nil { @@ -619,6 +633,13 @@ func (p *proxier) uninstallExternalIPService(svcInfoStr string, externalIPString if err := p.deleteRouteForServiceIP(svcInfoStr, ip, p.routeClient.DeleteExternalIPRoute); err != nil { return fmt.Errorf("failed to remove ExternalIP traffic redirecting routes: %w", err) } + externalIPs = append(externalIPs, ip) + } + isDSR := features.DefaultFeatureGate.Enabled(features.LoadBalancerModeDSR) && loadBalancerMode == agentconfig.LoadBalancerModeDSR + if len(externalIPs) != 0 { + if err := p.routeClient.DeleteExternalIPConf(externalIPs, svcPort, protocol, isDSR); err != nil { + return err + } } return nil } @@ -632,6 +653,8 @@ func (p *proxier) installLoadBalancerService(svcInfoStr string, trafficPolicyLocal bool, affinityTimeout uint16, loadBalancerMode agentconfig.LoadBalancerMode) error { + loadBalancerIPs := make([]net.IP, 0, len(loadBalancerIPStrings)) + isDSR := features.DefaultFeatureGate.Enabled(features.LoadBalancerModeDSR) && loadBalancerMode == agentconfig.LoadBalancerModeDSR for _, ingress := range loadBalancerIPStrings { if ingress != "" { ip := net.ParseIP(ingress) @@ -646,7 +669,7 @@ func (p *proxier) installLoadBalancerService(svcInfoStr string, IsExternal: true, IsNodePort: false, IsNested: false, // Unsupported for LoadBalancerIP - IsDSR: features.DefaultFeatureGate.Enabled(features.LoadBalancerModeDSR) && loadBalancerMode == agentconfig.LoadBalancerModeDSR, + IsDSR: isDSR, }); err != nil { return fmt.Errorf("failed to install LoadBalancer load balancing flows: %w", err) } @@ -654,9 +677,16 @@ func (p *proxier) installLoadBalancerService(svcInfoStr string, if err := p.addRouteForServiceIP(svcInfoStr, ip, p.routeClient.AddExternalIPRoute); err != nil { return fmt.Errorf("failed to install LoadBalancer traffic redirecting routes: %w", err) } + loadBalancerIPs = append(loadBalancerIPs, ip) } } } + if p.proxyAll && len(loadBalancerIPs) != 0 { + if err := p.routeClient.AddLoadBalancerConf(loadBalancerIPs, svcPort, protocol, isDSR); err != nil { + return err + } + } + return nil } @@ -677,7 +707,12 @@ func (p *proxier) addRouteForServiceIP(svcInfoStr string, ip net.IP, addRouteFn return nil } -func (p *proxier) uninstallLoadBalancerService(svcInfoStr string, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol) error { +func (p *proxier) uninstallLoadBalancerService(svcInfoStr string, + loadBalancerIPStrings []string, + svcPort uint16, + protocol binding.Protocol, + loadBalancerMode agentconfig.LoadBalancerMode) error { + loadBalancerIPs := make([]net.IP, 0, len(loadBalancerIPStrings)) for _, ingress := range loadBalancerIPStrings { if ingress != "" { ip := net.ParseIP(ingress) @@ -688,9 +723,16 @@ func (p *proxier) uninstallLoadBalancerService(svcInfoStr string, loadBalancerIP if err := p.deleteRouteForServiceIP(svcInfoStr, ip, p.routeClient.DeleteExternalIPRoute); err != nil { return fmt.Errorf("failed to remove LoadBalancer traffic redirecting routes: %w", err) } + loadBalancerIPs = append(loadBalancerIPs, ip) } } } + if p.proxyAll && len(loadBalancerIPs) != 0 { + isDSR := features.DefaultFeatureGate.Enabled(features.LoadBalancerModeDSR) && loadBalancerMode == agentconfig.LoadBalancerModeDSR + if err := p.routeClient.DeleteLoadBalancerConf(loadBalancerIPs, svcPort, protocol, isDSR); err != nil { + return err + } + } return nil } @@ -937,7 +979,7 @@ func (p *proxier) updateServiceExternalAddresses(pSvcInfo, svcInfo *types.Servic } deletedExternalIPs := smallSliceDifference(pSvcInfo.ExternalIPStrings(), svcInfo.ExternalIPStrings()) addedExternalIPs := smallSliceDifference(svcInfo.ExternalIPStrings(), pSvcInfo.ExternalIPStrings()) - if err := p.uninstallExternalIPService(pSvcInfoStr, deletedExternalIPs, pSvcPort, pSvcProto); err != nil { + if err := p.uninstallExternalIPService(pSvcInfoStr, deletedExternalIPs, pSvcPort, pSvcProto, loadBalancerMode); err != nil { klog.ErrorS(err, "Error when uninstalling ExternalIP flows and configurations for Service", "ServiceInfo", pSvcInfoStr) return false } @@ -949,7 +991,7 @@ func (p *proxier) updateServiceExternalAddresses(pSvcInfo, svcInfo *types.Servic if p.proxyLoadBalancerIPs { deletedLoadBalancerIPs := smallSliceDifference(pSvcInfo.LoadBalancerIPStrings(), svcInfo.LoadBalancerIPStrings()) addedLoadBalancerIPs := smallSliceDifference(svcInfo.LoadBalancerIPStrings(), pSvcInfo.LoadBalancerIPStrings()) - if err := p.uninstallLoadBalancerService(pSvcInfoStr, deletedLoadBalancerIPs, pSvcPort, pSvcProto); err != nil { + if err := p.uninstallLoadBalancerService(pSvcInfoStr, deletedLoadBalancerIPs, pSvcPort, pSvcProto, loadBalancerMode); err != nil { klog.ErrorS(err, "Error when uninstalling LoadBalancer flows and configurations for Service", "ServiceInfo", pSvcInfoStr) return false } diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 485a9695932..7e903f93884 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -541,7 +541,8 @@ func testClusterIPAdd(t *testing.T, } } if externalIP != nil { - mockRouteClient.EXPECT().AddExternalIPRoute(externalIP) + mockRouteClient.EXPECT().AddExternalIPRoute(externalIP).Times(1) + mockRouteClient.EXPECT().AddExternalIPConf([]net.IP{externalIP}, uint16(svcPort), bindingProtocol, false).Times(1) } fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -746,10 +747,13 @@ func testLoadBalancerAdd(t *testing.T, } if proxyLoadBalancerIPs { mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().AddLoadBalancerConf([]net.IP{loadBalancerIP}, uint16(svcPort), bindingProtocol, dsrEnabled).Times(1) } - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) if externalIP != nil { - mockRouteClient.EXPECT().AddExternalIPRoute(externalIP) + mockRouteClient.EXPECT().AddExternalIPRoute(externalIP).Times(1) + mockRouteClient.EXPECT().AddExternalIPConf([]net.IP{externalIP}, uint16(svcPort), bindingProtocol, dsrEnabled).Times(1) + } fp.syncProxyRules() @@ -906,7 +910,7 @@ func testNodePortAdd(t *testing.T, }).Times(1) } } - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) if externalIP != nil { mockRouteClient.EXPECT().AddExternalIPRoute(externalIP) } @@ -1176,7 +1180,8 @@ func TestLoadBalancerServiceWithMultiplePorts(t *testing.T) { ClusterGroupID: clusterGroupID1, IsExternal: true, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(port30001Int32), binding.ProtocolTCP).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(port30001Int32), binding.ProtocolTCP).Times(1) + mockRouteClient.EXPECT().AddLoadBalancerConf([]net.IP{loadBalancerIPv4}, uint16(port80Int32), binding.ProtocolTCP, false).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIPv4).Times(1) localGroupID2 := fp.groupCounter.AllocateIfNotExist(svcPortName2, true) @@ -1210,7 +1215,8 @@ func TestLoadBalancerServiceWithMultiplePorts(t *testing.T) { ClusterGroupID: clusterGroupID2, IsExternal: true, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(port30002Int32), binding.ProtocolTCP).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(port30002Int32), binding.ProtocolTCP).Times(1) + mockRouteClient.EXPECT().AddLoadBalancerConf([]net.IP{loadBalancerIPv4}, uint16(port443Int32), binding.ProtocolTCP, false).Times(1) fp.syncProxyRules() @@ -1223,14 +1229,14 @@ func TestLoadBalancerServiceWithMultiplePorts(t *testing.T) { mockOFClient.EXPECT().UninstallServiceFlows(svc1IPv4, uint16(port80Int32), binding.ProtocolTCP) mockOFClient.EXPECT().UninstallServiceFlows(agentconfig.VirtualNodePortDNATIPv4, uint16(port30001Int32), binding.ProtocolTCP) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIPv4, uint16(port80Int32), binding.ProtocolTCP) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(port30001Int32), binding.ProtocolTCP) + mockRouteClient.EXPECT().DeleteNodePortConf(nodePortAddresses, uint16(port30001Int32), binding.ProtocolTCP) mockOFClient.EXPECT().UninstallEndpointFlows(binding.ProtocolTCP, gomock.InAnyOrder([]k8sproxy.Endpoint{localEndpointForPort443, remoteEndpointForPort443})) mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2) mockOFClient.EXPECT().UninstallServiceFlows(svc1IPv4, uint16(port443Int32), binding.ProtocolTCP) mockOFClient.EXPECT().UninstallServiceFlows(agentconfig.VirtualNodePortDNATIPv4, uint16(port30002Int32), binding.ProtocolTCP) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIPv4, uint16(port443Int32), binding.ProtocolTCP) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(port30002Int32), binding.ProtocolTCP) + mockRouteClient.EXPECT().DeleteNodePortConf(nodePortAddresses, uint16(port30002Int32), binding.ProtocolTCP) // The route for the ClusterIP and the LoadBalancer IP should only be uninstalled once. mockRouteClient.EXPECT().DeleteExternalIPRoute(loadBalancerIPv4) @@ -1600,7 +1606,7 @@ func testNodePortRemove(t *testing.T, bindingProtocol binding.Protocol, isIPv6 b IsExternal: true, IsNodePort: true, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) if externalIP != nil { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: externalIP, @@ -1618,7 +1624,7 @@ func testNodePortRemove(t *testing.T, bindingProtocol binding.Protocol, isIPv6 b mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().DeleteNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) if externalIP != nil { mockOFClient.EXPECT().UninstallServiceFlows(externalIP, uint16(svcPort), bindingProtocol).Times(1) mockRouteClient.EXPECT().DeleteExternalIPRoute(externalIP) @@ -1735,7 +1741,7 @@ func testLoadBalancerRemove(t *testing.T, bindingProtocol binding.Protocol, isIP ClusterGroupID: 2, IsExternal: true, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) if externalIP != nil { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -1755,7 +1761,7 @@ func testLoadBalancerRemove(t *testing.T, bindingProtocol binding.Protocol, isIP mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().DeleteNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) mockRouteClient.EXPECT().DeleteExternalIPRoute(loadBalancerIP).Times(1) if externalIP != nil { mockOFClient.EXPECT().UninstallServiceFlows(externalIP, uint16(svcPort), bindingProtocol).Times(1) @@ -2031,12 +2037,12 @@ func testNodePortNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP net. IsExternal: true, IsNodePort: true, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) fp.syncProxyRules() mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), gomock.Any()).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), gomock.Any()).Times(1) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) + mockRouteClient.EXPECT().DeleteNodePortConf(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: svcIP, ServicePort: uint16(svcPort + 1), @@ -2054,7 +2060,7 @@ func testNodePortNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP net. IsExternal: true, IsNodePort: true, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) fp.serviceChanges.OnServiceUpdate(svc, updatedSvc) fp.syncProxyRules() } @@ -2137,14 +2143,14 @@ func testLoadBalancerNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP ClusterGroupID: 2, IsExternal: true, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) fp.syncProxyRules() mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), gomock.Any()).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), gomock.Any()).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), gomock.Any()).Times(1) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) + mockRouteClient.EXPECT().DeleteNodePortConf(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) mockRouteClient.EXPECT().DeleteExternalIPRoute(loadBalancerIP).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: svcIP, @@ -2172,7 +2178,7 @@ func testLoadBalancerNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP ClusterGroupID: 2, IsExternal: true, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) fp.serviceChanges.OnServiceUpdate(svc, updatedSvc) fp.syncProxyRules() @@ -2321,7 +2327,7 @@ func testLoadBalancerRemoveEndpoints(t *testing.T, nodePortAddresses []net.IP, s IsExternal: true, ClusterGroupID: 1, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(externalIP).Times(1) @@ -2549,7 +2555,7 @@ func testServicePortUpdate(t *testing.T, IsExternal: true, IsNodePort: true, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -2560,8 +2566,8 @@ func testServicePortUpdate(t *testing.T, IsExternal: true, IsNodePort: true, }).Times(1) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().DeleteNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) } if svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -2675,10 +2681,10 @@ func testServiceNodePortUpdate(t *testing.T, IsExternal: true, IsNodePort: true, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) s1 := mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().DeleteNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) s2 := mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: vIP, ServicePort: uint16(svcNodePort + 1), @@ -2687,7 +2693,7 @@ func testServiceNodePortUpdate(t *testing.T, IsExternal: true, IsNodePort: true, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort+1), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort+1), bindingProtocol).Times(1) s2.After(s1) } if svcType == corev1.ServiceTypeLoadBalancer { @@ -2805,7 +2811,7 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, IsExternal: true, IsNodePort: true, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) } if svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -2860,8 +2866,8 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, }).Times(1) s2.After(s1) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().DeleteNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) } if svcType == corev1.ServiceTypeLoadBalancer { s1 := mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), bindingProtocol).Times(1) @@ -3064,7 +3070,7 @@ func testServiceIngressIPsUpdate(t *testing.T, IsExternal: true, }).Times(1) } - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) for _, ip := range loadBalancerIPs { mockRouteClient.EXPECT().AddExternalIPRoute(ip).Times(1) } @@ -3181,9 +3187,9 @@ func testServiceStickyMaxAgeSecondsUpdate(t *testing.T, IsNodePort: true, AffinityTimeout: uint16(affinitySeconds), }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().DeleteNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: vIP, ServicePort: uint16(svcNodePort), @@ -3193,7 +3199,7 @@ func testServiceStickyMaxAgeSecondsUpdate(t *testing.T, IsNodePort: true, AffinityTimeout: uint16(updatedAffinitySeconds), }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) } if svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -3320,7 +3326,7 @@ func testServiceSessionAffinityTypeUpdate(t *testing.T, IsExternal: true, IsNodePort: true, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -3332,8 +3338,8 @@ func testServiceSessionAffinityTypeUpdate(t *testing.T, IsNodePort: true, AffinityTimeout: uint16(affinitySeconds), }).Times(1) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().DeleteNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) } if svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -3576,7 +3582,7 @@ func TestGetServiceFlowKeys(t *testing.T) { makeEndpointSliceMap(fp, eps) } if tc.svc != nil && tc.eps != nil && tc.serviceInstalled { - mockRouteClient.EXPECT().AddNodePort(nodePortAddressesIPv4, uint16(svcNodePort), binding.ProtocolTCP).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddressesIPv4, uint16(svcNodePort), binding.ProtocolTCP).Times(1) mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(1), gomock.Any(), gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(2), gomock.Any(), gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCP, gomock.Any()).Times(1) diff --git a/pkg/agent/route/interfaces.go b/pkg/agent/route/interfaces.go index 5355efbf3c6..316d920428a 100644 --- a/pkg/agent/route/interfaces.go +++ b/pkg/agent/route/interfaces.go @@ -76,11 +76,23 @@ type Interface interface { // DeleteEgressRule deletes the IP rule installed by AddEgressRule. DeleteEgressRule(tableID uint32, mark uint32) error - // AddNodePort adds configurations when a NodePort Service is created. - AddNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error + // AddNodePortConf adds configurations when a NodePort Service is created. + AddNodePortConf(nodePortIPs []net.IP, port uint16, protocol binding.Protocol) error - // DeleteNodePort deletes related configurations when a NodePort Service is deleted. - DeleteNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error + // DeleteNodePortConf deletes related configurations when a NodePort Service is deleted. + DeleteNodePortConf(nodePortIPs []net.IP, port uint16, protocol binding.Protocol) error + + // AddLoadBalancerConf adds configurations when a LoadBalancer Service is created. + AddLoadBalancerConf(loadBalancerIPs []net.IP, port uint16, protocol binding.Protocol, isDSR bool) error + + // DeleteLoadBalancerConf deletes related configurations when a LoadBalancer Service is deleted. + DeleteLoadBalancerConf(loadBalancerIPs []net.IP, port uint16, protocol binding.Protocol, isDSR bool) error + + // AddExternalIPConf adds configurations when an externalIP is created. + AddExternalIPConf(externalIPs []net.IP, port uint16, protocol binding.Protocol, isDSR bool) error + + // DeleteExternalIPConf deletes related configurations when an externalIP is deleted. + DeleteExternalIPConf(externalIPs []net.IP, port uint16, protocol binding.Protocol, isDSR bool) error // AddExternalIPRoute adds a route entry when an external IP is added. AddExternalIPRoute(externalIP net.IP) error diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index 31890415aa5..863d70d6570 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -27,8 +27,10 @@ import ( "github.com/containernetworking/plugins/pkg/ip" "github.com/vishvananda/netlink" "golang.org/x/sys/unix" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + clientset "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" utilnet "k8s.io/utils/net" @@ -55,7 +57,7 @@ const ( // antreaPodIP6Set contains all Per-Node IPAM IPv6 Pod CIDRs of this cluster. antreaPodIP6Set = "ANTREA-POD-IP6" - // Antrea managed ipset. Max name length is 31 chars. + // Antrea managed ipsets. Max name length is 31 chars. // localAntreaFlexibleIPAMPodIPSet contains all AntreaFlexibleIPAM Pod IPs of this Node. localAntreaFlexibleIPAMPodIPSet = "LOCAL-FLEXIBLE-IPAM-POD-IP" // localAntreaFlexibleIPAMPodIP6Set contains all AntreaFlexibleIPAM Pod IPv6s of this Node. @@ -65,9 +67,17 @@ const ( // clusterNodeIP6Set contains all other Node IP6s in the cluster. clusterNodeIP6Set = "CLUSTER-NODE-IP6" - // Antrea proxy NodePort IP - antreaNodePortIPSet = "ANTREA-NODEPORT-IP" - antreaNodePortIP6Set = "ANTREA-NODEPORT-IP6" + // Antrea managed ipsets for proxy. + antreaNodePortIPSet = "ANTREA-NODEPORT-IP" + antreaNodePortIP6Set = "ANTREA-NODEPORT-IP6" + antreaLoadBalancerIPSet = "ANTREA-LOADBALANCER-IP" + antreaLoadBalancerIP6Set = "ANTREA-LOADBALANCER-IP6" + antreaExternalIPIPSet = "ANTREA-EXTERNAL-IP" + antreaExternalIPIP6Set = "ANTREA-EXTERNAL-IP6" + antreaDSRLoadBalancerIPSet = "ANTREA-DSR-LOADBALANCER-IP" + antreaDSRLoadBalancerIP6Set = "ANTREA-DSR-LOADBALANCER-IP6" + antreaDSRExternalIPIPSet = "ANTREA-DSR-EXTERNAL-IP" + antreaDSRExternalIPIP6Set = "ANTREA-DSR-EXTERNAL-IP6" // Antrea managed iptables chains. antreaForwardChain = "ANTREA-FORWARD" @@ -77,6 +87,8 @@ const ( antreaOutputChain = "ANTREA-OUTPUT" antreaMangleChain = "ANTREA-MANGLE" + kubeProxyServiceChain = "KUBE-SERVICES" + serviceIPv4CIDRKey = "serviceIPv4CIDRKey" serviceIPv6CIDRKey = "serviceIPv6CIDRKey" @@ -97,6 +109,19 @@ var ( _, llrCIDR, _ = net.ParseCIDR("fe80::/64") ) +var ( + serviceIP4Sets = sets.NewString(antreaNodePortIPSet, + antreaLoadBalancerIPSet, + antreaExternalIPIPSet, + antreaDSRLoadBalancerIPSet, + antreaDSRExternalIPIPSet) + serviceIP6Sets = sets.NewString(antreaNodePortIP6Set, + antreaLoadBalancerIP6Set, + antreaExternalIPIP6Set, + antreaDSRLoadBalancerIP6Set, + antreaDSRExternalIPIP6Set) +) + // Client takes care of routing container packets in host network, coordinating ip route, ip rule, iptables and ipset. type Client struct { nodeConfig *config.NodeConfig @@ -105,6 +130,7 @@ type Client struct { iptables iptables.Interface ipset ipset.Interface netlink utilnetlink.Interface + k8sClient clientset.Interface // nodeRoutes caches ip routes to remote Pods. It's a map of podCIDR to routes. nodeRoutes sync.Map // nodeNeighbors caches IPv6 Neighbors to remote host gateway @@ -118,14 +144,14 @@ type Client struct { multicastEnabled bool isCloudEKS bool nodeNetworkPolicyEnabled bool + kubeServiceHost string + kubeServicePort intstr.IntOrString // serviceRoutes caches ip routes about Services. serviceRoutes sync.Map // serviceNeighbors caches neighbors. serviceNeighbors sync.Map - // nodePortsIPv4 caches all existing IPv4 NodePorts. - nodePortsIPv4 sync.Map - // nodePortsIPv6 caches all existing IPv6 NodePorts. - nodePortsIPv6 sync.Map + // serviceIPSets caches ipsets about Services. + serviceIPSets map[string]*sync.Map // clusterNodeIPs stores the IPv4 of all other Nodes in the cluster clusterNodeIPs sync.Map // clusterNodeIP6s stores the IPv6 address of all other Nodes in the cluster. It is maintained but not consumed @@ -156,8 +182,9 @@ func NewClient(networkConfig *config.NetworkConfig, connectUplinkToBridge bool, nodeNetworkPolicyEnabled bool, multicastEnabled bool, + isKubeAPIServerOverride bool, serviceCIDRProvider servicecidr.Interface) (*Client, error) { - return &Client{ + c := &Client{ networkConfig: networkConfig, noSNAT: noSNAT, proxyAll: proxyAll, @@ -168,7 +195,28 @@ func NewClient(networkConfig *config.NetworkConfig, netlink: &netlink.Handle{}, isCloudEKS: env.IsCloudEKS(), serviceCIDRProvider: serviceCIDRProvider, - }, nil + } + + var ipsets []string + if networkConfig.IPv4Enabled { + ipsets = append(ipsets, serviceIP4Sets.UnsortedList()...) + } + if networkConfig.IPv6Enabled { + ipsets = append(ipsets, serviceIP6Sets.UnsortedList()...) + } + for _, s := range ipsets { + c.serviceIPSets[s] = &sync.Map{} + } + + if !isKubeAPIServerOverride { + c.kubeServiceHost = env.GetKubeServiceHost() + if env.GetKubeServicePort() == 0 { + return nil, fmt.Errorf("kube Service port is invalid") + } + c.kubeServicePort = intstr.FromInt32(env.GetKubeServicePort()) + } + + return c, nil } // Initialize initializes all infrastructures required to route container packets in host network. @@ -178,7 +226,7 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error { c.iptablesInitialized = make(chan struct{}) var err error - // Sets up the ipset that will be used in iptables. + // Sets up the ipsets that will be used in iptables. if err = c.syncIPSet(); err != nil { return fmt.Errorf("failed to initialize ipset: %v", err) } @@ -361,7 +409,7 @@ func (c *Client) syncRoute() error { return nil } -// syncIPSet ensures that the required ipset exists and it has the initial members. +// syncIPSet ensures that the required ipsets exist and they have the initial members. func (c *Client) syncIPSet() error { // In policy-only mode, Node Pod CIDR is undefined. if c.networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() { @@ -384,29 +432,21 @@ func (c *Client) syncIPSet() error { } } - // If proxy full is enabled, create NodePort ipset. + // If proxyAll is enabled, create the ipsets for Service. if c.proxyAll { - if err := c.ipset.CreateIPSet(antreaNodePortIPSet, ipset.HashIPPort, false); err != nil { - return err - } - if err := c.ipset.CreateIPSet(antreaNodePortIP6Set, ipset.HashIPPort, true); err != nil { - return err - } - - c.nodePortsIPv4.Range(func(k, _ interface{}) bool { - ipSetEntry := k.(string) - if err := c.ipset.AddEntry(antreaNodePortIPSet, ipSetEntry); err != nil { - return false - } - return true - }) - c.nodePortsIPv6.Range(func(k, _ interface{}) bool { - ipSetEntry := k.(string) - if err := c.ipset.AddEntry(antreaNodePortIP6Set, ipSetEntry); err != nil { - return false + for ipsetName, ipsetEntries := range c.serviceIPSets { + isIPv6 := serviceIP6Sets.Has(ipsetName) + if err := c.ipset.CreateIPSet(ipsetName, ipset.HashIPPort, isIPv6); err != nil { + return err } - return true - }) + ipsetEntries.Range(func(k, _ interface{}) bool { + ipSetEntry := k.(string) + if err := c.ipset.AddEntry(ipsetName, ipSetEntry); err != nil { + return false + } + return true + }) + } } if c.connectUplinkToBridge { @@ -488,6 +528,38 @@ func getNodePortIPSetName(isIPv6 bool) string { } } +func getLoadBalancerIPSetName(isIPv6 bool) string { + if isIPv6 { + return antreaLoadBalancerIP6Set + } else { + return antreaLoadBalancerIPSet + } +} + +func getExternalIPIPSetName(isIPv6 bool) string { + if isIPv6 { + return antreaExternalIPIP6Set + } else { + return antreaExternalIPIPSet + } +} + +func getDSRLoadBalancerIPSetName(isIPv6 bool) string { + if isIPv6 { + return antreaDSRLoadBalancerIP6Set + } else { + return antreaDSRLoadBalancerIPSet + } +} + +func getDSRExternalIPIPSetName(isIPv6 bool) string { + if isIPv6 { + return antreaDSRExternalIPIP6Set + } else { + return antreaDSRExternalIPIPSet + } +} + func getLocalAntreaFlexibleIPAMPodIPSetName(isIPv6 bool) string { if isIPv6 { return localAntreaFlexibleIPAMPodIP6Set @@ -567,32 +639,40 @@ func (c *Client) syncIPTables() error { srcChain string dstChain string comment string + insert bool } jumpRules := []jumpRule{ - {iptables.RawTable, iptables.PreRoutingChain, antreaPreRoutingChain, "Antrea: jump to Antrea prerouting rules"}, - {iptables.RawTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules"}, - {iptables.FilterTable, iptables.ForwardChain, antreaForwardChain, "Antrea: jump to Antrea forwarding rules"}, - {iptables.NATTable, iptables.PostRoutingChain, antreaPostRoutingChain, "Antrea: jump to Antrea postrouting rules"}, - {iptables.MangleTable, iptables.PreRoutingChain, antreaMangleChain, "Antrea: jump to Antrea mangle rules"}, // TODO: unify the chain naming style - {iptables.MangleTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules"}, + {iptables.RawTable, iptables.PreRoutingChain, antreaPreRoutingChain, "Antrea: jump to Antrea prerouting rules", false}, + {iptables.RawTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules", false}, + {iptables.FilterTable, iptables.ForwardChain, antreaForwardChain, "Antrea: jump to Antrea forwarding rules", false}, + {iptables.NATTable, iptables.PostRoutingChain, antreaPostRoutingChain, "Antrea: jump to Antrea postrouting rules", false}, + {iptables.MangleTable, iptables.PreRoutingChain, antreaMangleChain, "Antrea: jump to Antrea mangle rules", false}, // TODO: unify the chain naming style + {iptables.MangleTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules", false}, } if c.proxyAll || c.isCloudEKS { - jumpRules = append(jumpRules, jumpRule{iptables.NATTable, iptables.PreRoutingChain, antreaPreRoutingChain, "Antrea: jump to Antrea prerouting rules"}) + jumpRules = append(jumpRules, jumpRule{iptables.NATTable, iptables.PreRoutingChain, antreaPreRoutingChain, "Antrea: jump to Antrea prerouting rules", c.proxyAll == true}) } if c.proxyAll { - jumpRules = append(jumpRules, jumpRule{iptables.NATTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules"}) + jumpRules = append(jumpRules, jumpRule{iptables.NATTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules", true}) } if c.nodeNetworkPolicyEnabled { - jumpRules = append(jumpRules, jumpRule{iptables.FilterTable, iptables.InputChain, antreaInputChain, "Antrea: jump to Antrea input rules"}) - jumpRules = append(jumpRules, jumpRule{iptables.FilterTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules"}) + jumpRules = append(jumpRules, jumpRule{iptables.FilterTable, iptables.InputChain, antreaInputChain, "Antrea: jump to Antrea input rules", false}) + jumpRules = append(jumpRules, jumpRule{iptables.FilterTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules", false}) } for _, rule := range jumpRules { if err := c.iptables.EnsureChain(iptables.ProtocolDual, rule.table, rule.dstChain); err != nil { return err } ruleSpec := []string{"-j", rule.dstChain, "-m", "comment", "--comment", rule.comment} - if err := c.iptables.AppendRule(iptables.ProtocolDual, rule.table, rule.srcChain, ruleSpec); err != nil { - return err + if rule.insert { + // TODO: ensure that the chain are always in the first place. + if err := c.iptables.InsertRule(iptables.ProtocolDual, rule.table, rule.srcChain, ruleSpec); err != nil { + return err + } + } else { + if err := c.iptables.AppendRule(iptables.ProtocolDual, rule.table, rule.srcChain, ruleSpec); err != nil { + return err + } } } @@ -624,16 +704,46 @@ func (c *Client) syncIPTables() error { return true }) + var proxyAllIPTablesIPv4, proxyAllIPTablesIPv6 map[string]map[string][]string + if c.proxyAll { + if c.networkConfig.IPv4Enabled { + serviceCIDR, _ := c.serviceRoutes.Load(serviceIPv4CIDRKey) + proxyAllIPTablesIPv4 = c.generateProxyAllRules( + antreaNodePortIPSet, + antreaLoadBalancerIPSet, + antreaExternalIPIPSet, + antreaDSRLoadBalancerIPSet, + antreaDSRExternalIPIPSet, + serviceCIDR.(string), + config.VirtualNodePortDNATIPv4.String(), + config.VirtualServiceIPv4.String(), + false, + ) + } + if c.networkConfig.IPv6Enabled { + serviceCIDR, _ := c.serviceRoutes.Load(serviceIPv6CIDRKey) + proxyAllIPTablesIPv6 = c.generateProxyAllRules( + antreaNodePortIP6Set, + antreaLoadBalancerIP6Set, + antreaExternalIPIP6Set, + antreaDSRLoadBalancerIP6Set, + antreaDSRExternalIPIP6Set, + serviceCIDR.(string), + config.VirtualNodePortDNATIPv6.String(), + config.VirtualServiceIPv6.String(), + true, + ) + } + } + // Use iptables-restore to configure IPv4 settings. if c.networkConfig.IPv4Enabled { iptablesData := c.restoreIptablesData(c.nodeConfig.PodIPv4CIDR, antreaPodIPSet, localAntreaFlexibleIPAMPodIPSet, - antreaNodePortIPSet, clusterNodeIPSet, - config.VirtualNodePortDNATIPv4, - config.VirtualServiceIPv4, snatMarkToIPv4, + proxyAllIPTablesIPv4, nodeNetworkPolicyIPTablesIPv4, false) @@ -648,11 +758,9 @@ func (c *Client) syncIPTables() error { iptablesData := c.restoreIptablesData(c.nodeConfig.PodIPv6CIDR, antreaPodIP6Set, localAntreaFlexibleIPAMPodIP6Set, - antreaNodePortIP6Set, clusterNodeIP6Set, - config.VirtualNodePortDNATIPv6, - config.VirtualServiceIPv6, snatMarkToIPv6, + proxyAllIPTablesIPv6, nodeNetworkPolicyIPTablesIPv6, true) // Setting --noflush to keep the previous contents (i.e. non antrea managed chains) of the tables. @@ -664,14 +772,135 @@ func (c *Client) syncIPTables() error { return nil } +func (c *Client) generateProxyAllRules(nodePortIPSet string, + loadBalancerIPSet string, + externalIPIPSet string, + dsrLoadBalancerIPSet string, + dsrExternalIPIPSet string, + serviceCIDR string, + nodePortDNATVirtualIP string, + serviceVirtualIP string, + isIPv6 bool) map[string]map[string][]string { + rules := map[string]map[string][]string{ + iptables.NATTable: make(map[string][]string), + iptables.RawTable: make(map[string][]string), + } + var natPreroutingRules, natOutputRules []string + if c.kubeServiceHost != "" { + if isIPv6 && utilnet.IsIPv6String(c.kubeServiceHost) || + !isIPv6 && utilnet.IsIPv4String(c.kubeServiceHost) { + natPreroutingRules = append(natPreroutingRules, + iptables.NewRuleBuilder(antreaPreRoutingChain). + SetComment("Antrea: bypass external to kube Service traffic when kube Service Endpoint is not override"). + MatchIPDst(c.kubeServiceHost). + MatchPortDst(&c.kubeServicePort, nil). + MatchTransProtocol(iptables.ProtocolTCP). + SetTarget(kubeProxyServiceChain). + Done(). + GetRule()) + natOutputRules = append(natOutputRules, + iptables.NewRuleBuilder(antreaOutputChain). + SetComment("Antrea: bypass local to kube Service traffic when kube Service Endpoint is not override"). + MatchIPDst(c.kubeServiceHost). + MatchPortDst(&c.kubeServicePort, nil). + MatchTransProtocol(iptables.ProtocolTCP). + SetTarget(kubeProxyServiceChain). + Done(). + GetRule()) + } + } + natPreroutingRules = append(natPreroutingRules, + iptables.NewRuleBuilder(antreaPreRoutingChain). + SetComment("Antrea: accept external to ClusterIP packets"). + MatchCIDRDst(serviceCIDR). + SetTarget(iptables.AcceptTarget). + Done(). + GetRule(), + iptables.NewRuleBuilder(antreaPreRoutingChain). + SetComment("Antrea: DNAT external to NodePort packets"). + MatchIPSetDst(nodePortIPSet, ipset.HashIPPort). + SetTarget(iptables.DNATTarget). + SetDNATIP(nodePortDNATVirtualIP). + Done(). + GetRule(), + iptables.NewRuleBuilder(antreaPreRoutingChain). + SetComment("Antrea: accept external to LoadBalancer packets"). + MatchIPSetDst(loadBalancerIPSet, ipset.HashIPPort). + SetTarget(iptables.AcceptTarget). + Done(). + GetRule(), + iptables.NewRuleBuilder(antreaPreRoutingChain). + SetComment("Antrea: accept external to externalIP packets"). + MatchIPSetDst(externalIPIPSet, ipset.HashIPPort). + SetTarget(iptables.AcceptTarget). + Done(). + GetRule(), + ) + natOutputRules = append(natOutputRules, + iptables.NewRuleBuilder(antreaOutputChain). + SetComment("Antrea: accept local to ClusterIP packets"). + MatchCIDRDst(serviceCIDR). + SetTarget(iptables.AcceptTarget). + Done(). + GetRule(), + iptables.NewRuleBuilder(antreaOutputChain). + SetComment("Antrea: DNAT local to NodePort packets"). + MatchIPSetDst(nodePortIPSet, ipset.HashIPPort). + SetTarget(iptables.DNATTarget). + SetDNATIP(nodePortDNATVirtualIP). + Done(). + GetRule(), + iptables.NewRuleBuilder(antreaOutputChain). + SetComment("Antrea: accept local to LoadBalancer packets"). + MatchIPSetDst(loadBalancerIPSet, ipset.HashIPPort). + SetTarget(iptables.AcceptTarget). + Done(). + GetRule(), + iptables.NewRuleBuilder(antreaOutputChain). + SetComment("Antrea: accept local to externalIP packets"). + MatchIPSetDst(externalIPIPSet, ipset.HashIPPort). + SetTarget(iptables.AcceptTarget). + Done(). + GetRule(), + ) + + rawPreroutingRules := []string{ + iptables.NewRuleBuilder(antreaPreRoutingChain). + SetComment("Antrea: do not track external to DSR LoadBalancer packets"). + MatchIPSetDst(dsrLoadBalancerIPSet, ipset.HashIPPort). + SetTarget(iptables.NotrackTarget). + Done(). + GetRule(), + iptables.NewRuleBuilder(antreaPreRoutingChain). + SetComment("Antrea: do not track external to DSR externalIP packets"). + MatchIPSetDst(dsrExternalIPIPSet, ipset.HashIPPort). + SetTarget(iptables.NotrackTarget). + Done(). + GetRule(), + } + natPostRoutingRules := []string{ + iptables.NewRuleBuilder(antreaPostRoutingChain). + SetComment("Antrea: masquerade OVS virtual source IP"). + MatchIPSrc(serviceVirtualIP). + SetTarget(iptables.MasqueradeTarget). + Done(). + GetRule(), + } + + rules[iptables.NATTable][antreaPreRoutingChain] = natPreroutingRules + rules[iptables.NATTable][antreaOutputChain] = natOutputRules + rules[iptables.NATTable][antreaPostRoutingChain] = natPostRoutingRules + rules[iptables.RawTable][antreaPreRoutingChain] = rawPreroutingRules + + return rules +} + func (c *Client) restoreIptablesData(podCIDR *net.IPNet, podIPSet, localAntreaFlexibleIPAMPodIPSet, - nodePortIPSet, clusterNodeIPSet string, - nodePortDNATVirtualIP, - serviceVirtualIP net.IP, snatMarkToIP map[uint32]net.IP, + proxyAllIPTables map[string]map[string][]string, nodeNetWorkPolicyIPTables map[string][]string, isIPv6 bool) *bytes.Buffer { // Create required rules in the antrea chains. @@ -725,6 +954,11 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet, }...) } } + if c.proxyAll { + for _, ruleStr := range proxyAllIPTables[iptables.RawTable][antreaPreRoutingChain] { + writeLine(iptablesData, ruleStr) + } + } writeLine(iptablesData, "COMMIT") // Write head lines anyway so the undesired rules can be deleted when noEncap -> encap. @@ -816,21 +1050,13 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet, writeLine(iptablesData, iptables.MakeChainLine(antreaPreRoutingChain)) } if c.proxyAll { - writeLine(iptablesData, []string{ - "-A", antreaPreRoutingChain, - "-m", "comment", "--comment", `"Antrea: DNAT external to NodePort packets"`, - "-m", "set", "--match-set", nodePortIPSet, "dst,dst", - "-j", iptables.DNATTarget, - "--to-destination", nodePortDNATVirtualIP.String(), - }...) + for _, ruleStr := range proxyAllIPTables[iptables.NATTable][antreaPreRoutingChain] { + writeLine(iptablesData, ruleStr) + } writeLine(iptablesData, iptables.MakeChainLine(antreaOutputChain)) - writeLine(iptablesData, []string{ - "-A", antreaOutputChain, - "-m", "comment", "--comment", `"Antrea: DNAT local to NodePort packets"`, - "-m", "set", "--match-set", nodePortIPSet, "dst,dst", - "-j", iptables.DNATTarget, - "--to-destination", nodePortDNATVirtualIP.String(), - }...) + for _, ruleStr := range proxyAllIPTables[iptables.NATTable][antreaOutputChain] { + writeLine(iptablesData, ruleStr) + } } writeLine(iptablesData, iptables.MakeChainLine(antreaPostRoutingChain)) // The masqueraded multicast traffic will become unicast so we @@ -884,12 +1110,9 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet, // If AntreaProxy full support is enabled, it SNATs the packets whose source IP is VirtualServiceIPv4/VirtualServiceIPv6 // so the packets can be routed back to this Node. if c.proxyAll { - writeLine(iptablesData, []string{ - "-A", antreaPostRoutingChain, - "-m", "comment", "--comment", `"Antrea: masquerade OVS virtual source IP"`, - "-s", serviceVirtualIP.String(), - "-j", iptables.MasqueradeTarget, - }...) + for _, ruleStr := range proxyAllIPTables[iptables.NATTable][antreaPostRoutingChain] { + writeLine(iptablesData, ruleStr) + } } // This generates the rule to masquerade the packets destined for a hostPort whose backend is an AntreaIPAM VLAN Pod. @@ -1608,48 +1831,99 @@ func (c *Client) addVirtualServiceIPRoute(isIPv6 bool) error { return nil } -// AddNodePort is used to add IP,port:protocol entries to target ip set when a NodePort Service is added. An entry is added -// for every NodePort IP. -func (c *Client) AddNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error { - isIPv6 := isIPv6Protocol(protocol) +func (c *Client) addServiceIPSetEntries(ips []net.IP, port uint16, protocol binding.Protocol, ipsetName string) error { transProtocol := getTransProtocolStr(protocol) - ipSetName := getNodePortIPSetName(isIPv6) - - for i := range nodePortAddresses { - ipSetEntry := fmt.Sprintf("%s,%s:%d", nodePortAddresses[i], transProtocol, port) - if err := c.ipset.AddEntry(ipSetName, ipSetEntry); err != nil { + for i := range ips { + ipsetEntry := fmt.Sprintf("%s,%s:%d", ips[i].String(), transProtocol, port) + if err := c.ipset.AddEntry(ipsetName, ipsetEntry); err != nil { return err } - if isIPv6 { - c.nodePortsIPv6.Store(ipSetEntry, struct{}{}) - } else { - c.nodePortsIPv4.Store(ipSetEntry, struct{}{}) - } - klog.V(4).InfoS("Added ipset for NodePort", "IP", nodePortAddresses[i], "Port", port, "Protocol", protocol) + c.serviceIPSets[ipsetName].Store(ipsetEntry, struct{}{}) + klog.V(4).InfoS("Added ipset entry for Service", "IP", ips[i], "Port", port, "Protocol", protocol) } return nil } -// DeleteNodePort is used to delete related IP set entries when a NodePort Service is deleted. -func (c *Client) DeleteNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error { - isIPv6 := isIPv6Protocol(protocol) +func (c *Client) deleteServiceIPSetEntries(ips []net.IP, port uint16, protocol binding.Protocol, ipsetName string) error { transProtocol := getTransProtocolStr(protocol) - ipSetName := getNodePortIPSetName(isIPv6) + for i := range ips { + ipSetEntry := fmt.Sprintf("%s,%s:%d", ips[i].String(), transProtocol, port) + if err := c.ipset.DelEntry(ipsetName, ipSetEntry); err != nil { + return err + } + c.serviceIPSets[ipsetName].Delete(ipSetEntry) + } + return nil +} - for i := range nodePortAddresses { - ipSetEntry := fmt.Sprintf("%s,%s:%d", nodePortAddresses[i], transProtocol, port) - if err := c.ipset.DelEntry(ipSetName, ipSetEntry); err != nil { +// AddNodePortConf is used to add IP,protocol:port entries to target ipset when a NodePort Service is added. An entry is +// added for every NodePort IP. +func (c *Client) AddNodePortConf(nodePortIPs []net.IP, port uint16, protocol binding.Protocol) error { + isIPv6 := isIPv6Protocol(protocol) + ipsetName := getNodePortIPSetName(isIPv6) + return c.addServiceIPSetEntries(nodePortIPs, port, protocol, ipsetName) +} + +// DeleteNodePortConf is used to delete related ipset entries when a NodePort Service is deleted. +func (c *Client) DeleteNodePortConf(nodePortIPs []net.IP, port uint16, protocol binding.Protocol) error { + isIPv6 := isIPv6Protocol(protocol) + ipsetName := getNodePortIPSetName(isIPv6) + return c.deleteServiceIPSetEntries(nodePortIPs, port, protocol, ipsetName) +} + +// AddLoadBalancerConf is used to add IP,protocol:port entries to target ipset when a LoadBalancer Service is added. An +// entry is added for every LoadBalancer IP. +func (c *Client) AddLoadBalancerConf(loadBalancerIPs []net.IP, port uint16, protocol binding.Protocol, isDSR bool) error { + isIPv6 := isIPv6Protocol(protocol) + if isDSR { + ipsetName := getDSRLoadBalancerIPSetName(isIPv6) + if err := c.addServiceIPSetEntries(loadBalancerIPs, port, protocol, ipsetName); err != nil { return err } - if isIPv6 { - c.nodePortsIPv6.Delete(ipSetEntry) - } else { - c.nodePortsIPv4.Delete(ipSetEntry) + } + ipsetName := getLoadBalancerIPSetName(isIPv6) + return c.addServiceIPSetEntries(loadBalancerIPs, port, protocol, ipsetName) +} + +// DeleteLoadBalancerConf is used to delete related ipset entries when a LoadBalancer Service is deleted. +func (c *Client) DeleteLoadBalancerConf(loadBalancerIPs []net.IP, port uint16, protocol binding.Protocol, isDSR bool) error { + isIPv6 := isIPv6Protocol(protocol) + if isDSR { + ipsetName := getDSRLoadBalancerIPSetName(isIPv6) + if err := c.deleteServiceIPSetEntries(loadBalancerIPs, port, protocol, ipsetName); err != nil { + return err } } + ipsetName := getLoadBalancerIPSetName(isIPv6) + return c.deleteServiceIPSetEntries(loadBalancerIPs, port, protocol, ipsetName) +} - return nil +// AddExternalIPConf is used to add IP,protocol:port entries to target ipset when a Service with externalIPs is added. +// An entry is added for every externalIP. +func (c *Client) AddExternalIPConf(externalIPs []net.IP, port uint16, protocol binding.Protocol, isDSR bool) error { + isIPv6 := isIPv6Protocol(protocol) + if isDSR { + ipsetName := getDSRExternalIPIPSetName(isIPv6) + if err := c.addServiceIPSetEntries(externalIPs, port, protocol, ipsetName); err != nil { + return err + } + } + ipsetName := getExternalIPIPSetName(isIPv6) + return c.addServiceIPSetEntries(externalIPs, port, protocol, ipsetName) +} + +// DeleteExternalIPConf is used to delete related ipset entries when a Service with externalIPs is deleted. +func (c *Client) DeleteExternalIPConf(externalIPs []net.IP, port uint16, protocol binding.Protocol, isDSR bool) error { + isIPv6 := isIPv6Protocol(protocol) + if isDSR { + ipsetName := getDSRExternalIPIPSetName(isIPv6) + if err := c.deleteServiceIPSetEntries(externalIPs, port, protocol, ipsetName); err != nil { + return err + } + } + ipsetName := getExternalIPIPSetName(isIPv6) + return c.deleteServiceIPSetEntries(externalIPs, port, protocol, ipsetName) } func (c *Client) addServiceCIDRRoute(serviceCIDR *net.IPNet) error { @@ -1817,7 +2091,7 @@ func (c *Client) AddLocalAntreaFlexibleIPAMPodRule(podAddresses []net.IP) error return nil } -// DeletLocaleAntreaFlexibleIPAMPodRule is used to delete related IP set entries when an AntreaFlexibleIPAM Pod is deleted. +// DeleteLocalAntreaFlexibleIPAMPodRule is used to delete related IP set entries when an AntreaFlexibleIPAM Pod is deleted. func (c *Client) DeleteLocalAntreaFlexibleIPAMPodRule(podAddresses []net.IP) error { if !c.connectUplinkToBridge { return nil diff --git a/pkg/agent/route/route_linux_test.go b/pkg/agent/route/route_linux_test.go index ab266186509..371213ad37c 100644 --- a/pkg/agent/route/route_linux_test.go +++ b/pkg/agent/route/route_linux_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/vishvananda/netlink" "go.uber.org/mock/gomock" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" "antrea.io/antrea/pkg/agent/config" @@ -155,8 +156,7 @@ func TestSyncIPSet(t *testing.T) { nodeNetworkPolicyEnabled bool networkConfig *config.NetworkConfig nodeConfig *config.NodeConfig - nodePortsIPv4 []string - nodePortsIPv6 []string + serviceIPSets map[string][]string clusterNodeIPs map[string]string clusterNodeIP6s map[string]string nodeNetworkPolicyIPSetsIPv4 map[string]sets.Set[string] @@ -202,8 +202,18 @@ func TestSyncIPSet(t *testing.T) { PodIPv4CIDR: podCIDR, PodIPv6CIDR: podCIDRv6, }, - nodePortsIPv4: []string{"192.168.0.2,tcp:10000", "127.0.0.1,tcp:10000"}, - nodePortsIPv6: []string{"fe80::e643:4bff:fe44:ee,tcp:10000", "::1,tcp:10000"}, + serviceIPSets: map[string][]string{ + antreaNodePortIPSet: {"192.168.0.2,tcp:10000", "127.0.0.1,tcp:10000"}, + antreaNodePortIP6Set: {"fe80::e643:4bff:fe44:ee,tcp:10000", "::1,tcp:10000"}, + antreaLoadBalancerIPSet: {"192.168.0.150,tcp:80", "192.168.0.151,tcp:443"}, + antreaLoadBalancerIP6Set: {"2001::192:168:0:150,tcp:80", "2001::192:168:0:151,tcp:443"}, + antreaDSRLoadBalancerIPSet: {"192.168.0.150,tcp:80"}, + antreaDSRLoadBalancerIP6Set: {"2001::192:168:0:150,tcp:80"}, + antreaExternalIPIPSet: {"192.168.0.200,tcp:80", "192.168.0.201,tcp:443"}, + antreaExternalIPIP6Set: {"2001::192:168:0:200,tcp:80", "2001::192:168:0:201,tcp:443"}, + antreaDSRExternalIPIPSet: {"192.168.0.200,tcp:80"}, + antreaDSRExternalIPIP6Set: {"2001::192:168:0:200,tcp:80"}, + }, clusterNodeIPs: map[string]string{"172.16.3.0/24": "192.168.0.3", "172.16.4.0/24": "192.168.0.4"}, clusterNodeIP6s: map[string]string{"2001:ab03:cd04:5503::/64": "fe80::e643:4bff:fe03", "2001:ab03:cd04:5504::/64": "fe80::e643:4bff:fe04"}, nodeNetworkPolicyIPSetsIPv4: map[string]sets.Set[string]{"ANTREA-POL-RULE1-4": sets.New[string]("1.1.1.1/32", "2.2.2.2/32")}, @@ -213,18 +223,45 @@ func TestSyncIPSet(t *testing.T) { mockIPSet.CreateIPSet(antreaPodIP6Set, ipset.HashNet, true) mockIPSet.AddEntry(antreaPodIPSet, podCIDRStr) mockIPSet.AddEntry(antreaPodIP6Set, podCIDRv6Str) + mockIPSet.CreateIPSet(antreaNodePortIPSet, ipset.HashIPPort, false) mockIPSet.CreateIPSet(antreaNodePortIP6Set, ipset.HashIPPort, true) mockIPSet.AddEntry(antreaNodePortIPSet, "192.168.0.2,tcp:10000") mockIPSet.AddEntry(antreaNodePortIPSet, "127.0.0.1,tcp:10000") mockIPSet.AddEntry(antreaNodePortIP6Set, "fe80::e643:4bff:fe44:ee,tcp:10000") mockIPSet.AddEntry(antreaNodePortIP6Set, "::1,tcp:10000") + + mockIPSet.CreateIPSet(antreaLoadBalancerIPSet, ipset.HashIPPort, false) + mockIPSet.CreateIPSet(antreaLoadBalancerIP6Set, ipset.HashIPPort, true) + mockIPSet.AddEntry(antreaLoadBalancerIPSet, "192.168.0.150,tcp:80") + mockIPSet.AddEntry(antreaLoadBalancerIPSet, "192.168.0.151,tcp:443") + mockIPSet.AddEntry(antreaLoadBalancerIP6Set, "2001::192:168:0:150,tcp:80") + mockIPSet.AddEntry(antreaLoadBalancerIP6Set, "2001::192:168:0:151,tcp:443") + + mockIPSet.CreateIPSet(antreaDSRLoadBalancerIPSet, ipset.HashIPPort, false) + mockIPSet.CreateIPSet(antreaDSRLoadBalancerIP6Set, ipset.HashIPPort, true) + mockIPSet.AddEntry(antreaDSRLoadBalancerIPSet, "192.168.0.150,tcp:80") + mockIPSet.AddEntry(antreaDSRLoadBalancerIP6Set, "2001::192:168:0:150,tcp:80") + + mockIPSet.CreateIPSet(antreaExternalIPIPSet, ipset.HashIPPort, false) + mockIPSet.CreateIPSet(antreaExternalIPIP6Set, ipset.HashIPPort, true) + mockIPSet.AddEntry(antreaExternalIPIPSet, "192.168.0.200,tcp:80") + mockIPSet.AddEntry(antreaExternalIPIPSet, "192.168.0.201,tcp:443") + mockIPSet.AddEntry(antreaExternalIPIP6Set, "2001::192:168:0:200,tcp:80") + mockIPSet.AddEntry(antreaExternalIPIP6Set, "2001::192:168:0:201,tcp:443") + + mockIPSet.CreateIPSet(antreaDSRExternalIPIPSet, ipset.HashIPPort, false) + mockIPSet.CreateIPSet(antreaDSRExternalIPIP6Set, ipset.HashIPPort, true) + mockIPSet.AddEntry(antreaDSRExternalIPIPSet, "192.168.0.200,tcp:80") + mockIPSet.AddEntry(antreaDSRExternalIPIP6Set, "2001::192:168:0:200,tcp:80") + mockIPSet.CreateIPSet(clusterNodeIPSet, ipset.HashIP, false) mockIPSet.CreateIPSet(clusterNodeIP6Set, ipset.HashIP, true) mockIPSet.AddEntry(clusterNodeIPSet, "192.168.0.3") mockIPSet.AddEntry(clusterNodeIPSet, "192.168.0.4") mockIPSet.AddEntry(clusterNodeIP6Set, "fe80::e643:4bff:fe03") mockIPSet.AddEntry(clusterNodeIP6Set, "fe80::e643:4bff:fe04") + mockIPSet.CreateIPSet("ANTREA-POL-RULE1-4", ipset.HashNet, false) mockIPSet.CreateIPSet("ANTREA-POL-RULE1-6", ipset.HashNet, true) mockIPSet.AddEntry("ANTREA-POL-RULE1-4", "1.1.1.1/32") @@ -266,16 +303,15 @@ func TestSyncIPSet(t *testing.T) { multicastEnabled: tt.multicastEnabled, connectUplinkToBridge: tt.connectUplinkToBridge, nodeNetworkPolicyEnabled: tt.nodeNetworkPolicyEnabled, - nodePortsIPv4: sync.Map{}, - nodePortsIPv6: sync.Map{}, clusterNodeIPs: sync.Map{}, clusterNodeIP6s: sync.Map{}, + serviceIPSets: make(map[string]*sync.Map), } - for _, nodePortIPv4 := range tt.nodePortsIPv4 { - c.nodePortsIPv4.Store(nodePortIPv4, struct{}{}) - } - for _, nodePortIPv6 := range tt.nodePortsIPv6 { - c.nodePortsIPv6.Store(nodePortIPv6, struct{}{}) + for ipsetName, ipsetEntries := range tt.serviceIPSets { + c.serviceIPSets[ipsetName] = &sync.Map{} + for _, entry := range ipsetEntries { + c.serviceIPSets[ipsetName].Store(entry, struct{}{}) + } } for cidr, nodeIP := range tt.clusterNodeIPs { c.clusterNodeIPs.Store(cidr, nodeIP) @@ -305,8 +341,6 @@ func TestSyncIPTables(t *testing.T) { nodeNetworkPolicyEnabled bool networkConfig *config.NetworkConfig nodeConfig *config.NodeConfig - nodePortsIPv4 []string - nodePortsIPv6 []string markToSNATIP map[uint32]string expectedCalls func(iptables *iptablestest.MockInterfaceMockRecorder) }{ @@ -346,9 +380,9 @@ func TestSyncIPTables(t *testing.T) { mockIPTables.EnsureChain(iptables.ProtocolDual, iptables.MangleTable, antreaOutputChain) mockIPTables.AppendRule(iptables.ProtocolDual, iptables.MangleTable, iptables.OutputChain, []string{"-j", antreaOutputChain, "-m", "comment", "--comment", "Antrea: jump to Antrea output rules"}) mockIPTables.EnsureChain(iptables.ProtocolDual, iptables.NATTable, antreaPreRoutingChain) - mockIPTables.AppendRule(iptables.ProtocolDual, iptables.NATTable, iptables.PreRoutingChain, []string{"-j", antreaPreRoutingChain, "-m", "comment", "--comment", "Antrea: jump to Antrea prerouting rules"}) + mockIPTables.InsertRule(iptables.ProtocolDual, iptables.NATTable, iptables.PreRoutingChain, []string{"-j", antreaPreRoutingChain, "-m", "comment", "--comment", "Antrea: jump to Antrea prerouting rules"}) mockIPTables.EnsureChain(iptables.ProtocolDual, iptables.NATTable, antreaOutputChain) - mockIPTables.AppendRule(iptables.ProtocolDual, iptables.NATTable, iptables.OutputChain, []string{"-j", antreaOutputChain, "-m", "comment", "--comment", "Antrea: jump to Antrea output rules"}) + mockIPTables.InsertRule(iptables.ProtocolDual, iptables.NATTable, iptables.OutputChain, []string{"-j", antreaOutputChain, "-m", "comment", "--comment", "Antrea: jump to Antrea output rules"}) mockIPTables.EnsureChain(iptables.ProtocolDual, iptables.FilterTable, antreaInputChain) mockIPTables.AppendRule(iptables.ProtocolDual, iptables.FilterTable, iptables.InputChain, []string{"-j", antreaInputChain, "-m", "comment", "--comment", "Antrea: jump to Antrea input rules"}) mockIPTables.EnsureChain(iptables.ProtocolDual, iptables.FilterTable, antreaOutputChain) @@ -359,6 +393,8 @@ func TestSyncIPTables(t *testing.T) { -A ANTREA-PREROUTING -m comment --comment "Antrea: do not track incoming encapsulation packets" -m udp -p udp --dport 6081 -m addrtype --dst-type LOCAL -j NOTRACK -A ANTREA-OUTPUT -m comment --comment "Antrea: do not track outgoing encapsulation packets" -m udp -p udp --dport 6081 -m addrtype --src-type LOCAL -j NOTRACK -A ANTREA-PREROUTING -m comment --comment "Antrea: drop Pod multicast traffic forwarded via underlay network" -m set --match-set CLUSTER-NODE-IP src -d 224.0.0.0/4 -j DROP +-A ANTREA-PREROUTING -m comment --comment "Antrea: do not track external to DSR LoadBalancer packets" -m set --match-set ANTREA-DSR-LOADBALANCER-IP dst,dst -j NOTRACK +-A ANTREA-PREROUTING -m comment --comment "Antrea: do not track external to DSR externalIP packets" -m set --match-set ANTREA-DSR-EXTERNAL-IP dst,dst -j NOTRACK COMMIT *mangle :ANTREA-MANGLE - [0:0] @@ -387,9 +423,17 @@ COMMIT COMMIT *nat :ANTREA-PREROUTING - [0:0] +-A ANTREA-PREROUTING -m comment --comment "Antrea: bypass external to kube Service traffic when kube Service Endpoint is not override" -d 10.96.0.1 --dport 443 -p tcp -j KUBE-SERVICES +-A ANTREA-PREROUTING -m comment --comment "Antrea: accept external to ClusterIP packets" -d 10.96.0.0/12 -j ACCEPT -A ANTREA-PREROUTING -m comment --comment "Antrea: DNAT external to NodePort packets" -m set --match-set ANTREA-NODEPORT-IP dst,dst -j DNAT --to-destination 169.254.0.252 +-A ANTREA-PREROUTING -m comment --comment "Antrea: accept external to LoadBalancer packets" -m set --match-set ANTREA-LOADBALANCER-IP dst,dst -j ACCEPT +-A ANTREA-PREROUTING -m comment --comment "Antrea: accept external to externalIP packets" -m set --match-set ANTREA-EXTERNAL-IP dst,dst -j ACCEPT :ANTREA-OUTPUT - [0:0] +-A ANTREA-OUTPUT -m comment --comment "Antrea: bypass local to kube Service traffic when kube Service Endpoint is not override" -d 10.96.0.1 --dport 443 -p tcp -j KUBE-SERVICES +-A ANTREA-OUTPUT -m comment --comment "Antrea: accept local to ClusterIP packets" -d 10.96.0.0/12 -j ACCEPT -A ANTREA-OUTPUT -m comment --comment "Antrea: DNAT local to NodePort packets" -m set --match-set ANTREA-NODEPORT-IP dst,dst -j DNAT --to-destination 169.254.0.252 +-A ANTREA-OUTPUT -m comment --comment "Antrea: accept local to LoadBalancer packets" -m set --match-set ANTREA-LOADBALANCER-IP dst,dst -j ACCEPT +-A ANTREA-OUTPUT -m comment --comment "Antrea: accept local to externalIP packets" -m set --match-set ANTREA-EXTERNAL-IP dst,dst -j ACCEPT :ANTREA-POSTROUTING - [0:0] -A ANTREA-POSTROUTING -m comment --comment "Antrea: SNAT Pod to external packets" ! -o antrea-gw0 -m mark --mark 0x00000001/0x000000ff -j SNAT --to 1.1.1.1 -A ANTREA-POSTROUTING -m comment --comment "Antrea: masquerade Pod to external packets" -s 172.16.10.0/24 -m set ! --match-set ANTREA-POD-IP dst ! -o antrea-gw0 -j MASQUERADE @@ -402,6 +446,8 @@ COMMIT :ANTREA-OUTPUT - [0:0] -A ANTREA-PREROUTING -m comment --comment "Antrea: do not track incoming encapsulation packets" -m udp -p udp --dport 6081 -m addrtype --dst-type LOCAL -j NOTRACK -A ANTREA-OUTPUT -m comment --comment "Antrea: do not track outgoing encapsulation packets" -m udp -p udp --dport 6081 -m addrtype --src-type LOCAL -j NOTRACK +-A ANTREA-PREROUTING -m comment --comment "Antrea: do not track external to DSR LoadBalancer packets" -m set --match-set ANTREA-DSR-LOADBALANCER-IP6 dst,dst -j NOTRACK +-A ANTREA-PREROUTING -m comment --comment "Antrea: do not track external to DSR externalIP packets" -m set --match-set ANTREA-DSR-EXTERNAL-IP6 dst,dst -j NOTRACK COMMIT *mangle :ANTREA-MANGLE - [0:0] @@ -430,9 +476,15 @@ COMMIT COMMIT *nat :ANTREA-PREROUTING - [0:0] +-A ANTREA-PREROUTING -m comment --comment "Antrea: accept external to ClusterIP packets" -d fec0::/64 -j ACCEPT -A ANTREA-PREROUTING -m comment --comment "Antrea: DNAT external to NodePort packets" -m set --match-set ANTREA-NODEPORT-IP6 dst,dst -j DNAT --to-destination fc01::aabb:ccdd:eefe +-A ANTREA-PREROUTING -m comment --comment "Antrea: accept external to LoadBalancer packets" -m set --match-set ANTREA-LOADBALANCER-IP6 dst,dst -j ACCEPT +-A ANTREA-PREROUTING -m comment --comment "Antrea: accept external to externalIP packets" -m set --match-set ANTREA-EXTERNAL-IP6 dst,dst -j ACCEPT :ANTREA-OUTPUT - [0:0] +-A ANTREA-OUTPUT -m comment --comment "Antrea: accept local to ClusterIP packets" -d fec0::/64 -j ACCEPT -A ANTREA-OUTPUT -m comment --comment "Antrea: DNAT local to NodePort packets" -m set --match-set ANTREA-NODEPORT-IP6 dst,dst -j DNAT --to-destination fc01::aabb:ccdd:eefe +-A ANTREA-OUTPUT -m comment --comment "Antrea: accept local to LoadBalancer packets" -m set --match-set ANTREA-LOADBALANCER-IP6 dst,dst -j ACCEPT +-A ANTREA-OUTPUT -m comment --comment "Antrea: accept local to externalIP packets" -m set --match-set ANTREA-EXTERNAL-IP6 dst,dst -j ACCEPT :ANTREA-POSTROUTING - [0:0] -A ANTREA-POSTROUTING -m comment --comment "Antrea: SNAT Pod to external packets" ! -o antrea-gw0 -m mark --mark 0x00000002/0x000000ff -j SNAT --to fe80::e643:4bff:fe02 -A ANTREA-POSTROUTING -m comment --comment "Antrea: masquerade Pod to external packets" -s 2001:ab03:cd04:55ef::/64 -m set ! --match-set ANTREA-POD-IP6 dst ! -o antrea-gw0 -j MASQUERADE @@ -589,7 +641,14 @@ COMMIT multicastEnabled: tt.multicastEnabled, connectUplinkToBridge: tt.connectUplinkToBridge, nodeNetworkPolicyEnabled: tt.nodeNetworkPolicyEnabled, - deterministic: true, + + deterministic: true, + } + if tt.proxyAll { + c.serviceRoutes.Store(serviceIPv4CIDRKey, "10.96.0.0/12") + c.serviceRoutes.Store(serviceIPv6CIDRKey, "fec0::/64") + c.kubeServiceHost = "10.96.0.1" + c.kubeServicePort = intstr.FromInt32(int32(443)) } for mark, snatIP := range tt.markToSNATIP { c.markToSNATIP.Store(mark, net.ParseIP(snatIP)) @@ -1358,7 +1417,7 @@ func TestAddNodePort(t *testing.T) { ipset := ipsettest.NewMockInterface(ctrl) c := &Client{ipset: ipset} tt.expectedCalls(ipset.EXPECT()) - assert.NoError(t, c.AddNodePort(tt.nodePortAddresses, tt.port, tt.protocol)) + assert.NoError(t, c.AddNodePortConf(tt.nodePortAddresses, tt.port, tt.protocol)) }) } } @@ -1404,7 +1463,7 @@ func TestDeleteNodePort(t *testing.T) { ipset := ipsettest.NewMockInterface(ctrl) c := &Client{ipset: ipset} tt.expectedCalls(ipset.EXPECT()) - assert.NoError(t, c.DeleteNodePort(tt.nodePortAddresses, tt.port, tt.protocol)) + assert.NoError(t, c.DeleteNodePortConf(tt.nodePortAddresses, tt.port, tt.protocol)) }) } } diff --git a/pkg/agent/route/route_windows.go b/pkg/agent/route/route_windows.go index 9c279a9c65c..3738ab85c72 100644 --- a/pkg/agent/route/route_windows.go +++ b/pkg/agent/route/route_windows.go @@ -475,8 +475,8 @@ func (c *Client) DeleteSNATRule(mark uint32) error { return nil } -// TODO: nodePortAddresses is not supported currently. -func (c *Client) AddNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error { +// TODO: nodePortIPs is not supported currently. +func (c *Client) AddNodePortConf(nodePortIPs []net.IP, port uint16, protocol binding.Protocol) error { netNatStaticMapping := &util.NetNatStaticMapping{ Name: antreaNatNodePort, ExternalIP: net.ParseIP("0.0.0.0"), @@ -493,7 +493,7 @@ func (c *Client) AddNodePort(nodePortAddresses []net.IP, port uint16, protocol b return nil } -func (c *Client) DeleteNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error { +func (c *Client) DeleteNodePortConf(nodePortIPs []net.IP, port uint16, protocol binding.Protocol) error { key := fmt.Sprintf("%d-%s", port, protocol) obj, found := c.netNatStaticMappings.Load(key) if !found { @@ -509,6 +509,22 @@ func (c *Client) DeleteNodePort(nodePortAddresses []net.IP, port uint16, protoco return nil } +func (c *Client) AddLoadBalancerConf(loadBalancerIPs []net.IP, port uint16, protocol binding.Protocol, isDSR bool) error { + return nil +} + +func (c *Client) DeleteLoadBalancerConf(loadBalancerIPs []net.IP, port uint16, protocol binding.Protocol, isDSR bool) error { + return nil +} + +func (c *Client) AddExternalIPConf(externalIPs []net.IP, port uint16, protocol binding.Protocol, isDSR bool) error { + return nil +} + +func (c *Client) DeleteExternalIP(externalIPs []net.IP, port uint16, protocol binding.Protocol, isDSR bool) error { + return nil +} + // AddExternalIPRoute adds a route entry that forwards traffic destined for the external IP to the Antrea gateway interface. func (c *Client) AddExternalIPRoute(externalIP net.IP) error { externalIPStr := externalIP.String() diff --git a/pkg/agent/route/testing/mock_route.go b/pkg/agent/route/testing/mock_route.go index 6153d2f3d0c..806d24c9349 100644 --- a/pkg/agent/route/testing/mock_route.go +++ b/pkg/agent/route/testing/mock_route.go @@ -1,4 +1,4 @@ -// Copyright 2023 Antrea Authors +// Copyright 2024 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -84,6 +84,20 @@ func (mr *MockInterfaceMockRecorder) AddEgressRule(arg0, arg1 any) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddEgressRule", reflect.TypeOf((*MockInterface)(nil).AddEgressRule), arg0, arg1) } +// AddExternalIPConf mocks base method. +func (m *MockInterface) AddExternalIPConf(arg0 []net.IP, arg1 uint16, arg2 openflow.Protocol, arg3 bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddExternalIPConf", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddExternalIPConf indicates an expected call of AddExternalIPConf. +func (mr *MockInterfaceMockRecorder) AddExternalIPConf(arg0, arg1, arg2, arg3 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddExternalIPConf", reflect.TypeOf((*MockInterface)(nil).AddExternalIPConf), arg0, arg1, arg2, arg3) +} + // AddExternalIPRoute mocks base method. func (m *MockInterface) AddExternalIPRoute(arg0 net.IP) error { m.ctrl.T.Helper() @@ -98,6 +112,20 @@ func (mr *MockInterfaceMockRecorder) AddExternalIPRoute(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddExternalIPRoute", reflect.TypeOf((*MockInterface)(nil).AddExternalIPRoute), arg0) } +// AddLoadBalancerConf mocks base method. +func (m *MockInterface) AddLoadBalancerConf(arg0 []net.IP, arg1 uint16, arg2 openflow.Protocol, arg3 bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddLoadBalancerConf", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddLoadBalancerConf indicates an expected call of AddLoadBalancerConf. +func (mr *MockInterfaceMockRecorder) AddLoadBalancerConf(arg0, arg1, arg2, arg3 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddLoadBalancerConf", reflect.TypeOf((*MockInterface)(nil).AddLoadBalancerConf), arg0, arg1, arg2, arg3) +} + // AddLocalAntreaFlexibleIPAMPodRule mocks base method. func (m *MockInterface) AddLocalAntreaFlexibleIPAMPodRule(arg0 []net.IP) error { m.ctrl.T.Helper() @@ -112,18 +140,18 @@ func (mr *MockInterfaceMockRecorder) AddLocalAntreaFlexibleIPAMPodRule(arg0 any) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddLocalAntreaFlexibleIPAMPodRule", reflect.TypeOf((*MockInterface)(nil).AddLocalAntreaFlexibleIPAMPodRule), arg0) } -// AddNodePort mocks base method. -func (m *MockInterface) AddNodePort(arg0 []net.IP, arg1 uint16, arg2 openflow.Protocol) error { +// AddNodePortConf mocks base method. +func (m *MockInterface) AddNodePortConf(arg0 []net.IP, arg1 uint16, arg2 openflow.Protocol) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddNodePort", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "AddNodePortConf", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } -// AddNodePort indicates an expected call of AddNodePort. -func (mr *MockInterfaceMockRecorder) AddNodePort(arg0, arg1, arg2 any) *gomock.Call { +// AddNodePortConf indicates an expected call of AddNodePortConf. +func (mr *MockInterfaceMockRecorder) AddNodePortConf(arg0, arg1, arg2 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddNodePort", reflect.TypeOf((*MockInterface)(nil).AddNodePort), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddNodePortConf", reflect.TypeOf((*MockInterface)(nil).AddNodePortConf), arg0, arg1, arg2) } // AddOrUpdateNodeNetworkPolicyIPSet mocks base method. @@ -238,6 +266,20 @@ func (mr *MockInterfaceMockRecorder) DeleteEgressRule(arg0, arg1 any) *gomock.Ca return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteEgressRule", reflect.TypeOf((*MockInterface)(nil).DeleteEgressRule), arg0, arg1) } +// DeleteExternalIPConf mocks base method. +func (m *MockInterface) DeleteExternalIPConf(arg0 []net.IP, arg1 uint16, arg2 openflow.Protocol, arg3 bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteExternalIPConf", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteExternalIPConf indicates an expected call of DeleteExternalIPConf. +func (mr *MockInterfaceMockRecorder) DeleteExternalIPConf(arg0, arg1, arg2, arg3 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteExternalIPConf", reflect.TypeOf((*MockInterface)(nil).DeleteExternalIPConf), arg0, arg1, arg2, arg3) +} + // DeleteExternalIPRoute mocks base method. func (m *MockInterface) DeleteExternalIPRoute(arg0 net.IP) error { m.ctrl.T.Helper() @@ -252,6 +294,20 @@ func (mr *MockInterfaceMockRecorder) DeleteExternalIPRoute(arg0 any) *gomock.Cal return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteExternalIPRoute", reflect.TypeOf((*MockInterface)(nil).DeleteExternalIPRoute), arg0) } +// DeleteLoadBalancerConf mocks base method. +func (m *MockInterface) DeleteLoadBalancerConf(arg0 []net.IP, arg1 uint16, arg2 openflow.Protocol, arg3 bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteLoadBalancerConf", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteLoadBalancerConf indicates an expected call of DeleteLoadBalancerConf. +func (mr *MockInterfaceMockRecorder) DeleteLoadBalancerConf(arg0, arg1, arg2, arg3 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteLoadBalancerConf", reflect.TypeOf((*MockInterface)(nil).DeleteLoadBalancerConf), arg0, arg1, arg2, arg3) +} + // DeleteLocalAntreaFlexibleIPAMPodRule mocks base method. func (m *MockInterface) DeleteLocalAntreaFlexibleIPAMPodRule(arg0 []net.IP) error { m.ctrl.T.Helper() @@ -294,18 +350,18 @@ func (mr *MockInterfaceMockRecorder) DeleteNodeNetworkPolicyIPTables(arg0, arg1 return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteNodeNetworkPolicyIPTables", reflect.TypeOf((*MockInterface)(nil).DeleteNodeNetworkPolicyIPTables), arg0, arg1) } -// DeleteNodePort mocks base method. -func (m *MockInterface) DeleteNodePort(arg0 []net.IP, arg1 uint16, arg2 openflow.Protocol) error { +// DeleteNodePortConf mocks base method. +func (m *MockInterface) DeleteNodePortConf(arg0 []net.IP, arg1 uint16, arg2 openflow.Protocol) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteNodePort", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "DeleteNodePortConf", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } -// DeleteNodePort indicates an expected call of DeleteNodePort. -func (mr *MockInterfaceMockRecorder) DeleteNodePort(arg0, arg1, arg2 any) *gomock.Call { +// DeleteNodePortConf indicates an expected call of DeleteNodePortConf. +func (mr *MockInterfaceMockRecorder) DeleteNodePortConf(arg0, arg1, arg2 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteNodePort", reflect.TypeOf((*MockInterface)(nil).DeleteNodePort), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteNodePortConf", reflect.TypeOf((*MockInterface)(nil).DeleteNodePortConf), arg0, arg1, arg2) } // DeleteRouteForLink mocks base method. diff --git a/pkg/agent/util/iptables/builder.go b/pkg/agent/util/iptables/builder.go index c0dcad4dd7e..d8c7dc3363e 100644 --- a/pkg/agent/util/iptables/builder.go +++ b/pkg/agent/util/iptables/builder.go @@ -23,6 +23,8 @@ import ( "strings" "k8s.io/apimachinery/pkg/util/intstr" + + "antrea.io/antrea/pkg/agent/util/ipset" ) type iptablesRule struct { @@ -67,20 +69,56 @@ func (b *iptablesRuleBuilder) MatchCIDRDst(cidr string) IPTablesRuleBuilder { return b } -func (b *iptablesRuleBuilder) MatchIPSetSrc(ipset string) IPTablesRuleBuilder { - if ipset == "" { +func (b *iptablesRuleBuilder) MatchIPSrc(ip string) IPTablesRuleBuilder { + if ip == "" || ip == "0.0.0.0" || ip == "::" { return b } - matchStr := fmt.Sprintf("-m set --match-set %s src", ipset) + matchStr := fmt.Sprintf("-s %s", ip) b.writeSpec(matchStr) return b } -func (b *iptablesRuleBuilder) MatchIPSetDst(ipset string) IPTablesRuleBuilder { - if ipset == "" { +func (b *iptablesRuleBuilder) MatchIPDst(ip string) IPTablesRuleBuilder { + if ip == "" || ip == "0.0.0.0" || ip == "::" { return b } - matchStr := fmt.Sprintf("-m set --match-set %s dst", ipset) + matchStr := fmt.Sprintf("-d %s", ip) + b.writeSpec(matchStr) + return b +} + +func (b *iptablesRuleBuilder) MatchIPSetSrc(ipsetName string, ipsetType ipset.SetType) IPTablesRuleBuilder { + if ipsetName == "" { + return b + } + var typeStr string + switch ipsetType { + case ipset.HashNet: + fallthrough + case ipset.HashIP: + typeStr = "src" + case ipset.HashIPPort: + typeStr = "src,src" + } + matchStr := fmt.Sprintf("-m set --match-set %s %s", ipsetName, typeStr) + b.writeSpec(matchStr) + return b +} + +func (b *iptablesRuleBuilder) MatchIPSetDst(ipsetName string, ipsetType ipset.SetType) IPTablesRuleBuilder { + if ipsetName == "" { + return b + } + var typeStr string + switch ipsetType { + case ipset.HashNet: + fallthrough + case ipset.HashIP: + typeStr = "dst" + case ipset.HashIPPort: + typeStr = "dst,dst" + } + matchStr := fmt.Sprintf("-m set --match-set %s %s", ipsetName, typeStr) b.writeSpec(matchStr) return b } @@ -94,7 +132,7 @@ func (b *iptablesRuleBuilder) MatchTransProtocol(protocol string) IPTablesRuleBu return b } -func (b *iptablesRuleBuilder) MatchDstPort(port *intstr.IntOrString, endPort *int32) IPTablesRuleBuilder { +func (b *iptablesRuleBuilder) MatchPortDst(port *intstr.IntOrString, endPort *int32) IPTablesRuleBuilder { if port == nil { return b } @@ -108,7 +146,7 @@ func (b *iptablesRuleBuilder) MatchDstPort(port *intstr.IntOrString, endPort *in return b } -func (b *iptablesRuleBuilder) MatchSrcPort(port, endPort *int32) IPTablesRuleBuilder { +func (b *iptablesRuleBuilder) MatchPortSrc(port, endPort *int32) IPTablesRuleBuilder { if port == nil { return b } @@ -178,6 +216,15 @@ func (b *iptablesRuleBuilder) SetTarget(target string) IPTablesRuleBuilder { return b } +func (b *iptablesRuleBuilder) SetDNATIP(ip string) IPTablesRuleBuilder { + if ip == "" { + return b + } + specStr := fmt.Sprintf("--to-destination %s", ip) + b.writeSpec(specStr) + return b +} + func (b *iptablesRuleBuilder) SetComment(comment string) IPTablesRuleBuilder { if comment == "" { return b diff --git a/pkg/agent/util/iptables/builder_test.go b/pkg/agent/util/iptables/builder_test.go index c3da571a9a9..58f6f98aa79 100644 --- a/pkg/agent/util/iptables/builder_test.go +++ b/pkg/agent/util/iptables/builder_test.go @@ -22,6 +22,8 @@ import ( "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/util/intstr" + + "antrea.io/antrea/pkg/agent/util/ipset" ) var ( @@ -50,11 +52,11 @@ func TestBuilders(t *testing.T) { name: "Accept TCP destination 8080 in FORWARD", chain: ForwardChain, buildFunc: func(builder IPTablesRuleBuilder) IPTablesRule { - return builder.MatchIPSetSrc(ipsetAlfa). - MatchIPSetDst(ipsetBravo). + return builder.MatchIPSetSrc(ipsetAlfa, ipset.HashIP). + MatchIPSetDst(ipsetBravo, ipset.HashIP). MatchInputInterface(eth0). MatchTransProtocol(ProtocolTCP). - MatchDstPort(port8080, nil). + MatchPortDst(port8080, nil). MatchCIDRSrc(cidr). SetComment("Accept TCP 8080"). SetTarget(AcceptTarget). @@ -66,10 +68,10 @@ func TestBuilders(t *testing.T) { name: "Drop UDP destination 137-139 in INPUT", chain: "INPUT", buildFunc: func(builder IPTablesRuleBuilder) IPTablesRule { - return builder.MatchIPSetSrc(ipsetAlfa). + return builder.MatchIPSetSrc(ipsetAlfa, ipset.HashIP). MatchInputInterface(eth0). MatchTransProtocol(ProtocolUDP). - MatchDstPort(port137, &port139). + MatchPortDst(port137, &port139). MatchCIDRDst(cidr). SetComment("Drop UDP 137-139"). SetTarget(DropTarget). @@ -83,7 +85,7 @@ func TestBuilders(t *testing.T) { buildFunc: func(builder IPTablesRuleBuilder) IPTablesRule { return builder.MatchOutputInterface(eth1). MatchTransProtocol(ProtocolSCTP). - MatchSrcPort(&port40000, &port50000). + MatchPortSrc(&port40000, &port50000). SetComment("Drop SCTP 40000-50000"). SetTarget(DropTarget). Done() diff --git a/pkg/agent/util/iptables/iptables.go b/pkg/agent/util/iptables/iptables.go index d436f80afbd..62c45c3a995 100644 --- a/pkg/agent/util/iptables/iptables.go +++ b/pkg/agent/util/iptables/iptables.go @@ -28,6 +28,8 @@ import ( "github.com/coreos/go-iptables/iptables" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/klog/v2" + + "antrea.io/antrea/pkg/agent/util/ipset" ) const ( @@ -46,6 +48,7 @@ const ( SNATTarget = "SNAT" DNATTarget = "DNAT" RejectTarget = "REJECT" + NotrackTarget = "NOTRACK" PreRoutingChain = "PREROUTING" InputChain = "INPUT" @@ -109,16 +112,19 @@ type Interface interface { type IPTablesRuleBuilder interface { MatchCIDRSrc(cidr string) IPTablesRuleBuilder MatchCIDRDst(cidr string) IPTablesRuleBuilder - MatchIPSetSrc(ipset string) IPTablesRuleBuilder - MatchIPSetDst(ipset string) IPTablesRuleBuilder + MatchIPSrc(ip string) IPTablesRuleBuilder + MatchIPDst(ip string) IPTablesRuleBuilder + MatchIPSetSrc(ipset string, ipsetType ipset.SetType) IPTablesRuleBuilder + MatchIPSetDst(ipset string, ipsetType ipset.SetType) IPTablesRuleBuilder MatchTransProtocol(protocol string) IPTablesRuleBuilder - MatchDstPort(port *intstr.IntOrString, endPort *int32) IPTablesRuleBuilder - MatchSrcPort(port, endPort *int32) IPTablesRuleBuilder + MatchPortDst(port *intstr.IntOrString, endPort *int32) IPTablesRuleBuilder + MatchPortSrc(port, endPort *int32) IPTablesRuleBuilder MatchICMP(icmpType, icmpCode *int32, ipProtocol Protocol) IPTablesRuleBuilder MatchEstablishedOrRelated() IPTablesRuleBuilder MatchInputInterface(interfaceName string) IPTablesRuleBuilder MatchOutputInterface(interfaceName string) IPTablesRuleBuilder SetTarget(target string) IPTablesRuleBuilder + SetDNATIP(ip string) IPTablesRuleBuilder SetComment(comment string) IPTablesRuleBuilder CopyBuilder() IPTablesRuleBuilder Done() IPTablesRule diff --git a/pkg/ovs/openflow/ofctrl_builder_test.go b/pkg/ovs/openflow/ofctrl_builder_test.go index 9e96e249f97..ad0214c830f 100644 --- a/pkg/ovs/openflow/ofctrl_builder_test.go +++ b/pkg/ovs/openflow/ofctrl_builder_test.go @@ -1299,7 +1299,7 @@ func TestFlowBuilder(t *testing.T) { expectedMatchStr: "udp6", }, { - name: "MatchSrcPort (without mask)", + name: "MatchPortSrc (without mask)", matchFn: func(fb FlowBuilder) FlowBuilder { return fb.MatchSrcPort(0xf001, nil) }, @@ -1313,7 +1313,7 @@ func TestFlowBuilder(t *testing.T) { expectedMatchStr: "tp_src=61441", }, { - name: "MatchSrcPort (with mask)", + name: "MatchPortSrc (with mask)", matchFn: func(fb FlowBuilder) FlowBuilder { return fb.MatchSrcPort(0xf001, &portMask) }, @@ -1329,7 +1329,7 @@ func TestFlowBuilder(t *testing.T) { expectedMatchStr: "tp_src=0xf001/0xf000", }, { - name: "MatchDstPort (without mask)", + name: "MatchPortDst (without mask)", matchFn: func(fb FlowBuilder) FlowBuilder { return fb.MatchDstPort(0xf001, nil) }, @@ -1343,7 +1343,7 @@ func TestFlowBuilder(t *testing.T) { expectedMatchStr: "tp_dst=61441", }, { - name: "MatchDstPort (with mask)", + name: "MatchPortDst (with mask)", matchFn: func(fb FlowBuilder) FlowBuilder { return fb.MatchDstPort(0xf001, &portMask) }, diff --git a/pkg/ovs/openflow/testing/mock_openflow.go b/pkg/ovs/openflow/testing/mock_openflow.go index 8e7558383f2..2d3d41cd1aa 100644 --- a/pkg/ovs/openflow/testing/mock_openflow.go +++ b/pkg/ovs/openflow/testing/mock_openflow.go @@ -1,4 +1,4 @@ -// Copyright 2023 Antrea Authors +// Copyright 2024 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/util/env/env.go b/pkg/util/env/env.go index b23e92d1d4a..b187a4c3568 100644 --- a/pkg/util/env/env.go +++ b/pkg/util/env/env.go @@ -21,6 +21,7 @@ import ( "k8s.io/klog/v2" + "antrea.io/antrea/pkg/util/k8s" "antrea.io/antrea/pkg/util/runtime" ) @@ -134,3 +135,27 @@ func GetAntreaNamespace() string { func GetAllowNoEncapWithoutAntreaProxy() bool { return getBoolEnvVar(allowNoEncapWithoutAntreaProxyEnvKey, false) } + +func GetKubeServiceHost() string { + kubeServiceHost := os.Getenv(k8s.KubeServiceHostEnvKey) + if kubeServiceHost == "" { + klog.Warningf("Environment variable %s not found", k8s.KubeServiceHostEnvKey) + } + return kubeServiceHost +} + +func GetKubeServicePort() int32 { + kubeServicePortStr := os.Getenv(k8s.KubeServicePortEnvKey) + + if kubeServicePortStr == "" { + klog.Warningf("Environment variable %s not found", k8s.KubeServicePortEnvKey) + } else { + parsedValue, err := strconv.ParseInt(kubeServicePortStr, 10, 16) + if err != nil { + klog.Errorf("Failed to parse env variable '%s': %v", k8s.KubeServicePortEnvKey, err) + return 0 + } + return int32(parsedValue) + } + return 0 +} diff --git a/pkg/util/k8s/client.go b/pkg/util/k8s/client.go index 7ab555f170b..29e1f5b8faa 100644 --- a/pkg/util/k8s/client.go +++ b/pkg/util/k8s/client.go @@ -36,8 +36,8 @@ import ( ) const ( - kubeServiceHostEnvKey = "KUBERNETES_SERVICE_HOST" - kubeServicePortEnvKey = "KUBERNETES_SERVICE_PORT" + KubeServiceHostEnvKey = "KUBERNETES_SERVICE_HOST" + KubeServicePortEnvKey = "KUBERNETES_SERVICE_PORT" ) // CreateClients creates kube clients from the given config. @@ -125,8 +125,8 @@ func OverrideKubeAPIServer(kubeAPIServerOverride string) { host = hostPort port = "443" } - os.Setenv(kubeServiceHostEnvKey, host) - os.Setenv(kubeServicePortEnvKey, port) + os.Setenv(KubeServiceHostEnvKey, host) + os.Setenv(KubeServicePortEnvKey, port) } func EndpointSliceAPIAvailable(k8sClient clientset.Interface) (bool, error) { diff --git a/pkg/util/k8s/client_test.go b/pkg/util/k8s/client_test.go index 0b94a1a707a..cddfab5a960 100644 --- a/pkg/util/k8s/client_test.go +++ b/pkg/util/k8s/client_test.go @@ -108,12 +108,12 @@ func TestOverrideKubeAPIServer(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - t.Setenv(kubeServiceHostEnvKey, originalHost) - t.Setenv(kubeServicePortEnvKey, originalPort) + t.Setenv(KubeServiceHostEnvKey, originalHost) + t.Setenv(KubeServicePortEnvKey, originalPort) OverrideKubeAPIServer(tt.kubeAPIServerOverride) - assert.Equal(t, tt.expectHost, os.Getenv(kubeServiceHostEnvKey)) - assert.Equal(t, tt.expectPort, os.Getenv(kubeServicePortEnvKey)) + assert.Equal(t, tt.expectHost, os.Getenv(KubeServiceHostEnvKey)) + assert.Equal(t, tt.expectPort, os.Getenv(KubeServicePortEnvKey)) }) } }