diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index fe0de0c5cf5..f8bcd823b87 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -113,6 +113,7 @@ func run(o *Options) error { serviceInformer := informerFactory.Core().V1().Services() endpointsInformer := informerFactory.Core().V1().Endpoints() namespaceInformer := informerFactory.Core().V1().Namespaces() + podInformer := informerFactory.Core().V1().Pods() // Create Antrea Clientset for the given config. antreaClientProvider := agent.NewAntreaClientProvider(o.config.AntreaClientConnection, k8sClient) @@ -152,7 +153,7 @@ func run(o *Options) error { connectUplinkToBridge, multicastEnabled, features.DefaultFeatureGate.Enabled(features.TrafficControl), - features.DefaultFeatureGate.Enabled(features.Multicluster), + enableMulticluster, ) var serviceCIDRNet *net.IPNet @@ -252,7 +253,8 @@ func run(o *Options) error { o.config.ExternalNode.ExternalNodeNamespace, features.DefaultFeatureGate.Enabled(features.AntreaProxy), o.config.AntreaProxy.ProxyAll, - connectUplinkToBridge) + connectUplinkToBridge, + enableMulticluster) err = agentInitializer.Initialize() if err != nil { return fmt.Errorf("error initializing agent: %v", err) @@ -314,20 +316,27 @@ func run(o *Options) error { ) } - var mcRouteController *mcroute.MCRouteController + var mcDefaultRouteController *mcroute.MCDefaultRouteController var mcStrechedNetworkPolicyController *mcroute.StretchedNetworkPolicyController + var mcPodRouteController *mcroute.MCPodRouteController var mcInformerFactory mcinformers.SharedInformerFactory + notEncapMode := networkConfig.TrafficEncapMode != config.TrafficEncapModeEncap if enableMulticluster { mcNamespace := env.GetPodNamespace() if o.config.Multicluster.Namespace != "" { mcNamespace = o.config.Multicluster.Namespace } - mcInformerFactory = mcinformers.NewSharedInformerFactory(mcClient, informerDefaultResync) + mcInformerFactory = mcinformers.NewSharedInformerFactoryWithOptions(mcClient, + informerDefaultResync, + mcinformers.WithTweakListOptions(func(lo *metav1.ListOptions) { + lo.FieldSelector = fields.Set{"metadata.namespace": mcNamespace}.String() + }), + ) gwInformer := mcInformerFactory.Multicluster().V1alpha1().Gateways() ciImportInformer := mcInformerFactory.Multicluster().V1alpha1().ClusterInfoImports() labelIDInformer := mcInformerFactory.Multicluster().V1alpha1().LabelIdentities() - mcRouteController = mcroute.NewMCRouteController( + mcDefaultRouteController = mcroute.NewMCDefaultRouteController( mcClient, gwInformer, ciImportInformer, @@ -348,6 +357,18 @@ func run(o *Options) error { podUpdateChannel, ) } + if notEncapMode { + mcPodRouteController = mcroute.NewMCPodRouteController( + mcClient, + podInformer.Informer(), + gwInformer.Informer(), + ofClient, + ovsBridgeClient, + ifaceStore, + nodeConfig, + mcNamespace, + ) + } } var groupCounters []proxytypes.GroupCounter groupIDUpdates := make(chan string, 100) @@ -475,15 +496,24 @@ func run(o *Options) error { } } + var mtuDeduction int var cniServer *cniserver.CNIServer var cniPodInfoStore cnipodcache.CNIPodInfoStore var externalNodeController *externalnode.ExternalNodeController var localExternalNodeInformer cache.SharedIndexInformer - if o.nodeType == config.K8sNode { - isChaining := false - if networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() { - isChaining = true + if enableMulticluster { + tunnelType := ovsconfig.TunnelType(o.config.TunnelType) + switch tunnelType { + case ovsconfig.GeneveTunnel: + mtuDeduction = config.GeneveOverhead + case ovsconfig.GRETunnel: + mtuDeduction = config.GREOverhead + case ovsconfig.VXLANTunnel: + mtuDeduction = config.VXLANOverhead } + } + if o.nodeType == config.K8sNode { + isChaining := networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() cniServer = cniserver.New( o.config.CNISocket, o.config.HostProcPathPrefix, @@ -494,6 +524,7 @@ func run(o *Options) error { enableBridgingMode, enableAntreaIPAM, o.config.DisableTXChecksumOffload, + mtuDeduction, networkReadyCh) if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) { @@ -747,10 +778,13 @@ func run(o *Options) error { if enableMulticluster { mcInformerFactory.Start(stopCh) - go mcRouteController.Run(stopCh) + go mcDefaultRouteController.Run(stopCh) if o.config.Multicluster.EnableStretchedNetworkPolicy { go mcStrechedNetworkPolicyController.Run(stopCh) } + if notEncapMode { + go mcPodRouteController.Run(stopCh) + } } // statsCollector collects stats and reports to the antrea-controller periodically. For now it's only used for diff --git a/multicluster/controllers/multicluster/resourceexport_controller.go b/multicluster/controllers/multicluster/resourceexport_controller.go index 47f738265ec..3c51a472d5e 100644 --- a/multicluster/controllers/multicluster/resourceexport_controller.go +++ b/multicluster/controllers/multicluster/resourceexport_controller.go @@ -355,12 +355,12 @@ func (r *ResourceExportReconciler) refreshEndpointsResourceImport( } if len(svcResExport.Status.Conditions) > 0 { if svcResExport.Status.Conditions[0].Status != corev1.ConditionTrue { - return newResImport, false, fmt.Errorf("corresponding Service type of ResourceExport " + svcResExportName.String() + - "has not been converged successfully, retry later") + err := fmt.Errorf("the Service type of ResourceExport %s has not been converged successfully, retry later", svcResExportName.String()) + return newResImport, false, err } } else { - return newResImport, false, fmt.Errorf("corresponding Service type of ResourceExport " + svcResExportName.String() + - "has not been converged yet, retry later") + err := fmt.Errorf("the Service type of ResourceExport %s has not been converged yet, retry later", svcResExportName.String()) + return newResImport, false, err } } diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index d23059cdc8c..52995c255d4 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -112,6 +112,7 @@ type Initializer struct { // networkReadyCh should be closed once the Node's network is ready. // The CNI server will wait for it before handling any CNI Add requests. proxyAll bool + enableMulticluster bool networkReadyCh chan<- struct{} stopCh <-chan struct{} nodeType config.NodeType @@ -139,6 +140,7 @@ func NewInitializer( enableProxy bool, proxyAll bool, connectUplinkToBridge bool, + enableMulticluster bool, ) *Initializer { return &Initializer{ ovsBridgeClient: ovsBridgeClient, @@ -161,6 +163,7 @@ func NewInitializer( enableProxy: enableProxy, proxyAll: proxyAll, connectUplinkToBridge: connectUplinkToBridge, + enableMulticluster: enableMulticluster, } } @@ -772,10 +775,11 @@ func (i *Initializer) setupDefaultTunnelInterface() error { // It's not necessary for new Linux kernel versions with the following patch: // https://github.com/torvalds/linux/commit/89e5c58fc1e2857ccdaae506fb8bc5fed57ee063. shouldEnableCsum := i.networkConfig.TunnelCsum && (i.networkConfig.TunnelType == ovsconfig.GeneveTunnel || i.networkConfig.TunnelType == ovsconfig.VXLANTunnel) + tunnelInterfaceSupported := i.networkConfig.TrafficEncapMode.SupportsEncap() || i.enableMulticluster // Check the default tunnel port. if portExists { - if i.networkConfig.TrafficEncapMode.SupportsEncap() && + if tunnelInterfaceSupported && tunnelIface.TunnelInterfaceConfig.Type == i.networkConfig.TunnelType && tunnelIface.TunnelInterfaceConfig.DestinationPort == i.networkConfig.TunnelPort && tunnelIface.TunnelInterfaceConfig.LocalIP.Equal(localIP) { @@ -792,7 +796,7 @@ func (i *Initializer) setupDefaultTunnelInterface() error { } if err := i.ovsBridgeClient.DeletePort(tunnelIface.PortUUID); err != nil { - if i.networkConfig.TrafficEncapMode.SupportsEncap() { + if tunnelInterfaceSupported { return fmt.Errorf("failed to remove tunnel port %s with wrong tunnel type: %s", tunnelPortName, err) } klog.Errorf("Failed to remove tunnel port %s in NoEncapMode: %v", tunnelPortName, err) @@ -803,7 +807,7 @@ func (i *Initializer) setupDefaultTunnelInterface() error { } // Create the default tunnel port and interface. - if i.networkConfig.TrafficEncapMode.SupportsEncap() { + if tunnelInterfaceSupported { if tunnelPortName != defaultTunInterfaceName { // Reset the tunnel interface name to the desired name before // recreating the tunnel port and interface. @@ -1126,7 +1130,8 @@ func (i *Initializer) getNodeMTU(transportInterface *net.Interface) (int, error) if mtu <= 0 { return 0, fmt.Errorf("Failed to fetch Node MTU : %v", mtu) } - if i.networkConfig.TrafficEncapMode.SupportsEncap() { + // When the multi-cluster is enabled, we need deduct MTU for any potential cross-cluster traffic. + if i.networkConfig.TrafficEncapMode.SupportsEncap() || i.enableMulticluster { if i.networkConfig.TunnelType == ovsconfig.VXLANTunnel { mtu -= config.VXLANOverhead } else if i.networkConfig.TunnelType == ovsconfig.GeneveTunnel { diff --git a/pkg/agent/cniserver/interface_configuration_linux.go b/pkg/agent/cniserver/interface_configuration_linux.go index a45264ac917..ef74ac371e1 100644 --- a/pkg/agent/cniserver/interface_configuration_linux.go +++ b/pkg/agent/cniserver/interface_configuration_linux.go @@ -373,6 +373,43 @@ func (ic *ifConfigurator) configureContainerLink( } } +func (ic *ifConfigurator) configureContainerMTU(containerNetNS string, containerIFDev string, mtuDeduction int) error { + var peerIdx int + if err := ns.WithNetNSPath(containerNetNS, func(hostNS ns.NetNS) error { + link, err := netlink.LinkByName(containerIFDev) + if err != nil { + return fmt.Errorf("failed to find interface %s in container %s: %v", containerIFDev, containerNetNS, err) + } + _, peerIdx, err = ip.GetVethPeerIfindex(containerIFDev) + if err != nil { + return fmt.Errorf("failed to get peer index for dev %s in container %s: %w", containerIFDev, containerNetNS, err) + } + err = netlink.LinkSetMTU(link, link.Attrs().MTU-mtuDeduction) + if err != nil { + return fmt.Errorf("failed to set MTU for interface %s in container %s: %v", containerIFDev, containerNetNS, err) + } + return nil + }); err != nil { + return err + } + + peerIntf, err := net.InterfaceByIndex(peerIdx) + if err != nil { + return fmt.Errorf("failed to get host interface for index %d: %w", peerIdx, err) + } + + hostInterfaceName := peerIntf.Name + link, err := netlink.LinkByName(hostInterfaceName) + if err != nil { + return fmt.Errorf("failed to find host interface %s: %v", hostInterfaceName, err) + } + err = netlink.LinkSetMTU(link, link.Attrs().MTU-mtuDeduction) + if err != nil { + return fmt.Errorf("failed to set MTU for host interface %s: %v", hostInterfaceName, err) + } + return nil +} + func (ic *ifConfigurator) removeContainerLink(containerID, hostInterfaceName string) error { klog.V(2).Infof("Deleting veth devices for container %s", containerID) // Don't return an error if the device is already removed as CniDel can be called multiple times. diff --git a/pkg/agent/cniserver/interface_configuration_windows.go b/pkg/agent/cniserver/interface_configuration_windows.go index 40042db1256..9fdde4c4b74 100644 --- a/pkg/agent/cniserver/interface_configuration_windows.go +++ b/pkg/agent/cniserver/interface_configuration_windows.go @@ -173,6 +173,12 @@ func (ic *ifConfigurator) configureContainerLink( return nil } +// configureContainerMTU is only used for Antrea Multi-cluster with networkPolicyOnly +// mode, and this mode doesn't support Windows platform yet. +func (ic *ifConfigurator) configureContainerMTU(containerNetNS string, containerIFDev string, mtuDeduction int) error { + return nil +} + // createContainerLink creates HNSEndpoint using the IP configuration in the IPAM result. func (ic *ifConfigurator) createContainerLink(endpointName string, result *current.Result, containerID, podName, podNamespace string) (hostLink *hcsshim.HNSEndpoint, err error) { containerIP, err := findContainerIPConfig(result.IPs) diff --git a/pkg/agent/cniserver/pod_configuration.go b/pkg/agent/cniserver/pod_configuration.go index c50b248608b..02a5f4fe69c 100644 --- a/pkg/agent/cniserver/pod_configuration.go +++ b/pkg/agent/cniserver/pod_configuration.go @@ -196,6 +196,13 @@ func ParseOVSPortInterfaceConfig(portData *ovsconfig.OVSPortData, portConfig *in return interfaceConfig } +func (pc *podConfigurator) configureInterfacesMTU(containerNetNS string, containerIFDev string, mtuDeduction int) error { + if err := pc.ifConfigurator.configureContainerMTU(containerNetNS, containerIFDev, mtuDeduction); err != nil { + return err + } + return nil +} + func (pc *podConfigurator) configureInterfaces( podName string, podNameSpace string, diff --git a/pkg/agent/cniserver/server.go b/pkg/agent/cniserver/server.go index 8eb43d72af8..5b17def838a 100644 --- a/pkg/agent/cniserver/server.go +++ b/pkg/agent/cniserver/server.go @@ -117,6 +117,7 @@ type CNIServer struct { enableSecondaryNetworkIPAM bool disableTXChecksumOffload bool secondaryNetworkEnabled bool + mtuDeduction int // networkReadyCh notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it. networkReadyCh <-chan struct{} } @@ -618,7 +619,7 @@ func New( nodeConfig *config.NodeConfig, kubeClient clientset.Interface, routeClient route.Interface, - isChaining, enableBridgingMode, enableSecondaryNetworkIPAM, disableTXChecksumOffload bool, + isChaining, enableBridgingMode, enableSecondaryNetworkIPAM, disableTXChecksumOffload bool, mtuDeduction int, networkReadyCh <-chan struct{}, ) *CNIServer { return &CNIServer{ @@ -634,6 +635,7 @@ func New( enableBridgingMode: enableBridgingMode, disableTXChecksumOffload: disableTXChecksumOffload, enableSecondaryNetworkIPAM: enableSecondaryNetworkIPAM, + mtuDeduction: mtuDeduction, networkReadyCh: networkReadyCh, } } @@ -710,6 +712,18 @@ func (s *CNIServer) interceptAdd(cniConfig *CNIConfig) (*cnipb.CniCmdResponse, e return &cnipb.CniCmdResponse{CniResult: []byte("")}, fmt.Errorf("failed to connect container %s to ovs: %w", cniConfig.ContainerId, err) } + // Packets for multi-cluster traffic will always be encapsulated and go through + // tunnel interface. So here we need to deduce mtu for different tunnel types. + if s.mtuDeduction != 0 { + if err := s.podConfigurator.configureInterfacesMTU( + s.hostNetNsPath(cniConfig.Netns), + cniConfig.Ifname, + s.mtuDeduction, + ); err != nil { + return &cnipb.CniCmdResponse{CniResult: []byte("")}, fmt.Errorf("failed to configure container %s's MTU: %w", cniConfig.ContainerId, err) + } + } + // we return prevResult, which should be exactly what we received from // the runtime, potentially converted to the current CNI version used by // Antrea. diff --git a/pkg/agent/multicluster/mc_route_controller.go b/pkg/agent/multicluster/mc_route_controller.go index 852f3d4a2a5..4620031adfc 100644 --- a/pkg/agent/multicluster/mc_route_controller.go +++ b/pkg/agent/multicluster/mc_route_controller.go @@ -15,6 +15,7 @@ package multicluster import ( + "errors" "fmt" "net" "time" @@ -37,7 +38,7 @@ import ( ) const ( - controllerName = "AntreaAgentMCRouteController" + controllerName = "AntreaAgentMCDefaultRouteController" // Set resyncPeriod to 0 to disable resyncing resyncPeriod = 0 * time.Second @@ -50,10 +51,10 @@ const ( workerItemKey = "key" ) -// MCRouteController watches Gateway and ClusterInfoImport events. +// MCDefaultRouteController watches Gateway and ClusterInfoImport events. // It is responsible for setting up necessary Openflow entries for multi-cluster // traffic on a Gateway or a regular Node. -type MCRouteController struct { +type MCDefaultRouteController struct { mcClient mcclientset.Interface ovsBridgeClient ovsconfig.OVSBridgeClient ofClient openflow.Client @@ -67,7 +68,7 @@ type MCRouteController struct { ciImportListerSynced cache.InformerSynced queue workqueue.RateLimitingInterface // installedCIImports is for saving ClusterInfos which have been processed - // in MCRouteController. Need to use mutex to protect 'installedCIImports' if + // in MCDefaultRouteController. Need to use mutex to protect 'installedCIImports' if // we change the number of 'defaultWorkers'. installedCIImports map[string]*mcv1alpha1.ClusterInfoImport // Need to use mutex to protect 'installedActiveGW' if we change @@ -79,7 +80,7 @@ type MCRouteController struct { enableStretchedNetworkPolicy bool } -func NewMCRouteController( +func NewMCDefaultRouteController( mcClient mcclientset.Interface, gwInformer mcinformers.GatewayInformer, ciImportInformer mcinformers.ClusterInfoImportInformer, @@ -89,8 +90,8 @@ func NewMCRouteController( nodeConfig *config.NodeConfig, namespace string, enableStretchedNetworkPolicy bool, -) *MCRouteController { - controller := &MCRouteController{ +) *MCDefaultRouteController { + controller := &MCDefaultRouteController{ mcClient: mcClient, ovsBridgeClient: ovsBridgeClient, ofClient: client, @@ -138,22 +139,10 @@ func NewMCRouteController( return controller } -func (c *MCRouteController) enqueueGateway(obj interface{}, isDelete bool) { +func (c *MCDefaultRouteController) enqueueGateway(obj interface{}, isDelete bool) { gw, isGW := obj.(*mcv1alpha1.Gateway) if !isGW { - deletedState, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - klog.ErrorS(nil, "Received unexpected object", "object", obj) - return - } - gw, ok = deletedState.Obj.(*mcv1alpha1.Gateway) - if !ok { - klog.ErrorS(nil, "DeletedFinalStateUnknown contains non-Gateway object", "object", deletedState.Obj) - return - } - } - - if gw.Namespace != c.namespace { + klog.ErrorS(errors.New("received unexpected object"), "enqueueGateway can't process event", "obj", obj) return } @@ -166,22 +155,10 @@ func (c *MCRouteController) enqueueGateway(obj interface{}, isDelete bool) { c.queue.Add(workerItemKey) } -func (c *MCRouteController) enqueueClusterInfoImport(obj interface{}, isDelete bool) { +func (c *MCDefaultRouteController) enqueueClusterInfoImport(obj interface{}, isDelete bool) { ciImp, isciImp := obj.(*mcv1alpha1.ClusterInfoImport) if !isciImp { - deletedState, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - klog.ErrorS(nil, "Received unexpected object", "object", obj) - return - } - ciImp, ok = deletedState.Obj.(*mcv1alpha1.ClusterInfoImport) - if !ok { - klog.ErrorS(nil, "DeletedFinalStateUnknown contains non-ClusterInfoImport object", "object", deletedState.Obj) - return - } - } - - if ciImp.Namespace != c.namespace { + klog.ErrorS(errors.New("received unexpected object"), "enqueueClusterInfoImport can't process event", "obj", obj) return } @@ -201,7 +178,7 @@ func (c *MCRouteController) enqueueClusterInfoImport(obj interface{}, isDelete b // Run will create defaultWorkers workers (go routines) which will process // the Gateway events from the workqueue. -func (c *MCRouteController) Run(stopCh <-chan struct{}) { +func (c *MCDefaultRouteController) Run(stopCh <-chan struct{}) { defer c.queue.ShutDown() cacheSyncs := []cache.InformerSynced{c.gwListerSynced, c.ciImportListerSynced} klog.InfoS("Starting controller", "controller", controllerName) @@ -218,12 +195,12 @@ func (c *MCRouteController) Run(stopCh <-chan struct{}) { // worker is a long-running function that will continually call the processNextWorkItem // function in order to read and process a message on the workqueue. -func (c *MCRouteController) worker() { +func (c *MCDefaultRouteController) worker() { for c.processNextWorkItem() { } } -func (c *MCRouteController) processNextWorkItem() bool { +func (c *MCDefaultRouteController) processNextWorkItem() bool { obj, quit := c.queue.Get() if quit { return false @@ -244,7 +221,7 @@ func (c *MCRouteController) processNextWorkItem() bool { return true } -func (c *MCRouteController) syncMCFlows() error { +func (c *MCDefaultRouteController) syncMCFlows() error { startTime := time.Now() defer func() { klog.V(4).InfoS("Finished syncing flows for Multi-cluster", "time", time.Since(startTime)) @@ -287,13 +264,13 @@ func (c *MCRouteController) syncMCFlows() error { return nil } -func (c *MCRouteController) syncMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway) error { +func (c *MCDefaultRouteController) syncMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway) error { desiredCIImports, err := c.ciImportLister.ClusterInfoImports(c.namespace).List(labels.Everything()) if err != nil { return err } - activeGWChanged := c.checkGateWayIPChange(activeGW) + activeGWChanged := c.checkGatewayIPChange(activeGW) installedCIImportNames := sets.StringKeySet(c.installedCIImports) for _, ciImp := range desiredCIImports { if err = c.addMCFlowsForSingleCIImp(activeGW, ciImp, c.installedCIImports[ciImp.Name], activeGWChanged); err != nil { @@ -310,7 +287,7 @@ func (c *MCRouteController) syncMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway return nil } -func (c *MCRouteController) checkGateWayIPChange(activeGW *mcv1alpha1.Gateway) bool { +func (c *MCDefaultRouteController) checkGatewayIPChange(activeGW *mcv1alpha1.Gateway) bool { var activeGWChanged bool if activeGW.Name == c.nodeConfig.Name { // On a Gateway Node, the GatewayIP of the active Gateway will impact the Openflow rules. @@ -322,7 +299,7 @@ func (c *MCRouteController) checkGateWayIPChange(activeGW *mcv1alpha1.Gateway) b return activeGWChanged } -func (c *MCRouteController) addMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway) error { +func (c *MCDefaultRouteController) addMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway) error { allCIImports, err := c.ciImportLister.ClusterInfoImports(c.namespace).List(labels.Everything()) if err != nil { return err @@ -340,7 +317,7 @@ func (c *MCRouteController) addMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway) return nil } -func (c *MCRouteController) addMCFlowsForSingleCIImp(activeGW *mcv1alpha1.Gateway, ciImport *mcv1alpha1.ClusterInfoImport, +func (c *MCDefaultRouteController) addMCFlowsForSingleCIImp(activeGW *mcv1alpha1.Gateway, ciImport *mcv1alpha1.ClusterInfoImport, installedCIImp *mcv1alpha1.ClusterInfoImport, activeGWChanged bool) error { tunnelPeerIPToRemoteGW := getPeerGatewayIP(ciImport.Spec) if tunnelPeerIPToRemoteGW == nil { @@ -395,42 +372,45 @@ func (c *MCRouteController) addMCFlowsForSingleCIImp(activeGW *mcv1alpha1.Gatewa return nil } -func (c *MCRouteController) deleteMCFlowsForSingleCIImp(ciImpName string) error { - if err := c.ofClient.UninstallMulticlusterFlows(ciImpName); err != nil { +func (c *MCDefaultRouteController) deleteMCFlowsForSingleCIImp(ciImpName string) error { + if err := c.ofClient.UninstallMulticlusterFlows(ciImpName, false); err != nil { return fmt.Errorf("failed to uninstall multi-cluster flows to remote Gateway Node %s: %v", ciImpName, err) } delete(c.installedCIImports, ciImpName) return nil } -func (c *MCRouteController) deleteMCFlowsForAllCIImps() error { +func (c *MCDefaultRouteController) deleteMCFlowsForAllCIImps() error { for _, ciImp := range c.installedCIImports { c.deleteMCFlowsForSingleCIImp(ciImp.Name) } return nil } -// getActiveGateway compares Gateway's CreationTimestamp to get the active Gateway, -// The last created Gateway will be the active Gateway. -func (c *MCRouteController) getActiveGateway() (*mcv1alpha1.Gateway, error) { - gws, err := c.gwLister.Gateways(c.namespace).List(labels.Everything()) +func (c *MCDefaultRouteController) getActiveGateway() (*mcv1alpha1.Gateway, error) { + activeGW, err := getActiveGateway(c.gwLister, c.namespace) if err != nil { return nil, err } - if len(gws) == 0 { + if activeGW == nil { return nil, nil } - // Comparing Gateway's CreationTimestamp to get the last created Gateway. - lastCreatedGW := gws[0] - for _, gw := range gws { - if lastCreatedGW.CreationTimestamp.Before(&gw.CreationTimestamp) { - lastCreatedGW = gw - } + if net.ParseIP(activeGW.GatewayIP) == nil || net.ParseIP(activeGW.InternalIP) == nil { + return nil, fmt.Errorf("the active Gateway %s has no valid GatewayIP or InternalIP", activeGW.Name) + } + return activeGW, nil +} + +func getActiveGateway(gwLister mclisters.GatewayLister, namespace string) (*mcv1alpha1.Gateway, error) { + gws, err := gwLister.Gateways(namespace).List(labels.Everything()) + if err != nil { + return nil, err } - if net.ParseIP(lastCreatedGW.GatewayIP) == nil || net.ParseIP(lastCreatedGW.InternalIP) == nil { - return nil, fmt.Errorf("the last created Gateway %s has no valid GatewayIP or InternalIP", lastCreatedGW.Name) + if len(gws) == 0 { + return nil, nil } - return lastCreatedGW, nil + // The Gateway webhook guarantees there will be at most one Gateway in a cluster. + return gws[0], nil } func generatePeerConfigs(subnets []string, gatewayIP net.IP) (map[*net.IPNet]net.IP, error) { diff --git a/pkg/agent/multicluster/mc_route_controller_test.go b/pkg/agent/multicluster/mc_route_controller_test.go index c745e5c35f5..d727d215cc7 100644 --- a/pkg/agent/multicluster/mc_route_controller_test.go +++ b/pkg/agent/multicluster/mc_route_controller_test.go @@ -22,6 +22,7 @@ import ( "github.com/golang/mock/gomock" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" mcfake "antrea.io/antrea/multicluster/pkg/client/clientset/versioned/fake" @@ -33,7 +34,7 @@ import ( ) type fakeRouteController struct { - *MCRouteController + *MCDefaultRouteController mcClient *mcfake.Clientset informerFactory mcinformers.SharedInformerFactory ofClient *oftest.MockClient @@ -41,9 +42,14 @@ type fakeRouteController struct { interfaceStore interfacestore.InterfaceStore } -func newMCRouteController(t *testing.T, nodeConfig *config.NodeConfig) (*fakeRouteController, func()) { +func newMCDefaultRouteController(t *testing.T, nodeConfig *config.NodeConfig) (*fakeRouteController, func()) { mcClient := mcfake.NewSimpleClientset() - mcInformerFactory := mcinformers.NewSharedInformerFactory(mcClient, 60*time.Second) + mcInformerFactory := mcinformers.NewSharedInformerFactoryWithOptions(mcClient, + 60*time.Second, + mcinformers.WithTweakListOptions(func(lo *metav1.ListOptions) { + lo.FieldSelector = fields.Set{"metadata.namespace": defaultNs}.String() + }), + ) gwInformer := mcInformerFactory.Multicluster().V1alpha1().Gateways() ciImpInformer := mcInformerFactory.Multicluster().V1alpha1().ClusterInfoImports() @@ -51,7 +57,7 @@ func newMCRouteController(t *testing.T, nodeConfig *config.NodeConfig) (*fakeRou ofClient := oftest.NewMockClient(ctrl) ovsClient := ovsconfigtest.NewMockOVSBridgeClient(ctrl) interfaceStore := interfacestore.NewInterfaceStore() - c := NewMCRouteController( + c := NewMCDefaultRouteController( mcClient, gwInformer, ciImpInformer, @@ -63,12 +69,12 @@ func newMCRouteController(t *testing.T, nodeConfig *config.NodeConfig) (*fakeRou true, ) return &fakeRouteController{ - MCRouteController: c, - mcClient: mcClient, - informerFactory: mcInformerFactory, - ofClient: ofClient, - ovsClient: ovsClient, - interfaceStore: interfaceStore, + MCDefaultRouteController: c, + mcClient: mcClient, + informerFactory: mcInformerFactory, + ofClient: ofClient, + ovsClient: ovsClient, + interfaceStore: interfaceStore, }, ctrl.Finish } @@ -96,6 +102,16 @@ var ( gw1GatewayIP = net.ParseIP(gateway1.GatewayIP) gw2InternalIP = net.ParseIP(gateway2.InternalIP) + gateway3 = mcv1alpha1.Gateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-3", + Namespace: "kube-system", + CreationTimestamp: gw2CreationTime, + }, + GatewayIP: "172.17.0.13", + InternalIP: "192.17.0.13", + } + clusterInfoImport1 = mcv1alpha1.ClusterInfoImport{ ObjectMeta: metav1.ObjectMeta{ Name: "cluster-b-default-clusterinfo", @@ -130,7 +146,7 @@ var ( ) func TestMCRouteControllerAsGateway(t *testing.T) { - c, closeFn := newMCRouteController(t, &config.NodeConfig{Name: "node-1"}) + c, closeFn := newMCDefaultRouteController(t, &config.NodeConfig{Name: "node-1"}) defer closeFn() defer c.queue.ShutDown() @@ -175,7 +191,7 @@ func TestMCRouteControllerAsGateway(t *testing.T) { // Delete a ClusterInfoImport c.mcClient.MulticlusterV1alpha1().ClusterInfoImports(clusterInfoImport2.GetNamespace()).Delete(context.TODO(), clusterInfoImport2.Name, metav1.DeleteOptions{}) - c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport2.Name).Times(1) + c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport2.Name, false).Times(1) c.processNextWorkItem() // Update Gateway1's GatewayIP @@ -195,28 +211,22 @@ func TestMCRouteControllerAsGateway(t *testing.T) { updatedGateway1b, metav1.UpdateOptions{}) c.processNextWorkItem() + // Delete Gateway1 + c.mcClient.MulticlusterV1alpha1().Gateways(gateway1.GetNamespace()).Delete(context.TODO(), + gateway1.Name, metav1.DeleteOptions{}) + c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name, false).Times(1) + c.processNextWorkItem() + + // Create Gateway3 which is not in the default Namespace. + c.mcClient.MulticlusterV1alpha1().Gateways(gateway3.GetNamespace()).Create(context.TODO(), + &gateway3, metav1.CreateOptions{}) + // Create Gateway2 as active Gateway c.mcClient.MulticlusterV1alpha1().Gateways(gateway2.GetNamespace()).Create(context.TODO(), &gateway2, metav1.CreateOptions{}) - c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), false).Times(1) c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name, gomock.Any(), gw2InternalIP, true).Times(1) c.processNextWorkItem() - - // Delete Gateway2, then Gateway1 become active Gateway - c.mcClient.MulticlusterV1alpha1().Gateways(gateway2.GetNamespace()).Delete(context.TODO(), - gateway2.Name, metav1.DeleteOptions{}) - c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) - c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), true).Times(1) - c.ofClient.EXPECT().InstallMulticlusterGatewayFlows(clusterInfoImport1.Name, - gomock.Any(), peerNodeIP1, updatedGateway1aIP, true).Times(1) - c.processNextWorkItem() - - // Delete last Gateway - c.mcClient.MulticlusterV1alpha1().Gateways(gateway1.GetNamespace()).Delete(context.TODO(), - gateway1.Name, metav1.DeleteOptions{}) - c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) - c.processNextWorkItem() }() select { case <-time.After(5 * time.Second): @@ -226,7 +236,7 @@ func TestMCRouteControllerAsGateway(t *testing.T) { } func TestMCRouteControllerAsRegularNode(t *testing.T) { - c, closeFn := newMCRouteController(t, &config.NodeConfig{Name: "node-3"}) + c, closeFn := newMCDefaultRouteController(t, &config.NodeConfig{Name: "node-3"}) defer closeFn() defer c.queue.ShutDown() @@ -271,7 +281,7 @@ func TestMCRouteControllerAsRegularNode(t *testing.T) { // Delete a ClusterInfoImport c.mcClient.MulticlusterV1alpha1().ClusterInfoImports(clusterInfoImport2.GetNamespace()).Delete(context.TODO(), clusterInfoImport2.Name, metav1.DeleteOptions{}) - c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport2.Name).Times(1) + c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport2.Name, false).Times(1) c.processNextWorkItem() // Update Gateway1's GatewayIP @@ -291,27 +301,21 @@ func TestMCRouteControllerAsRegularNode(t *testing.T) { gomock.Any(), updatedGateway1bIP, true).Times(1) c.processNextWorkItem() + // Delete Gateway1 + c.mcClient.MulticlusterV1alpha1().Gateways(gateway1.GetNamespace()).Delete(context.TODO(), + gateway1.Name, metav1.DeleteOptions{}) + c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name, false).Times(1) + c.processNextWorkItem() + + // Create Gateway3 which is not in the default Namespace. + c.mcClient.MulticlusterV1alpha1().Gateways(gateway3.GetNamespace()).Create(context.TODO(), + &gateway3, metav1.CreateOptions{}) + // Create Gateway2 as the active Gateway c.mcClient.MulticlusterV1alpha1().Gateways(gateway2.GetNamespace()).Create(context.TODO(), &gateway2, metav1.CreateOptions{}) c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), false).Times(1) c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name, gomock.Any(), peerNodeIP2, true).Times(1) - c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) - c.processNextWorkItem() - - // Delete Gateway2, then Gateway1 become active Gateway - c.mcClient.MulticlusterV1alpha1().Gateways(gateway2.GetNamespace()).Delete(context.TODO(), - gateway2.Name, metav1.DeleteOptions{}) - c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) - c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), false).Times(1) - c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name, - gomock.Any(), updatedGateway1bIP, true).Times(1) - c.processNextWorkItem() - - // Delete last Gateway - c.mcClient.MulticlusterV1alpha1().Gateways(gateway1.GetNamespace()).Delete(context.TODO(), - gateway1.Name, metav1.DeleteOptions{}) - c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) c.processNextWorkItem() }() select { diff --git a/pkg/agent/multicluster/pod_route_controller.go b/pkg/agent/multicluster/pod_route_controller.go new file mode 100644 index 00000000000..261741b9df7 --- /dev/null +++ b/pkg/agent/multicluster/pod_route_controller.go @@ -0,0 +1,306 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package multicluster + +import ( + "errors" + "net" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/wait" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + mcclientset "antrea.io/antrea/multicluster/pkg/client/clientset/versioned" + mclisters "antrea.io/antrea/multicluster/pkg/client/listers/multicluster/v1alpha1" + "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/interfacestore" + "antrea.io/antrea/pkg/agent/openflow" + "antrea.io/antrea/pkg/ovs/ovsconfig" +) + +const ( + // Default number of workers processing a resource change + defaultWorkerNum = 5 +) + +// MCPodRouteController generates L3 forwarding flows to forward cross-cluster +// traffic from MC Gateway to Pods on other Nodes inside a member cluster. It is +// required when networkPolicyOnly, noEncap or hybrid mode are configured, to forward +// the traffic through tunnels between Gateway and other Nodes, as otherwise the +// traffic will not go through tunnels in those modes. +type MCPodRouteController struct { + mcClient mcclientset.Interface + ovsBridgeClient ovsconfig.OVSBridgeClient + ofClient openflow.Client + interfaceStore interfacestore.InterfaceStore + nodeConfig *config.NodeConfig + queue workqueue.RateLimitingInterface + podInformer cache.SharedIndexInformer + podLister corelisters.PodLister + gwInformer cache.SharedIndexInformer + gwLister mclisters.GatewayLister + activeGWName string + namespace string + mutex sync.Mutex + installedPods map[string]string +} + +type endpointChangeInfo struct { + podIP string + podNodeIP string +} + +func NewMCPodRouteController( + mcClient mcclientset.Interface, + podInformer cache.SharedIndexInformer, + gwInformer cache.SharedIndexInformer, + client openflow.Client, + ovsBridgeClient ovsconfig.OVSBridgeClient, + interfaceStore interfacestore.InterfaceStore, + nodeConfig *config.NodeConfig, + namespace string, +) *MCPodRouteController { + controller := &MCPodRouteController{ + mcClient: mcClient, + ovsBridgeClient: ovsBridgeClient, + ofClient: client, + interfaceStore: interfaceStore, + nodeConfig: nodeConfig, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "MCPodRouteController"), + podInformer: podInformer, + podLister: corelisters.NewPodLister(podInformer.GetIndexer()), + gwInformer: gwInformer, + gwLister: mclisters.NewGatewayLister(gwInformer.GetIndexer()), + namespace: namespace, + installedPods: make(map[string]string), + } + + controller.gwInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(cur interface{}) { + controller.enqueueGateway(cur, false) + }, + // Gateway UPDATE event doesn't impact Pod flows, so ignore it. + DeleteFunc: func(old interface{}) { + controller.enqueueGateway(old, true) + }, + }, + resyncPeriod, + ) + + controller.podInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(cur interface{}) { + controller.enqueuePod(cur, false) + }, + UpdateFunc: func(old, cur interface{}) { + controller.enqueuePod(cur, false) + }, + DeleteFunc: func(old interface{}) { + controller.enqueuePod(old, true) + }, + }, + resyncPeriod, + ) + return controller +} + +func (c *MCPodRouteController) enqueueGateway(obj interface{}, isDelete bool) { + gw, isGW := obj.(*mcv1alpha1.Gateway) + if !isGW { + klog.ErrorS(errors.New("received unexpected object"), "enqueueGateway can't process event", "obj", obj) + return + } + + c.mutex.Lock() + defer c.mutex.Unlock() + // Gateway webhook will guarantee that at most one Gateway in a cluster. + if isDelete { + c.activeGWName = "" + if c.nodeConfig.Name != gw.Name { + klog.InfoS("This Node is not the deleted Gateway, skip it", "node", c.nodeConfig.Name, "gateway", gw.Name) + return + } + + for podIP := range c.installedPods { + c.queue.Add(&endpointChangeInfo{ + podIP: podIP, + }) + delete(c.installedPods, podIP) + } + return + } + + c.activeGWName = gw.Name + if c.nodeConfig.Name != gw.Name { + klog.InfoS("The Node is not the Gateway, skip it", "node", c.nodeConfig.Name, "gateway", gw.Name) + return + } + pods, _ := c.podLister.List(labels.Everything()) + for _, p := range pods { + pod := p + isValidPod := c.validatePod(pod) + if !isValidPod { + continue + } + c.queue.Add(&endpointChangeInfo{ + podIP: pod.Status.PodIP, + podNodeIP: pod.Status.HostIP, + }) + c.installedPods[pod.Status.PodIP] = pod.Status.HostIP + } +} + +// validatePod validates the Pod info to determine if it's a valid Pod which should +// have Openflow flows installed for cross-cluster traffic. +func (c *MCPodRouteController) validatePod(pod *corev1.Pod) bool { + podIP := pod.Status.PodIP + podNodeIP := pod.Status.HostIP + + if pod.Spec.NodeName == c.activeGWName { + klog.V(2).InfoS("Skip Pod on the Gateway Node") + return false + } + + if podIP == "" || podNodeIP == "" || pod.Spec.HostNetwork { + klog.V(2).InfoS("Pod has no valid IPs or it is in HostNetwork, skip it", "name", klog.KObj(pod), + "podIP", podIP, "nodeIP", podNodeIP, "hostNetwork", pod.Spec.HostNetwork) + return false + } + + if c.installedPods[podIP] == podNodeIP { + klog.V(2).InfoS("No Pod change impacts installed Openflow rules", "name", klog.KObj(pod)) + return false + } + return true +} + +func (c *MCPodRouteController) enqueuePod(obj interface{}, isDelete bool) { + pod, isPod := obj.(*corev1.Pod) + if !isPod { + klog.ErrorS(errors.New("received unexpected object"), "enqueuePod can't process event", "obj", obj) + return + } + + c.mutex.Lock() + defer c.mutex.Unlock() + if c.activeGWName == "" || c.nodeConfig.Name != c.activeGWName { + klog.InfoS("No active gateway or the Node is not the active Gateway", "node", c.nodeConfig.Name, "gateway", c.activeGWName) + return + } + + podIP := pod.Status.PodIP + podNodeIP := pod.Status.HostIP + _, exists := c.installedPods[podIP] + if isDelete && exists { + c.queue.Add(&endpointChangeInfo{ + podIP: podIP, + }) + delete(c.installedPods, podIP) + return + } + + isValidPod := c.validatePod(pod) + if !isValidPod { + return + } + c.queue.Add(&endpointChangeInfo{ + podNodeIP: podNodeIP, + podIP: podIP, + }) + c.installedPods[podIP] = podNodeIP +} + +func (c *MCPodRouteController) Run(stopCh <-chan struct{}) { + controllerName := "AntreaAgentMulticlusterNodeRouteController" + defer c.queue.ShutDown() + + cacheSyncs := []cache.InformerSynced{c.podInformer.HasSynced, c.gwInformer.HasSynced} + klog.InfoS("Starting controller", "controller", controllerName) + defer klog.InfoS("Shutting down controller", "controller", controllerName) + if !cache.WaitForNamedCacheSync(controllerName, stopCh, cacheSyncs...) { + return + } + + c.initialize() + for i := 0; i < defaultWorkerNum; i++ { + go wait.Until(c.worker, time.Second, stopCh) + } + <-stopCh +} + +func (c *MCPodRouteController) worker() { + for c.processNextWorkItem() { + } +} + +func (c *MCPodRouteController) processNextWorkItem() bool { + obj, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(obj) + if k, ok := obj.(*endpointChangeInfo); !ok { + c.queue.Forget(obj) + klog.InfoS("Expected endpointChangeInfo in work queue but got", "object", obj) + return true + } else if err := c.syncPodFlows(k); err == nil { + c.queue.Forget(k) + } else { + // Put the item back on the workqueue to handle any transient errors. + c.queue.AddRateLimited(k) + klog.ErrorS(err, "Error syncing key, requeuing", "key", k) + } + return true +} + +func (c *MCPodRouteController) initialize() { + activeGW, err := getActiveGateway(c.gwLister, c.namespace) + if err != nil { + klog.ErrorS(err, "Failed to get an active Gateway") + } + + if activeGW != nil { + c.mutex.Lock() + defer c.mutex.Unlock() + c.activeGWName = activeGW.Name + klog.InfoS("Found an active Gateway", "gateway", klog.KObj(activeGW)) + } +} + +func (c *MCPodRouteController) syncPodFlows(key *endpointChangeInfo) error { + isDelete := key.podNodeIP == "" + if isDelete { + klog.V(2).InfoS("Deleting Multi-cluster flows for Pod", "podIP", key.podIP) + if err := c.ofClient.UninstallMulticlusterFlows(key.podIP, true); err != nil { + klog.ErrorS(err, "Failed to uninstall Multi-cluster flows for Pod", "podIP", key.podIP) + return err + } + } else { + klog.V(2).InfoS("Adding Multi-cluster flows for Pod", "podIP", key.podIP, "nodeIP", key.podNodeIP) + if err := c.ofClient.InstallMulticlusterPodFlows(net.ParseIP(key.podIP), net.ParseIP(key.podNodeIP)); err != nil { + klog.ErrorS(err, "Failed to install Multi-cluster flows for Pod", "podIP", key.podIP, "nodeIP", key.podNodeIP) + return err + } + } + return nil +} diff --git a/pkg/agent/multicluster/pod_route_controller_test.go b/pkg/agent/multicluster/pod_route_controller_test.go new file mode 100644 index 00000000000..43feb567acb --- /dev/null +++ b/pkg/agent/multicluster/pod_route_controller_test.go @@ -0,0 +1,270 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package multicluster + +import ( + "context" + "net" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + k8sfake "k8s.io/client-go/kubernetes/fake" + + mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + mcfake "antrea.io/antrea/multicluster/pkg/client/clientset/versioned/fake" + mcinformers "antrea.io/antrea/multicluster/pkg/client/informers/externalversions" + "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/interfacestore" + oftest "antrea.io/antrea/pkg/agent/openflow/testing" + ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" +) + +var ( + defaultNs = "default" + node1Name = "node-1" + ctx = context.TODO() + + nginx1NoIPs = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: defaultNs, + Name: "nginx1", + }, + Spec: corev1.PodSpec{ + NodeName: "node-2", + }, + } + + nginx2WithIPs = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: defaultNs, + Name: "nginx2", + }, + Spec: corev1.PodSpec{ + NodeName: "node-2", + }, + Status: corev1.PodStatus{ + PodIP: "192.168.1.12", + HostIP: "10.170.10.11", + }, + } + + nginx2PodIP = net.ParseIP("192.168.1.12") + nginx2HostIP = net.ParseIP("10.170.10.11") + + nginx3WithIPs = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: defaultNs, + Name: "nginx3", + }, + Spec: corev1.PodSpec{ + NodeName: node1Name, + }, + Status: corev1.PodStatus{ + PodIP: "192.168.10.13", + HostIP: "10.170.10.13", + }, + } +) + +type fakeMCPodRouteController struct { + *MCPodRouteController + mcClient *mcfake.Clientset + k8sClient *k8sfake.Clientset + informerFactory informers.SharedInformerFactory + mcInformerFactory mcinformers.SharedInformerFactory + ofClient *oftest.MockClient + ovsClient *ovsconfigtest.MockOVSBridgeClient + interfaceStore interfacestore.InterfaceStore +} + +func newMCPodRouteController(t *testing.T, nodeConfig *config.NodeConfig, mcClient *mcfake.Clientset, + k8sClient *k8sfake.Clientset) (*fakeMCPodRouteController, func()) { + mcInformerFactory := mcinformers.NewSharedInformerFactoryWithOptions(mcClient, + 0, + mcinformers.WithTweakListOptions(func(lo *metav1.ListOptions) { + lo.FieldSelector = fields.Set{"metadata.namespace": defaultNs}.String() + }), + ) + gwInformer := mcInformerFactory.Multicluster().V1alpha1().Gateways() + + informerFactory := informers.NewSharedInformerFactory(k8sClient, 0) + podInformer := informerFactory.Core().V1().Pods() + + ctrl := gomock.NewController(t) + ofClient := oftest.NewMockClient(ctrl) + ovsClient := ovsconfigtest.NewMockOVSBridgeClient(ctrl) + interfaceStore := interfacestore.NewInterfaceStore() + c := NewMCPodRouteController( + mcClient, + podInformer.Informer(), + gwInformer.Informer(), + ofClient, + ovsClient, + interfaceStore, + nodeConfig, + defaultNs, + ) + return &fakeMCPodRouteController{ + MCPodRouteController: c, + mcClient: mcClient, + k8sClient: k8sClient, + mcInformerFactory: mcInformerFactory, + informerFactory: informerFactory, + ofClient: ofClient, + ovsClient: ovsClient, + interfaceStore: interfaceStore, + }, ctrl.Finish +} + +func TestInitialize(t *testing.T) { + mcClient := mcfake.NewSimpleClientset() + k8sClient := k8sfake.NewSimpleClientset() + c, closeFn := newMCPodRouteController(t, &config.NodeConfig{Name: node1Name}, mcClient, k8sClient) + defer closeFn() + defer c.queue.ShutDown() + + stopCh := make(chan struct{}) + defer close(stopCh) + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + c.mcInformerFactory.Start(stopCh) + c.mcInformerFactory.WaitForCacheSync(stopCh) + + mcClient.MulticlusterV1alpha1().Gateways(defaultNs).Create(context.TODO(), &gateway1, metav1.CreateOptions{}) + if err := waitForGatewayRealized(mcClient, &gateway1); err != nil { + t.Errorf("error when waiting for Gateway '%s/%s' to be realized: %v", gateway1.Namespace, gateway1.Name, err) + } + c.initialize() + assert.Equal(t, gateway1.Name, c.activeGWName) +} + +func waitForGatewayRealized(clientset *mcfake.Clientset, gateway *mcv1alpha1.Gateway) error { + return wait.Poll(time.Millisecond, 1*time.Second, func() (bool, error) { + _, err := clientset.MulticlusterV1alpha1().Gateways(gateway.Namespace).Get(context.TODO(), gateway.Name, metav1.GetOptions{}) + if err != nil { + return false, nil + } + return true, err + }) +} + +func TestGatewayEvent(t *testing.T) { + mcClient := mcfake.NewSimpleClientset() + k8sClient := k8sfake.NewSimpleClientset([]runtime.Object{nginx1NoIPs, nginx2WithIPs, nginx3WithIPs}...) + c, closeFn := newMCPodRouteController(t, &config.NodeConfig{Name: node1Name}, mcClient, k8sClient) + defer closeFn() + defer c.queue.ShutDown() + + stopCh := make(chan struct{}) + defer close(stopCh) + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + c.mcInformerFactory.Start(stopCh) + c.mcInformerFactory.WaitForCacheSync(stopCh) + + finishCh := make(chan struct{}) + go func() { + defer close(finishCh) + + // Create a Gateway node-2 in the default Namespace + c.mcClient.MulticlusterV1alpha1().Gateways(defaultNs).Create(ctx, &gateway2, metav1.CreateOptions{}) + // Delete a Gateway node-2 in the default Namespace + c.mcClient.MulticlusterV1alpha1().Gateways(defaultNs).Delete(ctx, gateway2.Name, metav1.DeleteOptions{}) + + // Create a Gateway node-3 in the kube-system Namespace + c.mcClient.MulticlusterV1alpha1().Gateways(gateway3.Namespace).Create(ctx, &gateway3, metav1.CreateOptions{}) + // Delete a Gateway node-3 in the kube-system Namespace + c.mcClient.MulticlusterV1alpha1().Gateways(gateway3.Namespace).Delete(ctx, gateway3.Name, metav1.DeleteOptions{}) + + // Create a Gateway node-1 + c.mcClient.MulticlusterV1alpha1().Gateways(defaultNs).Create(ctx, &gateway1, metav1.CreateOptions{}) + c.ofClient.EXPECT().InstallMulticlusterPodFlows(nginx2PodIP, nginx2HostIP) + c.processNextWorkItem() + + // Delete a Gateway node-1 + c.mcClient.MulticlusterV1alpha1().Gateways(defaultNs).Delete(ctx, gateway1.Name, metav1.DeleteOptions{}) + c.ofClient.EXPECT().UninstallMulticlusterFlows(nginx2PodIP.String(), true) + c.processNextWorkItem() + }() + select { + case <-time.After(5 * time.Second): + t.Errorf("Test didn't finish in time") + case <-finishCh: + } +} + +func TestPodEvent(t *testing.T) { + mcClient := mcfake.NewSimpleClientset() + k8sClient := k8sfake.NewSimpleClientset([]runtime.Object{nginx2WithIPs}...) + c, closeFn := newMCPodRouteController(t, &config.NodeConfig{Name: node1Name}, mcClient, k8sClient) + defer closeFn() + defer c.queue.ShutDown() + + stopCh := make(chan struct{}) + defer close(stopCh) + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + c.mcInformerFactory.Start(stopCh) + c.mcInformerFactory.WaitForCacheSync(stopCh) + + finishCh := make(chan struct{}) + go func() { + defer close(finishCh) + // Create a Gateway in kube-system Namespace + c.mcClient.MulticlusterV1alpha1().Gateways(gateway3.Namespace).Create(ctx, &gateway3, metav1.CreateOptions{}) + + // Create a Gateway + c.mcClient.MulticlusterV1alpha1().Gateways(defaultNs).Create(ctx, &gateway1, metav1.CreateOptions{}) + c.ofClient.EXPECT().InstallMulticlusterPodFlows(nginx2PodIP, nginx2HostIP) + c.processNextWorkItem() + + // Create a Pod without IPs + c.k8sClient.CoreV1().Pods(defaultNs).Create(ctx, nginx1NoIPs, metav1.CreateOptions{}) + + // Create a Pod in the Gateway Node + c.k8sClient.CoreV1().Pods(defaultNs).Create(ctx, nginx3WithIPs, metav1.CreateOptions{}) + + // Update a Pod with IPs + nginx1Updated := nginx1NoIPs.DeepCopy() + nginx1Updated.Status.PodIP = "192.168.10.11" + nginx1Updated.Status.HostIP = "172.16.10.11" + c.k8sClient.CoreV1().Pods(defaultNs).Update(ctx, nginx1Updated, metav1.UpdateOptions{}) + c.ofClient.EXPECT().InstallMulticlusterPodFlows(net.ParseIP("192.168.10.11"), net.ParseIP("172.16.10.11")) + c.processNextWorkItem() + + // Update a Pod's label + nginx1UpdatedLabel := nginx1Updated.DeepCopy() + nginx1UpdatedLabel.Labels = map[string]string{"env": "test"} + c.k8sClient.CoreV1().Pods(defaultNs).Update(ctx, nginx1Updated, metav1.UpdateOptions{}) + + // Delete a Pod + c.k8sClient.CoreV1().Pods(defaultNs).Delete(ctx, nginx1Updated.Name, metav1.DeleteOptions{}) + c.ofClient.EXPECT().UninstallMulticlusterFlows("192.168.10.11", true) + c.processNextWorkItem() + }() + select { + case <-time.After(5 * time.Second): + t.Errorf("Test didn't finish in time") + case <-finishCh: + } +} diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 36c57e467a6..4b957a1a8f4 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -340,9 +340,13 @@ type Client interface { // InstallMulticlusterClassifierFlows installs flows to classify cross-cluster packets. InstallMulticlusterClassifierFlows(tunnelOFPort uint32, isGateway bool) error - // UninstallMulticlusterFlows removes cross-cluster flows matching the given clusterID on + // InstallMulticlusterPodFlows installs flows to handle cross-cluster packets from a Gateway to + // local general Nodes. + InstallMulticlusterPodFlows(podIP net.IP, tunnelPeerIP net.IP) error + + // UninstallMulticlusterFlows removes cross-cluster flows matching the given cache key on // a regular Node or a Gateway. - UninstallMulticlusterFlows(clusterID string) error + UninstallMulticlusterFlows(key string, isPodFlow bool) error // InstallVMUplinkFlows installs flows to forward packet between uplinkPort and hostPort. On a VM, the // uplink and host internal port are paired directly, and no layer 2/3 forwarding flow is installed. @@ -1387,6 +1391,7 @@ func (c *client) InstallMulticlusterGatewayFlows(clusterID string, // InstallMulticlusterClassifierFlows adds the following flows: // - One flow in L2ForwardingCalcTable for the global virtual multicluster MAC 'aa:bb:cc:dd:ee:f0' // to set its target output port as 'antrea-tun0'. This flow will be on both Gateway and regular Node. +// - One flow in ClassifierTable for the tunnel traffic if it's networkPolicyOnly mode. // - One flow to match MC virtual MAC 'aa:bb:cc:dd:ee:f0' in ClassifierTable for Gateway only. // - One flow in L2ForwardingOutTable to allow multicluster hairpin traffic for Gateway only. func (c *client) InstallMulticlusterClassifierFlows(tunnelOFPort uint32, isGateway bool) error { @@ -1397,6 +1402,10 @@ func (c *client) InstallMulticlusterClassifierFlows(tunnelOFPort uint32, isGatew c.featurePodConnectivity.l2ForwardCalcFlow(GlobalVirtualMACForMulticluster, tunnelOFPort), } + if c.networkConfig.TrafficEncapMode != config.TrafficEncapModeEncap { + flows = append(flows, c.featurePodConnectivity.tunnelClassifierFlow(tunnelOFPort)) + } + if isGateway { flows = append(flows, c.featureMulticluster.tunnelClassifierFlow(tunnelOFPort), @@ -1406,9 +1415,21 @@ func (c *client) InstallMulticlusterClassifierFlows(tunnelOFPort uint32, isGatew return c.modifyFlows(c.featureMulticluster.cachedFlows, "multicluster-classifier", flows) } -func (c *client) UninstallMulticlusterFlows(clusterID string) error { +func (c *client) InstallMulticlusterPodFlows(podIP net.IP, tunnelPeerIP net.IP) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() - cacheKey := fmt.Sprintf("cluster_%s", clusterID) + cacheKey := fmt.Sprintf("pod_%s", podIP.String()) + localGatewayMAC := c.nodeConfig.GatewayConfig.MAC + flows := []binding.Flow{c.featureMulticluster.l3FwdFlowToLocalViaTun(localGatewayMAC, podIP, tunnelPeerIP)} + return c.modifyFlows(c.featureMulticluster.cachedFlows, cacheKey, flows) +} + +func (c *client) UninstallMulticlusterFlows(key string, isPodFlow bool) error { + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + cacheKey := fmt.Sprintf("cluster_%s", key) + if isPodFlow { + cacheKey = fmt.Sprintf("pod_%s", key) + } return c.deleteFlows(c.featureMulticluster.cachedFlows, cacheKey) } diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index fc75cdf8b2d..8d45fac9def 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -2050,7 +2050,7 @@ func Test_client_InstallMulticlusterNodeFlows(t *testing.T) { require.True(t, ok) assert.ElementsMatch(t, tc.expectedFlows, getFlowStrings(fCacheI)) - assert.NoError(t, fc.UninstallMulticlusterFlows(clusterID)) + assert.NoError(t, fc.UninstallMulticlusterFlows(clusterID, false)) _, ok = fc.featureMulticluster.cachedFlows.Load(cacheKey) require.False(t, ok) }) @@ -2105,7 +2105,7 @@ func Test_client_InstallMulticlusterGatewayFlows(t *testing.T) { require.True(t, ok) assert.ElementsMatch(t, tc.expectedFlows, getFlowStrings(fCacheI)) - assert.NoError(t, fc.UninstallMulticlusterFlows(clusterID)) + assert.NoError(t, fc.UninstallMulticlusterFlows(clusterID, false)) _, ok = fc.featureMulticluster.cachedFlows.Load(cacheKey) require.False(t, ok) }) diff --git a/pkg/agent/openflow/multicluster.go b/pkg/agent/openflow/multicluster.go index 7761836ef43..c89a43c04c6 100644 --- a/pkg/agent/openflow/multicluster.go +++ b/pkg/agent/openflow/multicluster.go @@ -184,3 +184,22 @@ func (f *featureMulticluster) snatConntrackFlows(serviceCIDR net.IPNet, localGat ) return flows } + +func (f *featureMulticluster) l3FwdFlowToLocalViaTun( + localGatewayMAC net.HardwareAddr, + podIP net.IP, + tunnelPeer net.IP) binding.Flow { + ipProtocol := getIPProtocol(podIP) + // This generates the flow to forward cross-cluster request packets based + // on Pod IP. + return L3ForwardingTable.ofTable.BuildFlow(priorityHigh). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchProtocol(ipProtocol). + MatchDstIP(podIP). + MatchDstMAC(GlobalVirtualMACForMulticluster). + Action().SetSrcMAC(localGatewayMAC). // Rewrite src MAC to local gateway MAC. + Action().SetTunnelDst(tunnelPeer). // Flow based tunnel. Set tunnel destination. + Action().LoadRegMark(ToTunnelRegMark). + Action().GotoTable(L3DecTTLTable.GetID()). + Done() +} diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index 596c9c6daeb..ad77e725780 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -367,6 +367,20 @@ func (mr *MockClientMockRecorder) InstallMulticlusterNodeFlows(arg0, arg1, arg2, return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticlusterNodeFlows", reflect.TypeOf((*MockClient)(nil).InstallMulticlusterNodeFlows), arg0, arg1, arg2, arg3) } +// InstallMulticlusterPodFlows mocks base method +func (m *MockClient) InstallMulticlusterPodFlows(arg0, arg1 net.IP) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InstallMulticlusterPodFlows", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// InstallMulticlusterPodFlows indicates an expected call of InstallMulticlusterPodFlows +func (mr *MockClientMockRecorder) InstallMulticlusterPodFlows(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticlusterPodFlows", reflect.TypeOf((*MockClient)(nil).InstallMulticlusterPodFlows), arg0, arg1) +} + // InstallNodeFlows mocks base method func (m *MockClient) InstallNodeFlows(arg0 string, arg1 map[*net.IPNet]net.IP, arg2 *ip.DualStackIPs, arg3 uint32, arg4 net.HardwareAddr) error { m.ctrl.T.Helper() @@ -824,17 +838,17 @@ func (mr *MockClientMockRecorder) UninstallMulticastGroup(arg0 interface{}) *gom } // UninstallMulticlusterFlows mocks base method -func (m *MockClient) UninstallMulticlusterFlows(arg0 string) error { +func (m *MockClient) UninstallMulticlusterFlows(arg0 string, arg1 bool) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UninstallMulticlusterFlows", arg0) + ret := m.ctrl.Call(m, "UninstallMulticlusterFlows", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } // UninstallMulticlusterFlows indicates an expected call of UninstallMulticlusterFlows -func (mr *MockClientMockRecorder) UninstallMulticlusterFlows(arg0 interface{}) *gomock.Call { +func (mr *MockClientMockRecorder) UninstallMulticlusterFlows(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallMulticlusterFlows", reflect.TypeOf((*MockClient)(nil).UninstallMulticlusterFlows), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallMulticlusterFlows", reflect.TypeOf((*MockClient)(nil).UninstallMulticlusterFlows), arg0, arg1) } // UninstallNodeFlows mocks base method diff --git a/test/integration/agent/cniserver_test.go b/test/integration/agent/cniserver_test.go index 88d1c6340b6..e7ab8c20378 100644 --- a/test/integration/agent/cniserver_test.go +++ b/test/integration/agent/cniserver_test.go @@ -573,7 +573,7 @@ func newTester() *cmdAddDelTester { testNodeConfig, k8sFake.NewSimpleClientset(), routeMock, - false, false, false, false, + false, false, false, false, 0, tester.networkReadyCh) tester.server.Initialize(ovsServiceMock, ofServiceMock, ifaceStore, channel.NewSubscribableChannel("PodUpdate", 100), nil) ctx := context.Background() @@ -737,7 +737,7 @@ func setupChainTest( testNodeConfig, k8sFake.NewSimpleClientset(), routeMock, - true, false, false, false, + true, false, false, false, 0, networkReadyCh) } else { server = inServer