Skip to content

Commit

Permalink
Update Openflow rules when Multi-cluster Gateway updates (#4388)
Browse files Browse the repository at this point in the history
When GatewayIP or InternalIP of active Gateway changes, the
corresponding Openflow rules are not updated until any other
event triggers the flow sync process.

Fix the issue by comparing installed active Gateway's InternalIP and
GatewayIP with the current active Gateway. And add more unit tests
to cover Gateway update events.

Signed-off-by: Lan Luo <luola@vmware.com>
  • Loading branch information
luolanzone authored Nov 18, 2022
1 parent fcda658 commit 8427b4a
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 32 deletions.
79 changes: 49 additions & 30 deletions pkg/agent/multicluster/mc_route_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
package noderoute

import (
"errors"
"fmt"
"net"
"strings"
"time"

"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -72,10 +70,10 @@ type MCRouteController struct {
// in MCRouteController. Need to use mutex to protect 'installedCIImports' if
// we change the number of 'defaultWorkers'.
installedCIImports map[string]*mcv1alpha1.ClusterInfoImport
// Need to use mutex to protect 'installedActiveGWName' if we change
// Need to use mutex to protect 'installedActiveGW' if we change
// the number of 'defaultWorkers' to run multiple go routines to handle
// events.
installedActiveGWName string
installedActiveGW *mcv1alpha1.Gateway
// The Namespace where Antrea Multi-cluster Controller is running.
namespace string
}
Expand Down Expand Up @@ -151,6 +149,11 @@ func (c *MCRouteController) enqueueGateway(obj interface{}, isDelete bool) {
return
}
}

if gw.Namespace != c.namespace {
return
}

if !isDelete {
if net.ParseIP(gw.InternalIP) == nil || net.ParseIP(gw.GatewayIP) == nil {
klog.ErrorS(nil, "No valid Internal IP or Gateway IP is found in Gateway", "gateway", gw.Namespace+"/"+gw.Name)
Expand All @@ -175,6 +178,10 @@ func (c *MCRouteController) enqueueClusterInfoImport(obj interface{}, isDelete b
}
}

if ciImp.Namespace != c.namespace {
return
}

if !isDelete {
if len(ciImp.Spec.GatewayInfos) == 0 {
klog.ErrorS(nil, "Received invalid ClusterInfoImport", "object", obj)
Expand Down Expand Up @@ -243,34 +250,35 @@ func (c *MCRouteController) syncMCFlows() error {
if err != nil {
return err
}
if activeGW == nil && c.installedActiveGWName == "" {
if activeGW == nil && c.installedActiveGW == nil {
klog.V(2).InfoS("No active Gateway is found")
return nil
}

klog.V(2).InfoS("Installed Gateway", "gateway", c.installedActiveGWName)
if activeGW != nil && activeGW.Name == c.installedActiveGWName {
// Active Gateway name doesn't change but do a full flows resync
klog.V(2).InfoS("Installed Gateway", "gateway", klog.KObj(c.installedActiveGW))
if activeGW != nil && c.installedActiveGW != nil && activeGW.Name == c.installedActiveGW.Name {
// Active Gateway name doesn't change but still do a full flow sync
// for any Gateway Spec or ClusterInfoImport changes.
if err := c.syncMCFlowsForAllCIImps(activeGW); err != nil {
return err
}
c.installedActiveGW = activeGW
return nil
}

if c.installedActiveGWName != "" {
if c.installedActiveGW != nil {
if err := c.deleteMCFlowsForAllCIImps(); err != nil {
return err
}
klog.V(2).InfoS("Deleted flows for installed Gateway", "gateway", c.installedActiveGWName)
c.installedActiveGWName = ""
klog.V(2).InfoS("Deleted flows for installed Gateway", "gateway", klog.KObj(c.installedActiveGW))
c.installedActiveGW = nil
}

if activeGW != nil {
if err := c.ofClient.InstallMulticlusterClassifierFlows(config.DefaultTunOFPort, activeGW.Name == c.nodeConfig.Name); err != nil {
return err
}
c.installedActiveGWName = activeGW.Name
c.installedActiveGW = activeGW
return c.addMCFlowsForAllCIImps(activeGW)
}
return nil
Expand All @@ -282,15 +290,13 @@ func (c *MCRouteController) syncMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway
return err
}

activeGWChanged := c.checkGateWayIPChange(activeGW)
installedCIImportNames := sets.StringKeySet(c.installedCIImports)
for idx := range desiredCIImports {
if err = c.addMCFlowsForSingleCIImp(activeGW, desiredCIImports[idx], c.installedCIImports[desiredCIImports[idx].Name]); err != nil {
if strings.Contains(err.Error(), "invalid Gateway IP") {
continue
}
for _, ciImp := range desiredCIImports {
if err = c.addMCFlowsForSingleCIImp(activeGW, ciImp, c.installedCIImports[ciImp.Name], activeGWChanged); err != nil {
return err
}
installedCIImportNames.Delete(desiredCIImports[idx].Name)
installedCIImportNames.Delete(ciImp.Name)
}

for name := range installedCIImportNames {
Expand All @@ -301,6 +307,18 @@ func (c *MCRouteController) syncMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway
return nil
}

func (c *MCRouteController) checkGateWayIPChange(activeGW *mcv1alpha1.Gateway) bool {
var activeGWChanged bool
if activeGW.Name == c.nodeConfig.Name {
// On a Gateway Node, the GatewayIP of the active Gateway will impact the Openflow rules.
activeGWChanged = activeGW.GatewayIP != c.installedActiveGW.GatewayIP
} else {
// On a regular Node, the InternalIP of the active Gateway will impact the Openflow rules.
activeGWChanged = activeGW.InternalIP != c.installedActiveGW.InternalIP
}
return activeGWChanged
}

func (c *MCRouteController) addMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway) error {
allCIImports, err := c.ciImportLister.ClusterInfoImports(c.namespace).List(labels.Everything())
if err != nil {
Expand All @@ -310,32 +328,33 @@ func (c *MCRouteController) addMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway)
klog.V(2).InfoS("No remote ClusterInfo imported, do nothing")
return nil
}

for _, ciImport := range allCIImports {
if err := c.addMCFlowsForSingleCIImp(activeGW, ciImport, nil); err != nil {
if strings.Contains(err.Error(), "invalid Gateway IP") {
continue
}
if err := c.addMCFlowsForSingleCIImp(activeGW, ciImport, nil, true); err != nil {
return err
}
}

return nil
}

func (c *MCRouteController) addMCFlowsForSingleCIImp(activeGW *mcv1alpha1.Gateway, ciImport *mcv1alpha1.ClusterInfoImport, installedCIImp *mcv1alpha1.ClusterInfoImport) error {
func (c *MCRouteController) addMCFlowsForSingleCIImp(activeGW *mcv1alpha1.Gateway, ciImport *mcv1alpha1.ClusterInfoImport,
installedCIImp *mcv1alpha1.ClusterInfoImport, activeGWChanged bool) error {
tunnelPeerIPToRemoteGW := getPeerGatewayIP(ciImport.Spec)
if tunnelPeerIPToRemoteGW == nil {
return errors.New("invalid Gateway IP")
klog.ErrorS(nil, "The ClusterInfoImport has no valid Gateway IP, skip it", "clusterinfoimport", klog.KObj(ciImport))
return nil
}

var ciImportNoChange bool
if installedCIImp != nil {
oldTunnelPeerIPToRemoteGW := getPeerGatewayIP(installedCIImp.Spec)
if oldTunnelPeerIPToRemoteGW.Equal(tunnelPeerIPToRemoteGW) && installedCIImp.Spec.ServiceCIDR == ciImport.Spec.ServiceCIDR &&
sets.NewString(installedCIImp.Spec.PodCIDRs...).Equal(sets.NewString(ciImport.Spec.PodCIDRs...)) {
klog.V(2).InfoS("No difference between new and installed ClusterInfoImports, skip updating", "clusterinfoimport", ciImport.Name)
return nil
}
ciImportNoChange = oldTunnelPeerIPToRemoteGW.Equal(tunnelPeerIPToRemoteGW) && installedCIImp.Spec.ServiceCIDR == ciImport.Spec.ServiceCIDR &&
sets.NewString(installedCIImp.Spec.PodCIDRs...).Equal(sets.NewString(ciImport.Spec.PodCIDRs...))
}

if ciImportNoChange && !activeGWChanged {
klog.V(2).InfoS("ClusterInfoImport and the active Gateway have no change, skip updating", "clusterinfoimport", klog.KObj(ciImport), "gateway", klog.KObj(activeGW))
return nil
}

klog.InfoS("Adding/updating remote Gateway Node flows for Multi-cluster", "gateway", klog.KObj(activeGW),
Expand Down
38 changes: 36 additions & 2 deletions pkg/agent/multicluster/mc_route_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,23 @@ func TestMCRouteControllerAsGateway(t *testing.T) {
c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport2.Name).Times(1)
c.processNextWorkItem()

// Update Gateway1's GatewayIP
updatedGateway1a := gateway1.DeepCopy()
updatedGateway1a.GatewayIP = "10.16.0.100"
updatedGateway1aIP := net.ParseIP("10.16.0.100")
c.mcClient.MulticlusterV1alpha1().Gateways(updatedGateway1a.GetNamespace()).Update(context.TODO(),
updatedGateway1a, metav1.UpdateOptions{})
c.ofClient.EXPECT().InstallMulticlusterGatewayFlows(clusterInfoImport1.Name,
gomock.Any(), peerNodeIP1, updatedGateway1aIP).Times(1)
c.processNextWorkItem()

// Update Gateway1's InternalIP
updatedGateway1b := updatedGateway1a.DeepCopy()
updatedGateway1b.InternalIP = "17.162.0.10"
c.mcClient.MulticlusterV1alpha1().Gateways(updatedGateway1b.GetNamespace()).Update(context.TODO(),
updatedGateway1b, metav1.UpdateOptions{})
c.processNextWorkItem()

// Create Gateway2 as active Gateway
c.mcClient.MulticlusterV1alpha1().Gateways(gateway2.GetNamespace()).Create(context.TODO(),
&gateway2, metav1.CreateOptions{})
Expand All @@ -191,7 +208,7 @@ func TestMCRouteControllerAsGateway(t *testing.T) {
c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1)
c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), true).Times(1)
c.ofClient.EXPECT().InstallMulticlusterGatewayFlows(clusterInfoImport1.Name,
gomock.Any(), peerNodeIP1, gw1GatewayIP).Times(1)
gomock.Any(), peerNodeIP1, updatedGateway1aIP).Times(1)
c.processNextWorkItem()

// Delete last Gateway
Expand Down Expand Up @@ -256,6 +273,23 @@ func TestMCRouteControllerAsRegularNode(t *testing.T) {
c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport2.Name).Times(1)
c.processNextWorkItem()

// Update Gateway1's GatewayIP
updatedGateway1a := gateway1.DeepCopy()
updatedGateway1a.GatewayIP = "10.16.0.100"
c.mcClient.MulticlusterV1alpha1().Gateways(updatedGateway1a.GetNamespace()).Update(context.TODO(),
updatedGateway1a, metav1.UpdateOptions{})
c.processNextWorkItem()

// Update Gateway1's InternalIP
updatedGateway1b := updatedGateway1a.DeepCopy()
updatedGateway1b.InternalIP = "17.162.0.10"
updatedGateway1bIP := net.ParseIP("17.162.0.10")
c.mcClient.MulticlusterV1alpha1().Gateways(updatedGateway1b.GetNamespace()).Update(context.TODO(),
updatedGateway1b, metav1.UpdateOptions{})
c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name,
gomock.Any(), updatedGateway1bIP).Times(1)
c.processNextWorkItem()

// Create Gateway2 as the active Gateway
c.mcClient.MulticlusterV1alpha1().Gateways(gateway2.GetNamespace()).Create(context.TODO(),
&gateway2, metav1.CreateOptions{})
Expand All @@ -270,7 +304,7 @@ func TestMCRouteControllerAsRegularNode(t *testing.T) {
c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1)
c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), false).Times(1)
c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name,
gomock.Any(), peerNodeIP1).Times(1)
gomock.Any(), updatedGateway1bIP).Times(1)
c.processNextWorkItem()

// Delete last Gateway
Expand Down

0 comments on commit 8427b4a

Please sign in to comment.