From 8427b4ad44dba7fa665b30154607ecd1b117c14f Mon Sep 17 00:00:00 2001 From: Lan Date: Sat, 19 Nov 2022 06:37:14 +0800 Subject: [PATCH] Update Openflow rules when Multi-cluster Gateway updates (#4388) When GatewayIP or InternalIP of active Gateway changes, the corresponding Openflow rules are not updated until any other event triggers the flow sync process. Fix the issue by comparing installed active Gateway's InternalIP and GatewayIP with the current active Gateway. And add more unit tests to cover Gateway update events. Signed-off-by: Lan Luo --- pkg/agent/multicluster/mc_route_controller.go | 79 ++++++++++++------- .../multicluster/mc_route_controller_test.go | 38 ++++++++- 2 files changed, 85 insertions(+), 32 deletions(-) diff --git a/pkg/agent/multicluster/mc_route_controller.go b/pkg/agent/multicluster/mc_route_controller.go index 4319733b282..6c5966e9bc4 100644 --- a/pkg/agent/multicluster/mc_route_controller.go +++ b/pkg/agent/multicluster/mc_route_controller.go @@ -15,10 +15,8 @@ package noderoute import ( - "errors" "fmt" "net" - "strings" "time" "k8s.io/apimachinery/pkg/labels" @@ -72,10 +70,10 @@ type MCRouteController struct { // in MCRouteController. Need to use mutex to protect 'installedCIImports' if // we change the number of 'defaultWorkers'. installedCIImports map[string]*mcv1alpha1.ClusterInfoImport - // Need to use mutex to protect 'installedActiveGWName' if we change + // Need to use mutex to protect 'installedActiveGW' if we change // the number of 'defaultWorkers' to run multiple go routines to handle // events. - installedActiveGWName string + installedActiveGW *mcv1alpha1.Gateway // The Namespace where Antrea Multi-cluster Controller is running. namespace string } @@ -151,6 +149,11 @@ func (c *MCRouteController) enqueueGateway(obj interface{}, isDelete bool) { return } } + + if gw.Namespace != c.namespace { + return + } + if !isDelete { if net.ParseIP(gw.InternalIP) == nil || net.ParseIP(gw.GatewayIP) == nil { klog.ErrorS(nil, "No valid Internal IP or Gateway IP is found in Gateway", "gateway", gw.Namespace+"/"+gw.Name) @@ -175,6 +178,10 @@ func (c *MCRouteController) enqueueClusterInfoImport(obj interface{}, isDelete b } } + if ciImp.Namespace != c.namespace { + return + } + if !isDelete { if len(ciImp.Spec.GatewayInfos) == 0 { klog.ErrorS(nil, "Received invalid ClusterInfoImport", "object", obj) @@ -243,34 +250,35 @@ func (c *MCRouteController) syncMCFlows() error { if err != nil { return err } - if activeGW == nil && c.installedActiveGWName == "" { + if activeGW == nil && c.installedActiveGW == nil { klog.V(2).InfoS("No active Gateway is found") return nil } - klog.V(2).InfoS("Installed Gateway", "gateway", c.installedActiveGWName) - if activeGW != nil && activeGW.Name == c.installedActiveGWName { - // Active Gateway name doesn't change but do a full flows resync + klog.V(2).InfoS("Installed Gateway", "gateway", klog.KObj(c.installedActiveGW)) + if activeGW != nil && c.installedActiveGW != nil && activeGW.Name == c.installedActiveGW.Name { + // Active Gateway name doesn't change but still do a full flow sync // for any Gateway Spec or ClusterInfoImport changes. if err := c.syncMCFlowsForAllCIImps(activeGW); err != nil { return err } + c.installedActiveGW = activeGW return nil } - if c.installedActiveGWName != "" { + if c.installedActiveGW != nil { if err := c.deleteMCFlowsForAllCIImps(); err != nil { return err } - klog.V(2).InfoS("Deleted flows for installed Gateway", "gateway", c.installedActiveGWName) - c.installedActiveGWName = "" + klog.V(2).InfoS("Deleted flows for installed Gateway", "gateway", klog.KObj(c.installedActiveGW)) + c.installedActiveGW = nil } if activeGW != nil { if err := c.ofClient.InstallMulticlusterClassifierFlows(config.DefaultTunOFPort, activeGW.Name == c.nodeConfig.Name); err != nil { return err } - c.installedActiveGWName = activeGW.Name + c.installedActiveGW = activeGW return c.addMCFlowsForAllCIImps(activeGW) } return nil @@ -282,15 +290,13 @@ func (c *MCRouteController) syncMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway return err } + activeGWChanged := c.checkGateWayIPChange(activeGW) installedCIImportNames := sets.StringKeySet(c.installedCIImports) - for idx := range desiredCIImports { - if err = c.addMCFlowsForSingleCIImp(activeGW, desiredCIImports[idx], c.installedCIImports[desiredCIImports[idx].Name]); err != nil { - if strings.Contains(err.Error(), "invalid Gateway IP") { - continue - } + for _, ciImp := range desiredCIImports { + if err = c.addMCFlowsForSingleCIImp(activeGW, ciImp, c.installedCIImports[ciImp.Name], activeGWChanged); err != nil { return err } - installedCIImportNames.Delete(desiredCIImports[idx].Name) + installedCIImportNames.Delete(ciImp.Name) } for name := range installedCIImportNames { @@ -301,6 +307,18 @@ func (c *MCRouteController) syncMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway return nil } +func (c *MCRouteController) checkGateWayIPChange(activeGW *mcv1alpha1.Gateway) bool { + var activeGWChanged bool + if activeGW.Name == c.nodeConfig.Name { + // On a Gateway Node, the GatewayIP of the active Gateway will impact the Openflow rules. + activeGWChanged = activeGW.GatewayIP != c.installedActiveGW.GatewayIP + } else { + // On a regular Node, the InternalIP of the active Gateway will impact the Openflow rules. + activeGWChanged = activeGW.InternalIP != c.installedActiveGW.InternalIP + } + return activeGWChanged +} + func (c *MCRouteController) addMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway) error { allCIImports, err := c.ciImportLister.ClusterInfoImports(c.namespace).List(labels.Everything()) if err != nil { @@ -310,12 +328,8 @@ func (c *MCRouteController) addMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway) klog.V(2).InfoS("No remote ClusterInfo imported, do nothing") return nil } - for _, ciImport := range allCIImports { - if err := c.addMCFlowsForSingleCIImp(activeGW, ciImport, nil); err != nil { - if strings.Contains(err.Error(), "invalid Gateway IP") { - continue - } + if err := c.addMCFlowsForSingleCIImp(activeGW, ciImport, nil, true); err != nil { return err } } @@ -323,19 +337,24 @@ func (c *MCRouteController) addMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway) return nil } -func (c *MCRouteController) addMCFlowsForSingleCIImp(activeGW *mcv1alpha1.Gateway, ciImport *mcv1alpha1.ClusterInfoImport, installedCIImp *mcv1alpha1.ClusterInfoImport) error { +func (c *MCRouteController) addMCFlowsForSingleCIImp(activeGW *mcv1alpha1.Gateway, ciImport *mcv1alpha1.ClusterInfoImport, + installedCIImp *mcv1alpha1.ClusterInfoImport, activeGWChanged bool) error { tunnelPeerIPToRemoteGW := getPeerGatewayIP(ciImport.Spec) if tunnelPeerIPToRemoteGW == nil { - return errors.New("invalid Gateway IP") + klog.ErrorS(nil, "The ClusterInfoImport has no valid Gateway IP, skip it", "clusterinfoimport", klog.KObj(ciImport)) + return nil } + var ciImportNoChange bool if installedCIImp != nil { oldTunnelPeerIPToRemoteGW := getPeerGatewayIP(installedCIImp.Spec) - if oldTunnelPeerIPToRemoteGW.Equal(tunnelPeerIPToRemoteGW) && installedCIImp.Spec.ServiceCIDR == ciImport.Spec.ServiceCIDR && - sets.NewString(installedCIImp.Spec.PodCIDRs...).Equal(sets.NewString(ciImport.Spec.PodCIDRs...)) { - klog.V(2).InfoS("No difference between new and installed ClusterInfoImports, skip updating", "clusterinfoimport", ciImport.Name) - return nil - } + ciImportNoChange = oldTunnelPeerIPToRemoteGW.Equal(tunnelPeerIPToRemoteGW) && installedCIImp.Spec.ServiceCIDR == ciImport.Spec.ServiceCIDR && + sets.NewString(installedCIImp.Spec.PodCIDRs...).Equal(sets.NewString(ciImport.Spec.PodCIDRs...)) + } + + if ciImportNoChange && !activeGWChanged { + klog.V(2).InfoS("ClusterInfoImport and the active Gateway have no change, skip updating", "clusterinfoimport", klog.KObj(ciImport), "gateway", klog.KObj(activeGW)) + return nil } klog.InfoS("Adding/updating remote Gateway Node flows for Multi-cluster", "gateway", klog.KObj(activeGW), diff --git a/pkg/agent/multicluster/mc_route_controller_test.go b/pkg/agent/multicluster/mc_route_controller_test.go index ba0d645faa0..0f582fbf6a6 100644 --- a/pkg/agent/multicluster/mc_route_controller_test.go +++ b/pkg/agent/multicluster/mc_route_controller_test.go @@ -177,6 +177,23 @@ func TestMCRouteControllerAsGateway(t *testing.T) { c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport2.Name).Times(1) c.processNextWorkItem() + // Update Gateway1's GatewayIP + updatedGateway1a := gateway1.DeepCopy() + updatedGateway1a.GatewayIP = "10.16.0.100" + updatedGateway1aIP := net.ParseIP("10.16.0.100") + c.mcClient.MulticlusterV1alpha1().Gateways(updatedGateway1a.GetNamespace()).Update(context.TODO(), + updatedGateway1a, metav1.UpdateOptions{}) + c.ofClient.EXPECT().InstallMulticlusterGatewayFlows(clusterInfoImport1.Name, + gomock.Any(), peerNodeIP1, updatedGateway1aIP).Times(1) + c.processNextWorkItem() + + // Update Gateway1's InternalIP + updatedGateway1b := updatedGateway1a.DeepCopy() + updatedGateway1b.InternalIP = "17.162.0.10" + c.mcClient.MulticlusterV1alpha1().Gateways(updatedGateway1b.GetNamespace()).Update(context.TODO(), + updatedGateway1b, metav1.UpdateOptions{}) + c.processNextWorkItem() + // Create Gateway2 as active Gateway c.mcClient.MulticlusterV1alpha1().Gateways(gateway2.GetNamespace()).Create(context.TODO(), &gateway2, metav1.CreateOptions{}) @@ -191,7 +208,7 @@ func TestMCRouteControllerAsGateway(t *testing.T) { c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), true).Times(1) c.ofClient.EXPECT().InstallMulticlusterGatewayFlows(clusterInfoImport1.Name, - gomock.Any(), peerNodeIP1, gw1GatewayIP).Times(1) + gomock.Any(), peerNodeIP1, updatedGateway1aIP).Times(1) c.processNextWorkItem() // Delete last Gateway @@ -256,6 +273,23 @@ func TestMCRouteControllerAsRegularNode(t *testing.T) { c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport2.Name).Times(1) c.processNextWorkItem() + // Update Gateway1's GatewayIP + updatedGateway1a := gateway1.DeepCopy() + updatedGateway1a.GatewayIP = "10.16.0.100" + c.mcClient.MulticlusterV1alpha1().Gateways(updatedGateway1a.GetNamespace()).Update(context.TODO(), + updatedGateway1a, metav1.UpdateOptions{}) + c.processNextWorkItem() + + // Update Gateway1's InternalIP + updatedGateway1b := updatedGateway1a.DeepCopy() + updatedGateway1b.InternalIP = "17.162.0.10" + updatedGateway1bIP := net.ParseIP("17.162.0.10") + c.mcClient.MulticlusterV1alpha1().Gateways(updatedGateway1b.GetNamespace()).Update(context.TODO(), + updatedGateway1b, metav1.UpdateOptions{}) + c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name, + gomock.Any(), updatedGateway1bIP).Times(1) + c.processNextWorkItem() + // Create Gateway2 as the active Gateway c.mcClient.MulticlusterV1alpha1().Gateways(gateway2.GetNamespace()).Create(context.TODO(), &gateway2, metav1.CreateOptions{}) @@ -270,7 +304,7 @@ func TestMCRouteControllerAsRegularNode(t *testing.T) { c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), false).Times(1) c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name, - gomock.Any(), peerNodeIP1).Times(1) + gomock.Any(), updatedGateway1bIP).Times(1) c.processNextWorkItem() // Delete last Gateway