Skip to content

Commit

Permalink
Multi-cluster support with networkPolicyOnly mode
Browse files Browse the repository at this point in the history
In order to support multi-cluster traffic when the member cluster is
deployed with networkPolicyOnly mode, antrea-agent will be responsible to do
following things:

1. Create tunnel interface `antrea-tun0` for cross-cluster traffic
2. Watch all Pods on the Gateway and set up one rule per Pod in L3Fowarding
table as long as the Pod is running in a general Node instead of the Gateway.
3. Update container interface's MTU to minus tunnel overload.

Signed-off-by: Lan Luo <luola@vmware.com>
  • Loading branch information
luolanzone committed Nov 23, 2022
1 parent 8427b4a commit ac9547d
Show file tree
Hide file tree
Showing 16 changed files with 787 additions and 92 deletions.
35 changes: 27 additions & 8 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func run(o *Options) error {
serviceInformer := informerFactory.Core().V1().Services()
endpointsInformer := informerFactory.Core().V1().Endpoints()
namespaceInformer := informerFactory.Core().V1().Namespaces()
podInformer := informerFactory.Core().V1().Pods()

// Create Antrea Clientset for the given config.
antreaClientProvider := agent.NewAntreaClientProvider(o.config.AntreaClientConnection, k8sClient)
Expand All @@ -134,6 +135,7 @@ func run(o *Options) error {
enableBridgingMode := enableAntreaIPAM && o.config.EnableBridgingMode
// Bridging mode will connect the uplink interface to the OVS bridge.
connectUplinkToBridge := enableBridgingMode
multiclusterEnabled := features.DefaultFeatureGate.Enabled(features.Multicluster) && o.config.Multicluster.Enable

ovsDatapathType := ovsconfig.OVSDatapathType(o.config.OVSDatapathType)
ovsBridgeClient := ovsconfig.NewOVSBridge(o.config.OVSBridge, ovsDatapathType, ovsdbConnection)
Expand All @@ -148,7 +150,7 @@ func run(o *Options) error {
connectUplinkToBridge,
multicastEnabled,
features.DefaultFeatureGate.Enabled(features.TrafficControl),
features.DefaultFeatureGate.Enabled(features.Multicluster),
multiclusterEnabled,
)

var serviceCIDRNet *net.IPNet
Expand Down Expand Up @@ -248,7 +250,8 @@ func run(o *Options) error {
o.config.ExternalNode.ExternalNodeNamespace,
features.DefaultFeatureGate.Enabled(features.AntreaProxy),
o.config.AntreaProxy.ProxyAll,
connectUplinkToBridge)
connectUplinkToBridge,
multiclusterEnabled)
err = agentInitializer.Initialize()
if err != nil {
return fmt.Errorf("error initializing agent: %v", err)
Expand Down Expand Up @@ -281,8 +284,10 @@ func run(o *Options) error {

var mcRouteController *mcroute.MCRouteController
var mcInformerFactory mcinformers.SharedInformerFactory
var mcPolicyOnlyRouteController *mcroute.MCWithPolicyOnlyNodeRouteController

if features.DefaultFeatureGate.Enabled(features.Multicluster) && o.config.Multicluster.Enable {
isNetworkPolicyOnly := networkConfig.TrafficEncapMode.IsNetworkPolicyOnly()
if multiclusterEnabled {
mcNamespace := env.GetPodNamespace()
if o.config.Multicluster.Namespace != "" {
mcNamespace = o.config.Multicluster.Namespace
Expand All @@ -300,6 +305,19 @@ func run(o *Options) error {
nodeConfig,
mcNamespace,
)

if isNetworkPolicyOnly {
mcPolicyOnlyRouteController = mcroute.NewMCWithPolicyOnlyNodeRouteController(
mcClient,
podInformer.Informer(),
gwInformer.Informer(),
ofClient,
ovsBridgeClient,
ifaceStore,
nodeConfig,
mcNamespace,
)
}
}
var groupCounters []proxytypes.GroupCounter
groupIDUpdates := make(chan string, 100)
Expand Down Expand Up @@ -441,10 +459,7 @@ func run(o *Options) error {
var externalNodeController *externalnode.ExternalNodeController
var localExternalNodeInformer cache.SharedIndexInformer
if o.nodeType == config.K8sNode {
isChaining := false
if networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() {
isChaining = true
}
isChaining := isNetworkPolicyOnly
cniServer = cniserver.New(
o.config.CNISocket,
o.config.HostProcPathPrefix,
Expand All @@ -455,6 +470,7 @@ func run(o *Options) error {
enableBridgingMode,
enableAntreaIPAM,
o.config.DisableTXChecksumOffload,
multiclusterEnabled,
networkReadyCh)

if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
Expand Down Expand Up @@ -724,9 +740,12 @@ func run(o *Options) error {
go mcastController.Run(stopCh)
}

if features.DefaultFeatureGate.Enabled(features.Multicluster) && o.config.Multicluster.Enable {
if multiclusterEnabled {
mcInformerFactory.Start(stopCh)
go mcRouteController.Run(stopCh)
if isNetworkPolicyOnly {
go mcPolicyOnlyRouteController.Run(stopCh)
}
}

// statsCollector collects stats and reports to the antrea-controller periodically. For now it's only used for
Expand Down
6 changes: 3 additions & 3 deletions cmd/antrea-agent/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,9 +454,9 @@ func (o *Options) validateK8sNodeOptions() error {
}
}
if (features.DefaultFeatureGate.Enabled(features.Multicluster) || o.config.Multicluster.Enable) &&
encapMode != config.TrafficEncapModeEncap {
// Only Encap mode is supported for Multi-cluster feature.
return fmt.Errorf("Multicluster is only applicable to the %s mode", config.TrafficEncapModeEncap)
!(encapMode == config.TrafficEncapModeEncap || encapMode == config.TrafficEncapModeNetworkPolicyOnly) {
// Only Encap or networkPolicyOnly is supported for Multi-cluster feature.
return fmt.Errorf("Multicluster is only applicable to the %s mode or %s mode", config.TrafficEncapModeEncap, config.TrafficEncapModeNetworkPolicyOnly)
}
if features.DefaultFeatureGate.Enabled(features.NodePortLocal) {
startPort, endPort, err := parsePortRange(o.config.NodePortLocal.PortRange)
Expand Down
11 changes: 8 additions & 3 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ type Initializer struct {
// networkReadyCh should be closed once the Node's network is ready.
// The CNI server will wait for it before handling any CNI Add requests.
proxyAll bool
enableMulticluster bool
networkReadyCh chan<- struct{}
stopCh <-chan struct{}
nodeType config.NodeType
Expand Down Expand Up @@ -132,6 +133,7 @@ func NewInitializer(
enableProxy bool,
proxyAll bool,
connectUplinkToBridge bool,
enableMulticluster bool,
) *Initializer {
return &Initializer{
ovsBridgeClient: ovsBridgeClient,
Expand All @@ -154,6 +156,7 @@ func NewInitializer(
enableProxy: enableProxy,
proxyAll: proxyAll,
connectUplinkToBridge: connectUplinkToBridge,
enableMulticluster: enableMulticluster,
}
}

Expand Down Expand Up @@ -755,10 +758,12 @@ func (i *Initializer) setupDefaultTunnelInterface() error {
// It's not necessary for new Linux kernel versions with the following patch:
// https://github.com/torvalds/linux/commit/89e5c58fc1e2857ccdaae506fb8bc5fed57ee063.
shouldEnableCsum := i.networkConfig.TunnelCsum && (i.networkConfig.TunnelType == ovsconfig.GeneveTunnel || i.networkConfig.TunnelType == ovsconfig.VXLANTunnel)
tunnelInterfaceSupported := i.networkConfig.TrafficEncapMode.SupportsEncap() ||
(i.networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() && i.enableMulticluster)

// Check the default tunnel port.
if portExists {
if i.networkConfig.TrafficEncapMode.SupportsEncap() &&
if tunnelInterfaceSupported &&
tunnelIface.TunnelInterfaceConfig.Type == i.networkConfig.TunnelType &&
tunnelIface.TunnelInterfaceConfig.DestinationPort == i.networkConfig.TunnelPort &&
tunnelIface.TunnelInterfaceConfig.LocalIP.Equal(localIP) {
Expand All @@ -775,7 +780,7 @@ func (i *Initializer) setupDefaultTunnelInterface() error {
}

if err := i.ovsBridgeClient.DeletePort(tunnelIface.PortUUID); err != nil {
if i.networkConfig.TrafficEncapMode.SupportsEncap() {
if tunnelInterfaceSupported {
return fmt.Errorf("failed to remove tunnel port %s with wrong tunnel type: %s", tunnelPortName, err)
}
klog.Errorf("Failed to remove tunnel port %s in NoEncapMode: %v", tunnelPortName, err)
Expand All @@ -786,7 +791,7 @@ func (i *Initializer) setupDefaultTunnelInterface() error {
}

// Create the default tunnel port and interface.
if i.networkConfig.TrafficEncapMode.SupportsEncap() {
if tunnelInterfaceSupported {
if tunnelPortName != defaultTunInterfaceName {
// Reset the tunnel interface name to the desired name before
// recreating the tunnel port and interface.
Expand Down
33 changes: 33 additions & 0 deletions pkg/agent/cniserver/interface_configuration_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,39 @@ func (ic *ifConfigurator) configureContainerLink(
}
}

func (ic *ifConfigurator) configureContainerMTU(
containerNetNS string,
containerIFDev string,
hostInterfaceName string,
mtu int,
) error {
updateMTU := func(interfaceName string) error {
link, err := netlink.LinkByName(interfaceName)
if err != nil {
return fmt.Errorf("failed to find interface %s: %v", interfaceName, err)
}
err = netlink.LinkSetMTU(link, mtu)
if err != nil {
return fmt.Errorf("failed to set MTU for interface %s: %v", interfaceName, err)
}
return nil
}

if err := ns.WithNetNSPath(containerNetNS, func(hostNS ns.NetNS) error {
if err := updateMTU(containerIFDev); err != nil {
return fmt.Errorf("error when updating container interface MTU: %v", err)
}
return nil
}); err != nil {
return err
}

if err := updateMTU(hostInterfaceName); err != nil {
return fmt.Errorf("error when updating host interface MTU: %v", err)
}
return nil
}

func (ic *ifConfigurator) removeContainerLink(containerID, hostInterfaceName string) error {
klog.V(2).Infof("Deleting veth devices for container %s", containerID)
// Don't return an error if the device is already removed as CniDel can be called multiple times.
Expand Down
9 changes: 9 additions & 0 deletions pkg/agent/cniserver/interface_configuration_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,15 @@ func (ic *ifConfigurator) configureContainerLink(
return nil
}

func (ic *ifConfigurator) configureContainerMTU(
containerNetNS string,
containerIFDev string,
hostInterfaceName string,
mtu int,
) error {
return nil
}

// createContainerLink creates HNSEndpoint using the IP configuration in the IPAM result.
func (ic *ifConfigurator) createContainerLink(endpointName string, result *current.Result, containerID, podName, podNamespace string) (hostLink *hcsshim.HNSEndpoint, err error) {
containerIP, err := findContainerIPConfig(result.IPs)
Expand Down
12 changes: 12 additions & 0 deletions pkg/agent/cniserver/pod_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,18 @@ func ParseOVSPortInterfaceConfig(portData *ovsconfig.OVSPortData, portConfig *in
return interfaceConfig
}

func (pc *podConfigurator) configureInterfacesMTU(
containerNetNS string,
containerIFDev string,
result *current.Result,
mtu int) error {
hostIface := result.Interfaces[0]
if err := pc.ifConfigurator.configureContainerMTU(containerNetNS, containerIFDev, hostIface.Name, mtu); err != nil {
return err
}
return nil
}

func (pc *podConfigurator) configureInterfaces(
podName string,
podNameSpace string,
Expand Down
17 changes: 16 additions & 1 deletion pkg/agent/cniserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ type CNIServer struct {
enableSecondaryNetworkIPAM bool
disableTXChecksumOffload bool
secondaryNetworkEnabled bool
multiClusterEnabled bool
// networkReadyCh notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it.
networkReadyCh <-chan struct{}
}
Expand Down Expand Up @@ -624,7 +625,7 @@ func New(
nodeConfig *config.NodeConfig,
kubeClient clientset.Interface,
routeClient route.Interface,
isChaining, enableBridgingMode, enableSecondaryNetworkIPAM, disableTXChecksumOffload bool,
isChaining, enableBridgingMode, enableSecondaryNetworkIPAM, disableTXChecksumOffload, multiclusterEnabled bool,
networkReadyCh <-chan struct{},
) *CNIServer {
return &CNIServer{
Expand All @@ -640,6 +641,7 @@ func New(
enableBridgingMode: enableBridgingMode,
disableTXChecksumOffload: disableTXChecksumOffload,
enableSecondaryNetworkIPAM: enableSecondaryNetworkIPAM,
multiClusterEnabled: multiclusterEnabled,
networkReadyCh: networkReadyCh,
}
}
Expand Down Expand Up @@ -716,6 +718,19 @@ func (s *CNIServer) interceptAdd(cniConfig *CNIConfig) (*cnipb.CniCmdResponse, e
return &cnipb.CniCmdResponse{CniResult: []byte("")}, fmt.Errorf("failed to connect container %s to ovs: %w", cniConfig.ContainerId, err)
}

if s.multiClusterEnabled {
// Antrea multi-cluster supports Geneve tunnel mode only.
mtu := cniConfig.MTU - config.GeneveOverhead
if err := s.podConfigurator.configureInterfacesMTU(
s.hostNetNsPath(cniConfig.Netns),
cniConfig.Ifname,
prevResult,
mtu,
); err != nil {
return &cnipb.CniCmdResponse{CniResult: []byte("")}, fmt.Errorf("failed to configure container %s's MTU: %w", cniConfig.ContainerId, err)
}
}

// we return prevResult, which should be exactly what we received from
// the runtime, potentially converted to the current CNI version used by
// Antrea.
Expand Down
58 changes: 23 additions & 35 deletions pkg/agent/multicluster/mc_route_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package noderoute
package multicluster

import (
"errors"
"fmt"
"net"
"time"
Expand Down Expand Up @@ -138,16 +139,8 @@ func NewMCRouteController(
func (c *MCRouteController) enqueueGateway(obj interface{}, isDelete bool) {
gw, isGW := obj.(*mcv1alpha1.Gateway)
if !isGW {
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
klog.ErrorS(nil, "Received unexpected object", "object", obj)
return
}
gw, ok = deletedState.Obj.(*mcv1alpha1.Gateway)
if !ok {
klog.ErrorS(nil, "DeletedFinalStateUnknown contains non-Gateway object", "object", deletedState.Obj)
return
}
klog.ErrorS(errors.New("received unexpected object"), "enqueueGateway can't process event", "obj", obj)
return
}

if gw.Namespace != c.namespace {
Expand All @@ -166,16 +159,8 @@ func (c *MCRouteController) enqueueGateway(obj interface{}, isDelete bool) {
func (c *MCRouteController) enqueueClusterInfoImport(obj interface{}, isDelete bool) {
ciImp, isciImp := obj.(*mcv1alpha1.ClusterInfoImport)
if !isciImp {
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
klog.ErrorS(nil, "Received unexpected object", "object", obj)
return
}
ciImp, ok = deletedState.Obj.(*mcv1alpha1.ClusterInfoImport)
if !ok {
klog.ErrorS(nil, "DeletedFinalStateUnknown contains non-ClusterInfoImport object", "object", deletedState.Obj)
return
}
klog.ErrorS(errors.New("received unexpected object"), "enqueueClusterInfoImport can't process event", "obj", obj)
return
}

if ciImp.Namespace != c.namespace {
Expand Down Expand Up @@ -391,7 +376,7 @@ func (c *MCRouteController) addMCFlowsForSingleCIImp(activeGW *mcv1alpha1.Gatewa
}

func (c *MCRouteController) deleteMCFlowsForSingleCIImp(ciImpName string) error {
if err := c.ofClient.UninstallMulticlusterFlows(ciImpName); err != nil {
if err := c.ofClient.UninstallMulticlusterFlows(fmt.Sprintf("cluster_%s", ciImpName)); err != nil {
return fmt.Errorf("failed to uninstall multi-cluster flows to remote Gateway Node %s: %v", ciImpName, err)
}
delete(c.installedCIImports, ciImpName)
Expand All @@ -405,27 +390,30 @@ func (c *MCRouteController) deleteMCFlowsForAllCIImps() error {
return nil
}

// getActiveGateway compares Gateway's CreationTimestamp to get the active Gateway,
// The last created Gateway will be the active Gateway.
func (c *MCRouteController) getActiveGateway() (*mcv1alpha1.Gateway, error) {
gws, err := c.gwLister.Gateways(c.namespace).List(labels.Everything())
activeGW, err := getActiveGateway(c.gwLister, c.namespace)
if err != nil {
return nil, err
}
if len(gws) == 0 {
if activeGW == nil {
return nil, nil
}
// Comparing Gateway's CreationTimestamp to get the last created Gateway.
lastCreatedGW := gws[0]
for _, gw := range gws {
if lastCreatedGW.CreationTimestamp.Before(&gw.CreationTimestamp) {
lastCreatedGW = gw
}
if net.ParseIP(activeGW.GatewayIP) == nil || net.ParseIP(activeGW.InternalIP) == nil {
return nil, fmt.Errorf("the active Gateway %s has no valid GatewayIP or InternalIP", activeGW.Name)
}
if net.ParseIP(lastCreatedGW.GatewayIP) == nil || net.ParseIP(lastCreatedGW.InternalIP) == nil {
return nil, fmt.Errorf("the last created Gateway %s has no valid GatewayIP or InternalIP", lastCreatedGW.Name)
return activeGW, nil
}

func getActiveGateway(gwLister mclisters.GatewayLister, namespace string) (*mcv1alpha1.Gateway, error) {
gws, err := gwLister.Gateways(namespace).List(labels.Everything())
if err != nil {
return nil, err
}
if len(gws) == 0 {
return nil, nil
}
return lastCreatedGW, nil
// The Gateway webhook guarantees there will be at most one Gateway in a cluster.
return gws[0], nil
}

func generatePeerConfigs(subnets []string, gatewayIP net.IP) (map[*net.IPNet]net.IP, error) {
Expand Down
Loading

0 comments on commit ac9547d

Please sign in to comment.