diff --git a/build/charts/antrea/conf/antrea-agent.conf b/build/charts/antrea/conf/antrea-agent.conf index a2e41a2a441..8119583b2bd 100644 --- a/build/charts/antrea/conf/antrea-agent.conf +++ b/build/charts/antrea/conf/antrea-agent.conf @@ -346,6 +346,12 @@ multicluster: enableStretchedNetworkPolicy: {{ .enableStretchedNetworkPolicy }} # Enable Pod to Pod connectivity. enablePodToPodConnectivity: {{ .enablePodToPodConnectivity }} +# WireGuard tunnel configuration for cross-cluster traffic. + wireGuard: + # Enable WireGuard tunnel for cross-cluster traffic. + enable: {{ .wireGuard.enable }} + # WireGuard tunnel port for cross-cluster traffic. + port: {{ .wireGuard.port }} {{- end }} {{- if .Values.featureGates.SecondaryNetwork }} diff --git a/build/charts/antrea/values.yaml b/build/charts/antrea/values.yaml index 1c770340dbf..80336ae8d3b 100644 --- a/build/charts/antrea/values.yaml +++ b/build/charts/antrea/values.yaml @@ -338,6 +338,12 @@ multicluster: enableStretchedNetworkPolicy: false # -- Enable Multi-cluster Pod to Pod connectivity. enablePodToPodConnectivity: false + # WireGuard tunnel configuration for cross-cluster traffic. + wireGuard: + # Enable WireGuard tunnel for cross-cluster traffic. + enable: false + # WireGuard tunnel port for cross-cluster traffic. + port: 51821 testing: ## -- enable code coverage measurement (used when testing Antrea only). diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index ee250211102..991ea9f0157 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -3242,6 +3242,12 @@ data: enableStretchedNetworkPolicy: false # Enable Pod to Pod connectivity. enablePodToPodConnectivity: false + # WireGuard tunnel configuration for cross-cluster traffic. + wireGuard: + # Enable WireGuard tunnel for cross-cluster traffic. + enable: false + # WireGuard tunnel port for cross-cluster traffic. + port: 51821 antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -4299,11 +4305,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments -<<<<<<< HEAD - checksum/config: be4d7318350c398a0362a44ff0d4ff779150a303e577ed1e2265aaa75c00546e -======= - checksum/config: 5e86a889fca88734845bed60765a31dd090ba17830f29aaecc0b162e83e725ba ->>>>>>> 259c89b1 (Add toggle for Multi-cluster Pod-to-Pod connectivity) + checksum/config: 17f452e2145fdf9a5b49b1f146dd99d2aac64fe609a33dab72f5c9a66dd5ee13 labels: app: antrea component: antrea-agent @@ -4544,11 +4546,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments -<<<<<<< HEAD - checksum/config: be4d7318350c398a0362a44ff0d4ff779150a303e577ed1e2265aaa75c00546e -======= - checksum/config: 5e86a889fca88734845bed60765a31dd090ba17830f29aaecc0b162e83e725ba ->>>>>>> 259c89b1 (Add toggle for Multi-cluster Pod-to-Pod connectivity) + checksum/config: 17f452e2145fdf9a5b49b1f146dd99d2aac64fe609a33dab72f5c9a66dd5ee13 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index 0c7771eb11d..36351d063e4 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -3242,6 +3242,12 @@ data: enableStretchedNetworkPolicy: false # Enable Pod to Pod connectivity. enablePodToPodConnectivity: false + # WireGuard tunnel configuration for cross-cluster traffic. + wireGuard: + # Enable WireGuard tunnel for cross-cluster traffic. + enable: false + # WireGuard tunnel port for cross-cluster traffic. + port: 51821 antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -4299,7 +4305,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: be4d7318350c398a0362a44ff0d4ff779150a303e577ed1e2265aaa75c00546e + checksum/config: 17f452e2145fdf9a5b49b1f146dd99d2aac64fe609a33dab72f5c9a66dd5ee13 labels: app: antrea component: antrea-agent @@ -4541,7 +4547,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: be4d7318350c398a0362a44ff0d4ff779150a303e577ed1e2265aaa75c00546e + checksum/config: 17f452e2145fdf9a5b49b1f146dd99d2aac64fe609a33dab72f5c9a66dd5ee13 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 8b5fd235680..66541af3111 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -3242,6 +3242,12 @@ data: enableStretchedNetworkPolicy: false # Enable Pod to Pod connectivity. enablePodToPodConnectivity: false + # WireGuard tunnel configuration for cross-cluster traffic. + wireGuard: + # Enable WireGuard tunnel for cross-cluster traffic. + enable: false + # WireGuard tunnel port for cross-cluster traffic. + port: 51821 antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -4299,11 +4305,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments -<<<<<<< HEAD - checksum/config: fca1f2d4967020380202ef0c2394b560055830ee2770e41f791af76b42559659 -======= - checksum/config: 498f6060a4d4397c8ce36007eebbe29ac4650f30b393a45bdef064db89eff868 ->>>>>>> 259c89b1 (Add toggle for Multi-cluster Pod-to-Pod connectivity) + checksum/config: 7aac7ba322070f7138eb5c847bd42b2a2de73a1f86816fa19d0d4db7d67975eb labels: app: antrea component: antrea-agent @@ -4542,11 +4544,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments -<<<<<<< HEAD - checksum/config: fca1f2d4967020380202ef0c2394b560055830ee2770e41f791af76b42559659 -======= - checksum/config: 498f6060a4d4397c8ce36007eebbe29ac4650f30b393a45bdef064db89eff868 ->>>>>>> 259c89b1 (Add toggle for Multi-cluster Pod-to-Pod connectivity) + checksum/config: 7aac7ba322070f7138eb5c847bd42b2a2de73a1f86816fa19d0d4db7d67975eb labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 9ecd1924b54..21efd3c1cd7 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -3255,6 +3255,12 @@ data: enableStretchedNetworkPolicy: false # Enable Pod to Pod connectivity. enablePodToPodConnectivity: false + # WireGuard tunnel configuration for cross-cluster traffic. + wireGuard: + # Enable WireGuard tunnel for cross-cluster traffic. + enable: false + # WireGuard tunnel port for cross-cluster traffic. + port: 51821 antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -4312,7 +4318,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: ab53bf1e28a67ba5be2b99989a8d28b31d716d79b207a610cd5258ead514eb6b + checksum/config: cb5115be05df90e1e576dbea0a75d3802ef5aebe643daf2bba524fe0df2b5bb4 checksum/ipsec-secret: d0eb9c52d0cd4311b6d252a951126bf9bea27ec05590bed8a394f0f792dcb2a4 labels: app: antrea @@ -4597,7 +4603,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: ab53bf1e28a67ba5be2b99989a8d28b31d716d79b207a610cd5258ead514eb6b + checksum/config: cb5115be05df90e1e576dbea0a75d3802ef5aebe643daf2bba524fe0df2b5bb4 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index a188b8536e5..4e7944d542f 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -3242,6 +3242,12 @@ data: enableStretchedNetworkPolicy: false # Enable Pod to Pod connectivity. enablePodToPodConnectivity: false + # WireGuard tunnel configuration for cross-cluster traffic. + wireGuard: + # Enable WireGuard tunnel for cross-cluster traffic. + enable: false + # WireGuard tunnel port for cross-cluster traffic. + port: 51821 antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -4299,7 +4305,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 2c1c5158b6a3ea32eff58bc1e498592e80ebecee07f51b10c722b67afce7b964 + checksum/config: 8258fefd7715d5e2cb6d1b5fe0b31994d85c890e56a727a5108ba4a99765a9c8 labels: app: antrea component: antrea-agent @@ -4538,7 +4544,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 2c1c5158b6a3ea32eff58bc1e498592e80ebecee07f51b10c722b67afce7b964 + checksum/config: 8258fefd7715d5e2cb6d1b5fe0b31994d85c890e56a727a5108ba4a99765a9c8 labels: app: antrea component: antrea-controller diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 2d829f74c6a..0094e3c7d83 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -343,6 +343,8 @@ func run(o *Options) error { mcNamespace, o.config.Multicluster.EnableStretchedNetworkPolicy, o.config.Multicluster.EnablePodToPodConnectivity, + o.config.Multicluster.WireGuard, + routeClient, ) } if enableMulticlusterNP { diff --git a/multicluster/apis/multicluster/v1alpha1/gateway_types.go b/multicluster/apis/multicluster/v1alpha1/gateway_types.go index c1c0ec10d34..e1e6e178bdf 100644 --- a/multicluster/apis/multicluster/v1alpha1/gateway_types.go +++ b/multicluster/apis/multicluster/v1alpha1/gateway_types.go @@ -52,6 +52,8 @@ type ClusterInfo struct { GatewayInfos []GatewayInfo `json:"gatewayInfos,omitempty"` // PodCIDRs is the Pod IP address CIDRs. PodCIDRs []string `json:"podCIDRs,omitempty"` + // WireGuard has information of WireGuard tunnel. + WireGuard WireGuardConfig `json:"wireGuard,omitempty"` } //+kubebuilder:object:root=true diff --git a/multicluster/apis/multicluster/v1alpha1/multiclusterconfig_types.go b/multicluster/apis/multicluster/v1alpha1/multiclusterconfig_types.go index af0ed8c8c3c..5c85452002c 100644 --- a/multicluster/apis/multicluster/v1alpha1/multiclusterconfig_types.go +++ b/multicluster/apis/multicluster/v1alpha1/multiclusterconfig_types.go @@ -31,6 +31,11 @@ const ( PrecedenceExternal = "external" ) +type WireGuardConfig struct { + PublicKey string `json:"publicKey"` + Port int `json:"port"` +} + //+kubebuilder:object:root=true // +kubebuilder:printcolumn:name="Gateway IP Precedence",type=string,JSONPath=`.gatewayIPPrecedence`,description="Precedence of Gateway IP types" diff --git a/multicluster/build/yamls/antrea-multicluster-leader-global.yml b/multicluster/build/yamls/antrea-multicluster-leader-global.yml index b361d645ec4..2f425b53c3b 100644 --- a/multicluster/build/yamls/antrea-multicluster-leader-global.yml +++ b/multicluster/build/yamls/antrea-multicluster-leader-global.yml @@ -387,6 +387,17 @@ spec: serviceCIDR: description: ServiceCIDR is the IP ranges used by Service ClusterIP. type: string + wireGuard: + description: WireGuard has information of WireGuard tunnel. + properties: + port: + type: integer + publicKey: + type: string + required: + - port + - publicKey + type: object type: object clusterNetworkPolicy: description: If exported resource is AntreaClusterNetworkPolicy. @@ -3079,6 +3090,17 @@ spec: serviceCIDR: description: ServiceCIDR is the IP ranges used by Service ClusterIP. type: string + wireGuard: + description: WireGuard has information of WireGuard tunnel. + properties: + port: + type: integer + publicKey: + type: string + required: + - port + - publicKey + type: object type: object clusternetworkpolicy: description: If imported resource is AntreaClusterNetworkPolicy. diff --git a/multicluster/build/yamls/antrea-multicluster-member.yml b/multicluster/build/yamls/antrea-multicluster-member.yml index 715205ea511..d225e762953 100644 --- a/multicluster/build/yamls/antrea-multicluster-member.yml +++ b/multicluster/build/yamls/antrea-multicluster-member.yml @@ -119,6 +119,17 @@ spec: serviceCIDR: description: ServiceCIDR is the IP ranges used by Service ClusterIP. type: string + wireGuard: + description: WireGuard has information of WireGuard tunnel. + properties: + port: + type: integer + publicKey: + type: string + required: + - port + - publicKey + type: object type: object status: description: ClusterInfoImportStatus defines the observed state of ClusterInfoImport. diff --git a/multicluster/config/crd/bases/multicluster.crd.antrea.io_clusterinfoimports.yaml b/multicluster/config/crd/bases/multicluster.crd.antrea.io_clusterinfoimports.yaml index 5bb209cbfc9..58f1a30bf95 100644 --- a/multicluster/config/crd/bases/multicluster.crd.antrea.io_clusterinfoimports.yaml +++ b/multicluster/config/crd/bases/multicluster.crd.antrea.io_clusterinfoimports.yaml @@ -65,6 +65,17 @@ spec: serviceCIDR: description: ServiceCIDR is the IP ranges used by Service ClusterIP. type: string + wireGuard: + description: WireGuard has information of WireGuard tunnel. + properties: + port: + type: integer + publicKey: + type: string + required: + - port + - publicKey + type: object type: object status: description: ClusterInfoImportStatus defines the observed state of ClusterInfoImport. diff --git a/multicluster/config/crd/bases/multicluster.crd.antrea.io_resourceexports.yaml b/multicluster/config/crd/bases/multicluster.crd.antrea.io_resourceexports.yaml index fc5d680b9a5..e90f78fac1f 100644 --- a/multicluster/config/crd/bases/multicluster.crd.antrea.io_resourceexports.yaml +++ b/multicluster/config/crd/bases/multicluster.crd.antrea.io_resourceexports.yaml @@ -82,6 +82,17 @@ spec: serviceCIDR: description: ServiceCIDR is the IP ranges used by Service ClusterIP. type: string + wireGuard: + description: WireGuard has information of WireGuard tunnel. + properties: + port: + type: integer + publicKey: + type: string + required: + - port + - publicKey + type: object type: object clusterNetworkPolicy: description: If exported resource is AntreaClusterNetworkPolicy. diff --git a/multicluster/config/crd/bases/multicluster.crd.antrea.io_resourceimports.yaml b/multicluster/config/crd/bases/multicluster.crd.antrea.io_resourceimports.yaml index 1e0dc7a17c2..d1eafa08a5c 100644 --- a/multicluster/config/crd/bases/multicluster.crd.antrea.io_resourceimports.yaml +++ b/multicluster/config/crd/bases/multicluster.crd.antrea.io_resourceimports.yaml @@ -80,6 +80,17 @@ spec: serviceCIDR: description: ServiceCIDR is the IP ranges used by Service ClusterIP. type: string + wireGuard: + description: WireGuard has information of WireGuard tunnel. + properties: + port: + type: integer + publicKey: + type: string + required: + - port + - publicKey + type: object type: object clusternetworkpolicy: description: If imported resource is AntreaClusterNetworkPolicy. diff --git a/multicluster/controllers/multicluster/member/gateway_controller.go b/multicluster/controllers/multicluster/member/gateway_controller.go index 2dd50715983..20487d80394 100644 --- a/multicluster/controllers/multicluster/member/gateway_controller.go +++ b/multicluster/controllers/multicluster/member/gateway_controller.go @@ -19,6 +19,7 @@ package member import ( "context" "fmt" + "strconv" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -33,6 +34,7 @@ import ( mcsv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" "antrea.io/antrea/multicluster/controllers/multicluster/common" "antrea.io/antrea/multicluster/controllers/multicluster/commonarea" + antreaTypes "antrea.io/antrea/pkg/agent/types" ) type ( @@ -102,21 +104,21 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct }, } - createOrUpdate := func(gwIP string) error { + createOrUpdate := func(gw *mcsv1alpha1.Gateway) error { existingResExport := &mcsv1alpha1.ResourceExport{} err := commonArea.Get(ctx, resExportNamespacedName, existingResExport) if err != nil && !apierrors.IsNotFound(err) { return err } if apierrors.IsNotFound(err) || !existingResExport.DeletionTimestamp.IsZero() { - if err = r.createResourceExport(ctx, req, commonArea, gwIP); err != nil { + if err = r.createResourceExport(ctx, req, commonArea, gw); err != nil { return err } return nil } // updateResourceExport will update latest Gateway information with the existing ResourceExport's resourceVersion. // It will return an error and retry when there is a version conflict. - if err = r.updateResourceExport(ctx, req, commonArea, existingResExport, &mcsv1alpha1.GatewayInfo{GatewayIP: gwIP}); err != nil { + if err = r.updateResourceExport(ctx, req, commonArea, existingResExport, gw); err != nil { return err } return nil @@ -133,14 +135,14 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct return ctrl.Result{}, nil } - if err := createOrUpdate(gw.GatewayIP); err != nil { + if err := createOrUpdate(gw); err != nil { return ctrl.Result{}, err } return ctrl.Result{}, nil } func (r *GatewayReconciler) updateResourceExport(ctx context.Context, req ctrl.Request, - commonArea commonarea.RemoteCommonArea, existingResExport *mcsv1alpha1.ResourceExport, gwInfo *mcsv1alpha1.GatewayInfo) error { + commonArea commonarea.RemoteCommonArea, existingResExport *mcsv1alpha1.ResourceExport, gateway *mcsv1alpha1.Gateway) error { resExportSpec := mcsv1alpha1.ResourceExportSpec{ Kind: constants.ClusterInfoKind, ClusterID: r.localClusterID, @@ -148,11 +150,24 @@ func (r *GatewayReconciler) updateResourceExport(ctx context.Context, req ctrl.R Namespace: r.namespace, } resExportSpec.ClusterInfo = &mcsv1alpha1.ClusterInfo{ - ClusterID: r.localClusterID, - ServiceCIDR: r.serviceCIDR, - PodCIDRs: r.podCIDRs, - GatewayInfos: []mcsv1alpha1.GatewayInfo{*gwInfo}, + ClusterID: r.localClusterID, + ServiceCIDR: r.serviceCIDR, + PodCIDRs: r.podCIDRs, + GatewayInfos: []mcsv1alpha1.GatewayInfo{ + { + GatewayIP: gateway.GatewayIP, + }, + }, + WireGuard: mcsv1alpha1.WireGuardConfig{ + PublicKey: gateway.Annotations[antreaTypes.MulticlusterWireGuardPublicAnnotationKey], + }, } + + port, err := strconv.Atoi(gateway.Annotations[antreaTypes.MulticlusterWireGuardAnnotationPort]) + if err != nil { + return fmt.Errorf("invalid WireGuard Port format") + } + resExportSpec.ClusterInfo.WireGuard.Port = port klog.V(2).InfoS("Updating ClusterInfo kind of ResourceExport", "clusterinfo", klog.KObj(existingResExport), "gateway", req.NamespacedName) existingResExport.Spec = resExportSpec @@ -163,7 +178,7 @@ func (r *GatewayReconciler) updateResourceExport(ctx context.Context, req ctrl.R } func (r *GatewayReconciler) createResourceExport(ctx context.Context, req ctrl.Request, - commonArea commonarea.RemoteCommonArea, gatewayIP string) error { + commonArea commonarea.RemoteCommonArea, gateway *mcsv1alpha1.Gateway) error { resExportSpec := mcsv1alpha1.ResourceExportSpec{ Kind: constants.ClusterInfoKind, ClusterID: r.localClusterID, @@ -176,9 +191,19 @@ func (r *GatewayReconciler) createResourceExport(ctx context.Context, req ctrl.R PodCIDRs: r.podCIDRs, GatewayInfos: []mcsv1alpha1.GatewayInfo{ { - GatewayIP: gatewayIP, + GatewayIP: gateway.GatewayIP, }, }, + WireGuard: mcsv1alpha1.WireGuardConfig{ + PublicKey: gateway.Annotations[antreaTypes.MulticlusterWireGuardPublicAnnotationKey], + }, + } + if gateway.Annotations[antreaTypes.MulticlusterWireGuardAnnotationPort] != "" { + port, err := strconv.Atoi(gateway.Annotations[antreaTypes.MulticlusterWireGuardAnnotationPort]) + if err != nil { + return fmt.Errorf("invalid WireGuard Port format") + } + resExportSpec.ClusterInfo.WireGuard.Port = port } resExport := &mcsv1alpha1.ResourceExport{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index a6c051bd3c5..0b9a02e2bba 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -1060,13 +1060,13 @@ func (i *Initializer) waitForIPsecMonitorDaemon() error { // initializeWireguard checks if preconditions are met for using WireGuard and initializes WireGuard client or cleans up. func (i *Initializer) initializeWireGuard() error { i.wireGuardConfig.MTU = i.nodeConfig.NodeTransportInterfaceMTU - config.WireGuardOverhead - wgClient, err := wireguard.New(i.client, i.nodeConfig, i.wireGuardConfig) + wgClient, err := wireguard.New(i.client, nil, i.nodeConfig, i.wireGuardConfig, "") if err != nil { return err } i.wireGuardClient = wgClient - return i.wireGuardClient.Init() + return i.wireGuardClient.Init(false) } // readIPSecPSK reads the IPsec PSK value from environment variable ANTREA_IPSEC_PSK diff --git a/pkg/agent/config/node_config.go b/pkg/agent/config/node_config.go index 044ac60d706..2018e212e7c 100644 --- a/pkg/agent/config/node_config.go +++ b/pkg/agent/config/node_config.go @@ -117,6 +117,10 @@ type AdapterNetConfig struct { } type WireGuardConfig struct { + // Enable shows that WireGuard is enabled or not. + Enable bool + // Initialized shows the WireGuard interface is Initialized or not. + Initialized bool // Name is the name of WireGurad interface. e.g. antrea-wg0. Name string // LinkIndex is the link index of WireGuard interface. diff --git a/pkg/agent/controller/noderoute/node_route_controller.go b/pkg/agent/controller/noderoute/node_route_controller.go index e05779a72cd..aa8dd65c5e1 100644 --- a/pkg/agent/controller/noderoute/node_route_controller.go +++ b/pkg/agent/controller/noderoute/node_route_controller.go @@ -596,7 +596,7 @@ func (c *Controller) addNodeRoute(nodeName string, node *corev1.Node) error { if peerNodeIP == nil { peerNodeIP = peerNodeIPs.IPv6 } - if err := c.wireGuardClient.UpdatePeer(nodeName, peerWireGuardPublicKey, peerNodeIP, peerPodCIDRs); err != nil { + if err := c.wireGuardClient.UpdatePeer(nodeName, peerWireGuardPublicKey, peerNodeIP, 0, peerPodCIDRs); err != nil { return err } } diff --git a/pkg/agent/multicluster/mc_route_controller.go b/pkg/agent/multicluster/mc_route_controller.go index 0cc39a9e740..3f474befadb 100644 --- a/pkg/agent/multicluster/mc_route_controller.go +++ b/pkg/agent/multicluster/mc_route_controller.go @@ -15,10 +15,15 @@ package multicluster import ( + "context" "fmt" + "net" "time" + "github.com/vishvananda/netlink" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" @@ -33,6 +38,9 @@ import ( "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/openflow" + antreaRoute "antrea.io/antrea/pkg/agent/route" + "antrea.io/antrea/pkg/agent/wireguard" + "antrea.io/antrea/pkg/config/agent" "antrea.io/antrea/pkg/ovs/ovsconfig" ) @@ -47,7 +55,13 @@ const ( // Default number of workers processing a resource change defaultWorkers = 1 - workerItemKey = "key" + + multiclusterWireGuardTable = 100 + multiclusterWireGuardInterface = "antrea-mc-wg0" +) + +var ( + wireGuardNewFunc = wireguard.New ) // MCRouteController watches Gateway and ClusterInfoImport events. @@ -57,15 +71,19 @@ type MCRouteController struct { mcClient mcclientset.Interface ovsBridgeClient ovsconfig.OVSBridgeClient ofClient openflow.Client + routeClient antreaRoute.Interface + wireGuardClient wireguard.Interface interfaceStore interfacestore.InterfaceStore nodeConfig *config.NodeConfig + wireGuardConfig *config.WireGuardConfig gwInformer mcinformers.GatewayInformer gwLister mclisters.GatewayLister gwListerSynced cache.InformerSynced ciImportInformer mcinformers.ClusterInfoImportInformer ciImportLister mclisters.ClusterInfoImportLister ciImportListerSynced cache.InformerSynced - queue workqueue.RateLimitingInterface + gatewayQueue workqueue.RateLimitingInterface + ciImpQueue workqueue.RateLimitingInterface // installedCIImports is for saving ClusterInfos which have been processed // in MCRouteController. Need to use mutex to protect 'installedCIImports' if // we change the number of 'defaultWorkers'. @@ -91,20 +109,29 @@ func NewMCRouteController( namespace string, enableStretchedNetworkPolicy bool, enablePodToPodConnectivity bool, + wireGuardConfig agent.MulticlusterWireGuardConfig, + routeClient antreaRoute.Interface, ) *MCRouteController { controller := &MCRouteController{ - mcClient: mcClient, - ovsBridgeClient: ovsBridgeClient, - ofClient: client, - interfaceStore: interfaceStore, - nodeConfig: nodeConfig, + mcClient: mcClient, + ovsBridgeClient: ovsBridgeClient, + ofClient: client, + routeClient: routeClient, + interfaceStore: interfaceStore, + nodeConfig: nodeConfig, + wireGuardConfig: &config.WireGuardConfig{ + Enable: wireGuardConfig.Enable, + Port: wireGuardConfig.Port, + Name: multiclusterWireGuardInterface, + }, gwInformer: gwInformer, gwLister: gwInformer.Lister(), gwListerSynced: gwInformer.Informer().HasSynced, ciImportInformer: ciImportInformer, ciImportLister: ciImportInformer.Lister(), ciImportListerSynced: ciImportInformer.Informer().HasSynced, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "gatewayroute"), + gatewayQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "Gateway"), + ciImpQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "ClusterInfoImport"), installedCIImports: make(map[string]*mcv1alpha1.ClusterInfoImport), namespace: namespace, enableStretchedNetworkPolicy: enableStretchedNetworkPolicy, @@ -166,7 +193,7 @@ func (c *MCRouteController) enqueueGateway(obj interface{}, isDelete bool) { return } } - c.queue.Add(workerItemKey) + c.gatewayQueue.Add(gw.Name) } func (c *MCRouteController) enqueueClusterInfoImport(obj interface{}, isDelete bool) { @@ -199,13 +226,14 @@ func (c *MCRouteController) enqueueClusterInfoImport(obj interface{}, isDelete b } } - c.queue.Add(workerItemKey) + c.ciImpQueue.Add(ciImp.Name) } // Run will create defaultWorkers workers (go routines) which will process // the Gateway events from the workqueue. func (c *MCRouteController) Run(stopCh <-chan struct{}) { - defer c.queue.ShutDown() + defer c.gatewayQueue.ShutDown() + defer c.ciImpQueue.ShutDown() cacheSyncs := []cache.InformerSynced{c.gwListerSynced, c.ciImportListerSynced} klog.InfoS("Starting controller", "controller", controllerName) defer klog.InfoS("Shutting down controller", "controller", controllerName) @@ -214,39 +242,207 @@ func (c *MCRouteController) Run(stopCh <-chan struct{}) { } for i := 0; i < defaultWorkers; i++ { - go wait.Until(c.worker, time.Second, stopCh) + go wait.Until(c.gatewayWorker, time.Second, stopCh) + go wait.Until(c.ciImpWorker, time.Second, stopCh) } <-stopCh } -// worker is a long-running function that will continually call the processNextWorkItem -// function in order to read and process a message on the workqueue. -func (c *MCRouteController) worker() { - for c.processNextWorkItem() { +// gatewayWorker is a long-running function that will continually call the processNextGatewayItem +// function in order to read and process a message on the gatewayQueue. +func (c *MCRouteController) gatewayWorker() { + for c.processNextGatewayItem() { + } +} + +// ciImpWorker is a long-running function that will continually call the processNextClusterInfoImportItem +// function in order to read and process a message on the gatewayQueue. +func (c *MCRouteController) ciImpWorker() { + for c.processNextClusterInfoImportItem() { } } -func (c *MCRouteController) processNextWorkItem() bool { - obj, quit := c.queue.Get() +func (c *MCRouteController) processNextClusterInfoImportItem() bool { + obj, quit := c.ciImpQueue.Get() if quit { return false } - defer c.queue.Done(obj) + defer c.ciImpQueue.Done(obj) - if k, ok := obj.(string); !ok { - c.queue.Forget(obj) + ciImp, ok := obj.(string) + if !ok { + c.ciImpQueue.Forget(obj) klog.InfoS("Expected string in work queue but got", "object", obj) return true - } else if err := c.syncMCFlows(); err == nil { - c.queue.Forget(k) + } + if c.wireGuardConfig.Enable { + if err := c.updatePeers(ciImp); err != nil { + c.ciImpQueue.AddRateLimited(ciImp) + klog.ErrorS(err, "Error updating WireGuard peers, requeuing", "key", ciImp) + } + } + + if err := c.syncMCFlows(); err == nil { + c.ciImpQueue.Forget(obj) } else { // Put the item back on the workqueue to handle any transient errors. - c.queue.AddRateLimited(k) - klog.ErrorS(err, "Error syncing key, requeuing", "key", k) + c.ciImpQueue.AddRateLimited(ciImp) + klog.ErrorS(err, "Error syncing key, requeuing", "key", ciImp) } return true } +func (c *MCRouteController) processNextGatewayItem() bool { + obj, quit := c.gatewayQueue.Get() + if quit { + return false + } + defer c.gatewayQueue.Done(obj) + + gateway, ok := obj.(string) + if !ok { + c.gatewayQueue.Forget(obj) + klog.InfoS("Expected string in work queue but got", "object", obj) + return true + } + + if c.wireGuardConfig.Enable && gateway == c.nodeConfig.Name { + if err := c.reconcileWireGuard(gateway); err != nil { + klog.ErrorS(err, "Error when reconcile WireGuard, requeuing", "key", gateway) + c.gatewayQueue.AddRateLimited(gateway) + } + } + if err := c.syncMCFlows(); err == nil { + c.gatewayQueue.Forget(obj) + } else { + // Put the item back on the workqueue to handle any transient errors. + c.gatewayQueue.AddRateLimited(gateway) + klog.ErrorS(err, "Error syncing key, requeuing", "key", gateway) + } + return true +} + +func (c *MCRouteController) reconcileWireGuard(gateway string) error { + klog.InfoS("Reconcile WireGuard") + if !c.wireGuardConfig.Initialized { + klog.InfoS("Initializing WireGuard interface and rule") + if err := c.initializeWireGuardInterface(); err != nil { + return err + } + if err := c.initHostRoute(); err != nil { + return err + } + c.wireGuardConfig.Initialized = true + return nil + } + + if _, err := c.mcClient.MulticlusterV1alpha1().Gateways(c.namespace).Get(context.TODO(), gateway, metav1.GetOptions{}); err != nil { + if !apierrors.IsNotFound(err) { + return fmt.Errorf("failed to get Gateway %s, error: %s", gateway, err.Error()) + } + klog.InfoS("Gateway is deleted, removing WireGuard interface and rule") + if err := c.deleteHostRoute(); err != nil { + return err + } + c.wireGuardConfig.Initialized = false + } + + return nil +} + +func (c *MCRouteController) deleteHostRoute() error { + rule := netlink.NewRule() + rule.Table = multiclusterWireGuardTable + rule.Mark = 0x4d2 + + if err := c.routeClient.DeleteMulticlusterWireGuardRule(rule); err != nil { + return err + } + fmt.Println("delete host route") + link, err := c.routeClient.LinkByName(multiclusterWireGuardInterface) + if err != nil { + return nil + } + if err := c.routeClient.DeleteLink(link); err != nil { + return err + } + + return nil +} + +func (c *MCRouteController) initHostRoute() error { + rule := netlink.NewRule() + rule.Table = multiclusterWireGuardTable + rule.Mark = 0x4d2 + if err := c.routeClient.AddMulticlusterWireGuardRule(rule); err != nil { + return err + } + link, err := c.routeClient.LinkByName(multiclusterWireGuardInterface) + if err != nil { + return err + } + route := &netlink.Route{ + LinkIndex: link.Attrs().Index, + Table: multiclusterWireGuardTable, + Gw: net.IPv4zero, + Scope: netlink.SCOPE_LINK, + } + if err := c.routeClient.AddMulticlusterRoute(route); err != nil { + return err + } + return nil +} + +func (c *MCRouteController) updatePeers(ciImp string) error { + ciImport, err := c.mcClient.MulticlusterV1alpha1().ClusterInfoImports(c.namespace).Get(context.TODO(), ciImp, metav1.GetOptions{}) + if err != nil { + if !apierrors.IsNotFound(err) { + return err + } + klog.InfoS("The ClusterInfoImport has been deleted, removing peer", "ClusterInfoImport", ciImp) + return c.wireGuardClient.DeletePeer(ciImp) + } + if !c.wireGuardConfig.Initialized { + return fmt.Errorf("failed to update WireGuard peers, the WireGuard has not been initialized") + } + gatewayIP := net.ParseIP(ciImport.Spec.GatewayInfos[0].GatewayIP) + _, gatewayIPCIDR, err := net.ParseCIDR(ciImport.Spec.GatewayInfos[0].GatewayIP + "/32") + if err != nil { + return err + } + _, serviceIPCIDR, err := net.ParseCIDR(ciImport.Spec.ServiceCIDR) + if err != nil { + return err + } + podCIDRs := []*net.IPNet{} + for _, cidr := range ciImport.Spec.PodCIDRs { + _, podCIDR, err := net.ParseCIDR(cidr) + if err != nil { + return err + } + podCIDRs = append(podCIDRs, podCIDR) + } + + allowedIPs := []*net.IPNet{serviceIPCIDR, gatewayIPCIDR} + allowedIPs = append(allowedIPs, podCIDRs...) + if err := c.wireGuardClient.UpdatePeer(ciImport.Name, ciImport.Spec.WireGuard.PublicKey, gatewayIP, ciImport.Spec.WireGuard.Port, allowedIPs); err != nil { + return err + } + klog.InfoS("Update WireGuard peer with ClusterInfoImport", "ClusterInfoImport", ciImp) + return nil +} + +func (c *MCRouteController) initializeWireGuardInterface() error { + c.wireGuardConfig.MTU = c.nodeConfig.NodeMTU - config.WireGuardOverhead - config.GeneveOverhead + wgClient, err := wireGuardNewFunc(nil, c.mcClient, c.nodeConfig, c.wireGuardConfig, c.namespace) + if err != nil { + return err + } + c.wireGuardClient = wgClient + + return wgClient.Init(true) +} + func (c *MCRouteController) syncMCFlows() error { startTime := time.Now() defer func() { diff --git a/pkg/agent/multicluster/mc_route_controller_test.go b/pkg/agent/multicluster/mc_route_controller_test.go index c6ff4fab5ad..c51f00082fe 100644 --- a/pkg/agent/multicluster/mc_route_controller_test.go +++ b/pkg/agent/multicluster/mc_route_controller_test.go @@ -21,14 +21,23 @@ import ( "time" "github.com/golang/mock/gomock" + "github.com/vishvananda/netlink" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + "antrea.io/antrea/multicluster/pkg/client/clientset/versioned" mcfake "antrea.io/antrea/multicluster/pkg/client/clientset/versioned/fake" mcinformers "antrea.io/antrea/multicluster/pkg/client/informers/externalversions" "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/interfacestore" oftest "antrea.io/antrea/pkg/agent/openflow/testing" + antreaRoute "antrea.io/antrea/pkg/agent/route" + routemock "antrea.io/antrea/pkg/agent/route/testing" + "antrea.io/antrea/pkg/agent/types" + "antrea.io/antrea/pkg/agent/wireguard" + wireguardmock "antrea.io/antrea/pkg/agent/wireguard/testing" + "antrea.io/antrea/pkg/config/agent" ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" ) @@ -41,7 +50,11 @@ type fakeRouteController struct { interfaceStore interfacestore.InterfaceStore } -func newMCRouteController(t *testing.T, nodeConfig *config.NodeConfig) (*fakeRouteController, func()) { +func newMCRouteController(t *testing.T, + nodeConfig *config.NodeConfig, + wireGuardConfig agent.MulticlusterWireGuardConfig, + routeClient antreaRoute.Interface, +) (*fakeRouteController, func()) { mcClient := mcfake.NewSimpleClientset() mcInformerFactory := mcinformers.NewSharedInformerFactory(mcClient, 60*time.Second) gwInformer := mcInformerFactory.Multicluster().V1alpha1().Gateways() @@ -62,6 +75,8 @@ func newMCRouteController(t *testing.T, nodeConfig *config.NodeConfig) (*fakeRou "default", true, true, + wireGuardConfig, + routeClient, ) return &fakeRouteController{ MCRouteController: c, @@ -76,6 +91,7 @@ func newMCRouteController(t *testing.T, nodeConfig *config.NodeConfig) (*fakeRou var ( gw1CreationTime = metav1.NewTime(time.Now()) gw2CreationTime = metav1.NewTime(time.Now().Add(10 * time.Minute)) + gw3CreationTime = metav1.NewTime(time.Now()) gateway1 = mcv1alpha1.Gateway{ ObjectMeta: metav1.ObjectMeta{ Name: "node-1", @@ -94,8 +110,22 @@ var ( GatewayIP: "172.17.0.12", InternalIP: "192.17.0.12", } + gateway3 = mcv1alpha1.Gateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-3", + Namespace: "default", + Annotations: map[string]string{ + types.MulticlusterWireGuardPublicAnnotationKey: "key", + types.MulticlusterWireGuardAnnotationPort: "port", + }, + CreationTimestamp: gw3CreationTime, + }, + GatewayIP: "172.17.0.13", + InternalIP: "192.17.0.13", + } gw1GatewayIP = net.ParseIP(gateway1.GatewayIP) gw2InternalIP = net.ParseIP(gateway2.InternalIP) + gw3GatewayIP = net.ParseIP(gateway3.GatewayIP) clusterInfoImport1 = mcv1alpha1.ClusterInfoImport{ ObjectMeta: metav1.ObjectMeta{ @@ -128,12 +158,126 @@ var ( }, }, } + + clusterInfoImport3 = mcv1alpha1.ClusterInfoImport{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cluster-d-default-clusterinfo", + Namespace: "default", + }, + Spec: mcv1alpha1.ClusterInfo{ + ClusterID: "cluster-d", + ServiceCIDR: "14.14.4.0/12", + GatewayInfos: []mcv1alpha1.GatewayInfo{ + { + GatewayIP: "12.13.0.10", + }, + }, + PodCIDRs: []string{ + "10.10.0.0/16", + }, + WireGuard: mcv1alpha1.WireGuardConfig{ + PublicKey: "key", + Port: 51821, + }, + }, + } + + rule = &netlink.Rule{ + SuppressIfgroup: -1, + SuppressPrefixlen: -1, + Priority: -1, + Mark: 0x4d2, + Mask: -1, + Goto: -1, + Flow: -1, + Table: 100, + } + + route = &netlink.Route{ + LinkIndex: 0, + Table: multiclusterWireGuardTable, + Gw: net.IPv4zero, + Scope: netlink.SCOPE_LINK, + } + + link = &netlink.Device{LinkAttrs: netlink.LinkAttrs{ + Index: 0, + }} ) +func TestMCRouteControllerAsWireGuardGateway(t *testing.T) { + ctrl := gomock.NewController(t) + mockInterface := routemock.NewMockInterface(ctrl) + c, closeFn := newMCRouteController(t, &config.NodeConfig{Name: "node-3"}, agent.MulticlusterWireGuardConfig{Enable: true}, mockInterface) + defer closeFn() + defer c.gatewayQueue.ShutDown() + defer c.ciImpQueue.ShutDown() + + stopCh := make(chan struct{}) + defer close(stopCh) + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + wireGuardNewFunc = func(clientSet kubernetes.Interface, + mcClientSet versioned.Interface, + nodeConfig *config.NodeConfig, + wireGuardConfig *config.WireGuardConfig, + mcNamespace string, + ) (wireguard.Interface, error) { + return &wireguardmock.MockWireGuardClient{}, nil + } + + finishCh := make(chan struct{}) + go func() { + defer close(finishCh) + + // Create Gateway3 + c.mcClient.MulticlusterV1alpha1().Gateways(gateway3.GetNamespace()).Create(context.TODO(), + &gateway3, metav1.CreateOptions{}) + mockInterface.EXPECT().AddMulticlusterWireGuardRule(rule).Times(1) + mockInterface.EXPECT().LinkByName(multiclusterWireGuardInterface).Times(1).Return(link, nil) + mockInterface.EXPECT().AddMulticlusterRoute(route).Times(1) + c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), true).MaxTimes(100) + c.processNextGatewayItem() + + // Create ClusterInfoImport3 + c.mcClient.MulticlusterV1alpha1().ClusterInfoImports(clusterInfoImport3.GetNamespace()). + Create(context.TODO(), &clusterInfoImport3, metav1.CreateOptions{}) + peerNodeIP3 := getPeerGatewayIP(clusterInfoImport3.Spec) + c.ofClient.EXPECT().InstallMulticlusterGatewayFlows(clusterInfoImport3.Name, + gomock.Any(), peerNodeIP3, gw3GatewayIP, true).Times(1) + c.processNextClusterInfoImportItem() + + // Update Gateway3's GatewayIP + updatedGateway3a := gateway3.DeepCopy() + updatedGateway3a.GatewayIP = "10.16.0.100" + updatedGateway3aIP := net.ParseIP("10.16.0.100") + c.mcClient.MulticlusterV1alpha1().Gateways(updatedGateway3a.GetNamespace()).Update(context.TODO(), + updatedGateway3a, metav1.UpdateOptions{}) + c.ofClient.EXPECT().InstallMulticlusterGatewayFlows(clusterInfoImport3.Name, + gomock.Any(), peerNodeIP3, updatedGateway3aIP, true).Times(1) + c.processNextGatewayItem() + + // Delete Gateway3 + c.mcClient.MulticlusterV1alpha1().Gateways(gateway3.GetNamespace()).Delete(context.TODO(), + gateway3.Name, metav1.DeleteOptions{}) + c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport3.Name).Times(1) + mockInterface.EXPECT().DeleteMulticlusterWireGuardRule(rule).Times(1) + mockInterface.EXPECT().LinkByName(multiclusterWireGuardInterface).Times(1).Return(link, nil) + mockInterface.EXPECT().DeleteLink(link).Times(1) + c.processNextGatewayItem() + }() + select { + case <-time.After(5 * time.Second): + t.Errorf("Test didn't finish in time") + case <-finishCh: + } +} + func TestMCRouteControllerAsGateway(t *testing.T) { - c, closeFn := newMCRouteController(t, &config.NodeConfig{Name: "node-1"}) + c, closeFn := newMCRouteController(t, &config.NodeConfig{Name: "node-1"}, agent.MulticlusterWireGuardConfig{Enable: false}, nil) defer closeFn() - defer c.queue.ShutDown() + defer c.gatewayQueue.ShutDown() + defer c.ciImpQueue.ShutDown() stopCh := make(chan struct{}) defer close(stopCh) @@ -148,7 +292,7 @@ func TestMCRouteControllerAsGateway(t *testing.T) { c.mcClient.MulticlusterV1alpha1().Gateways(gateway1.GetNamespace()).Create(context.TODO(), &gateway1, metav1.CreateOptions{}) c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), true).Times(1) - c.processNextWorkItem() + c.processNextGatewayItem() // Create two ClusterInfoImports c.mcClient.MulticlusterV1alpha1().ClusterInfoImports(clusterInfoImport1.GetNamespace()). @@ -156,14 +300,14 @@ func TestMCRouteControllerAsGateway(t *testing.T) { peerNodeIP1 := getPeerGatewayIP(clusterInfoImport1.Spec) c.ofClient.EXPECT().InstallMulticlusterGatewayFlows(clusterInfoImport1.Name, gomock.Any(), peerNodeIP1, gw1GatewayIP, true).Times(1) - c.processNextWorkItem() + c.processNextClusterInfoImportItem() c.mcClient.MulticlusterV1alpha1().ClusterInfoImports(clusterInfoImport2.GetNamespace()). Create(context.TODO(), &clusterInfoImport2, metav1.CreateOptions{}) peerNodeIP2 := getPeerGatewayIP(clusterInfoImport2.Spec) c.ofClient.EXPECT().InstallMulticlusterGatewayFlows(clusterInfoImport2.Name, gomock.Any(), peerNodeIP2, gw1GatewayIP, true).Times(1) - c.processNextWorkItem() + c.processNextClusterInfoImportItem() // Update a ClusterInfoImport clusterInfoImport1.Spec.ServiceCIDR = "192.10.1.0/24" @@ -171,13 +315,13 @@ func TestMCRouteControllerAsGateway(t *testing.T) { Update(context.TODO(), &clusterInfoImport1, metav1.UpdateOptions{}) c.ofClient.EXPECT().InstallMulticlusterGatewayFlows(clusterInfoImport1.Name, gomock.Any(), peerNodeIP1, gw1GatewayIP, true).Times(1) - c.processNextWorkItem() + c.processNextClusterInfoImportItem() // Delete a ClusterInfoImport c.mcClient.MulticlusterV1alpha1().ClusterInfoImports(clusterInfoImport2.GetNamespace()).Delete(context.TODO(), clusterInfoImport2.Name, metav1.DeleteOptions{}) c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport2.Name).Times(1) - c.processNextWorkItem() + c.processNextClusterInfoImportItem() // Update Gateway1's GatewayIP updatedGateway1a := gateway1.DeepCopy() @@ -187,14 +331,14 @@ func TestMCRouteControllerAsGateway(t *testing.T) { updatedGateway1a, metav1.UpdateOptions{}) c.ofClient.EXPECT().InstallMulticlusterGatewayFlows(clusterInfoImport1.Name, gomock.Any(), peerNodeIP1, updatedGateway1aIP, true).Times(1) - c.processNextWorkItem() + c.processNextGatewayItem() // Update Gateway1's InternalIP updatedGateway1b := updatedGateway1a.DeepCopy() updatedGateway1b.InternalIP = "17.162.0.10" c.mcClient.MulticlusterV1alpha1().Gateways(updatedGateway1b.GetNamespace()).Update(context.TODO(), updatedGateway1b, metav1.UpdateOptions{}) - c.processNextWorkItem() + c.processNextGatewayItem() // Create Gateway2 as active Gateway c.mcClient.MulticlusterV1alpha1().Gateways(gateway2.GetNamespace()).Create(context.TODO(), @@ -202,7 +346,7 @@ func TestMCRouteControllerAsGateway(t *testing.T) { c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), false).Times(1) c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name, gomock.Any(), gw2InternalIP, true).Times(1) - c.processNextWorkItem() + c.processNextGatewayItem() // Delete Gateway2, then Gateway1 become active Gateway c.mcClient.MulticlusterV1alpha1().Gateways(gateway2.GetNamespace()).Delete(context.TODO(), @@ -211,13 +355,13 @@ func TestMCRouteControllerAsGateway(t *testing.T) { c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), true).Times(1) c.ofClient.EXPECT().InstallMulticlusterGatewayFlows(clusterInfoImport1.Name, gomock.Any(), peerNodeIP1, updatedGateway1aIP, true).Times(1) - c.processNextWorkItem() + c.processNextGatewayItem() // Delete last Gateway c.mcClient.MulticlusterV1alpha1().Gateways(gateway1.GetNamespace()).Delete(context.TODO(), gateway1.Name, metav1.DeleteOptions{}) c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) - c.processNextWorkItem() + c.processNextGatewayItem() }() select { case <-time.After(5 * time.Second): @@ -227,9 +371,10 @@ func TestMCRouteControllerAsGateway(t *testing.T) { } func TestMCRouteControllerAsRegularNode(t *testing.T) { - c, closeFn := newMCRouteController(t, &config.NodeConfig{Name: "node-3"}) + c, closeFn := newMCRouteController(t, &config.NodeConfig{Name: "node-3"}, agent.MulticlusterWireGuardConfig{Enable: false}, nil) defer closeFn() - defer c.queue.ShutDown() + defer c.gatewayQueue.ShutDown() + defer c.ciImpQueue.ShutDown() stopCh := make(chan struct{}) defer close(stopCh) @@ -246,20 +391,20 @@ func TestMCRouteControllerAsRegularNode(t *testing.T) { c.mcClient.MulticlusterV1alpha1().Gateways(gateway1.GetNamespace()).Create(context.TODO(), &gateway1, metav1.CreateOptions{}) c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), false).Times(1) - c.processNextWorkItem() + c.processNextGatewayItem() // Create two ClusterInfoImports c.mcClient.MulticlusterV1alpha1().ClusterInfoImports(clusterInfoImport1.GetNamespace()). Create(context.TODO(), &clusterInfoImport1, metav1.CreateOptions{}) c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name, gomock.Any(), peerNodeIP1, true).Times(1) - c.processNextWorkItem() + c.processNextClusterInfoImportItem() c.mcClient.MulticlusterV1alpha1().ClusterInfoImports(clusterInfoImport2.GetNamespace()). Create(context.TODO(), &clusterInfoImport2, metav1.CreateOptions{}) c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport2.Name, gomock.Any(), peerNodeIP1, true).Times(1) - c.processNextWorkItem() + c.processNextClusterInfoImportItem() // Update a ClusterInfoImport clusterInfoImport1.Spec.ServiceCIDR = "192.12.1.0/24" @@ -267,20 +412,20 @@ func TestMCRouteControllerAsRegularNode(t *testing.T) { Update(context.TODO(), &clusterInfoImport1, metav1.UpdateOptions{}) c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name, gomock.Any(), peerNodeIP1, true).Times(1) - c.processNextWorkItem() + c.processNextClusterInfoImportItem() // Delete a ClusterInfoImport c.mcClient.MulticlusterV1alpha1().ClusterInfoImports(clusterInfoImport2.GetNamespace()).Delete(context.TODO(), clusterInfoImport2.Name, metav1.DeleteOptions{}) c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport2.Name).Times(1) - c.processNextWorkItem() + c.processNextClusterInfoImportItem() // Update Gateway1's GatewayIP updatedGateway1a := gateway1.DeepCopy() updatedGateway1a.GatewayIP = "10.16.0.100" c.mcClient.MulticlusterV1alpha1().Gateways(updatedGateway1a.GetNamespace()).Update(context.TODO(), updatedGateway1a, metav1.UpdateOptions{}) - c.processNextWorkItem() + c.processNextGatewayItem() // Update Gateway1's InternalIP updatedGateway1b := updatedGateway1a.DeepCopy() @@ -290,7 +435,7 @@ func TestMCRouteControllerAsRegularNode(t *testing.T) { updatedGateway1b, metav1.UpdateOptions{}) c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name, gomock.Any(), updatedGateway1bIP, true).Times(1) - c.processNextWorkItem() + c.processNextGatewayItem() // Create Gateway2 as the active Gateway c.mcClient.MulticlusterV1alpha1().Gateways(gateway2.GetNamespace()).Create(context.TODO(), @@ -298,7 +443,7 @@ func TestMCRouteControllerAsRegularNode(t *testing.T) { c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), false).Times(1) c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name, gomock.Any(), peerNodeIP2, true).Times(1) c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) - c.processNextWorkItem() + c.processNextGatewayItem() // Delete Gateway2, then Gateway1 become active Gateway c.mcClient.MulticlusterV1alpha1().Gateways(gateway2.GetNamespace()).Delete(context.TODO(), @@ -307,13 +452,13 @@ func TestMCRouteControllerAsRegularNode(t *testing.T) { c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), false).Times(1) c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name, gomock.Any(), updatedGateway1bIP, true).Times(1) - c.processNextWorkItem() + c.processNextGatewayItem() // Delete last Gateway c.mcClient.MulticlusterV1alpha1().Gateways(gateway1.GetNamespace()).Delete(context.TODO(), gateway1.Name, metav1.DeleteOptions{}) c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) - c.processNextWorkItem() + c.processNextGatewayItem() }() select { case <-time.After(5 * time.Second): diff --git a/pkg/agent/openflow/multicluster.go b/pkg/agent/openflow/multicluster.go index 7761836ef43..c7b71b84a96 100644 --- a/pkg/agent/openflow/multicluster.go +++ b/pkg/agent/openflow/multicluster.go @@ -82,6 +82,7 @@ func (f *featureMulticluster) l3FwdFlowToRemoteViaTun( Cookie(cookieID). MatchProtocol(ipProtocol). MatchDstIPNet(peerServiceCIDR). + Action().LoadPktMarkRange(0x4d2, multiclusterPktMarkRange). Action().SetSrcMAC(localGatewayMAC). // Rewrite src MAC to local gateway MAC. Action().SetDstMAC(GlobalVirtualMACForMulticluster). // Rewrite dst MAC to virtual MC MAC. Action().SetTunnelDst(tunnelPeer). // Flow based tunnel. Set tunnel destination. diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index 50620b28bfb..af2b1547f0b 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -377,7 +377,8 @@ var DispositionToString = map[uint32]string{ var ( // snatPktMarkRange takes an 8-bit range of pkt_mark to store the ID of // a SNAT IP. The bit range must match SNATIPMarkMask. - snatPktMarkRange = &binding.Range{0, 7} + snatPktMarkRange = &binding.Range{0, 7} + multiclusterPktMarkRange = &binding.Range{0, 31} GlobalVirtualMAC, _ = net.ParseMAC("aa:bb:cc:dd:ee:ff") ) diff --git a/pkg/agent/route/interfaces.go b/pkg/agent/route/interfaces.go index f0e1fc04a57..6a9cafe2bff 100644 --- a/pkg/agent/route/interfaces.go +++ b/pkg/agent/route/interfaces.go @@ -17,6 +17,8 @@ package route import ( "net" + "github.com/vishvananda/netlink" + "antrea.io/antrea/pkg/agent/config" binding "antrea.io/antrea/pkg/ovs/openflow" ) @@ -80,4 +82,23 @@ type Interface interface { // DeleteLocalAntreaFlexibleIPAMPodRule is used to delete related IP set entries when an AntreaFlexibleIPAM Pod is deleted. DeleteLocalAntreaFlexibleIPAMPodRule(podAddresses []net.IP) error + + // AddMulticlusterWireGuardRule is used to add ip rule for Multi-cluster WireGuard tunnel. + AddMulticlusterWireGuardRule(rule *netlink.Rule) error + + // DeleteMulticlusterWireGuardRule is used to delete ip rule Multi-cluster WireGuard tunnel when the Multi-cluster Gateway + // is deleted. + DeleteMulticlusterWireGuardRule(rule *netlink.Rule) error + + // AddMulticlusterRoute is used to add static route on host for WireGuard tunnel. + AddMulticlusterRoute(route *netlink.Route) error + + // DeleteMulticlusterRoute is used to delete static route on host for WireGuard tunnel. + DeleteMulticlusterRoute(route *netlink.Route) error + + // DeleteLink is used to delete Multi-cluster WireGuard interface. + DeleteLink(link netlink.Link) error + + // LinkByName is used to get link by name. + LinkByName(name string) (netlink.Link, error) } diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index 734286c76f5..d68996d419c 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -1602,6 +1602,30 @@ func (c *Client) deleteNodeIP(podCIDR *net.IPNet) error { return nil } +func (c *Client) AddMulticlusterWireGuardRule(rule *netlink.Rule) error { + return c.netlink.RuleAdd(rule) +} + +func (c *Client) DeleteMulticlusterWireGuardRule(rule *netlink.Rule) error { + return c.netlink.RuleDel(rule) +} + +func (c *Client) AddMulticlusterRoute(route *netlink.Route) error { + return c.netlink.RouteReplace(route) +} + +func (c *Client) DeleteMulticlusterRoute(route *netlink.Route) error { + return c.netlink.RouteDel(route) +} + +func (c *Client) DeleteLink(link netlink.Link) error { + return c.netlink.LinkDel(link) +} + +func (c *Client) LinkByName(name string) (netlink.Link, error) { + return c.netlink.LinkByName(name) +} + func getTransProtocolStr(protocol binding.Protocol) string { if protocol == binding.ProtocolTCP || protocol == binding.ProtocolTCPv6 { return "tcp" diff --git a/pkg/agent/route/testing/mock_route.go b/pkg/agent/route/testing/mock_route.go index 67ee9270b63..e1d666b5eb1 100644 --- a/pkg/agent/route/testing/mock_route.go +++ b/pkg/agent/route/testing/mock_route.go @@ -1,4 +1,4 @@ -// Copyright 2021 Antrea Authors +// Copyright 2023 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ import ( config "antrea.io/antrea/pkg/agent/config" openflow "antrea.io/antrea/pkg/ovs/openflow" gomock "github.com/golang/mock/gomock" + netlink "github.com/vishvananda/netlink" net "net" reflect "reflect" ) @@ -92,6 +93,34 @@ func (mr *MockInterfaceMockRecorder) AddLocalAntreaFlexibleIPAMPodRule(arg0 inte return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddLocalAntreaFlexibleIPAMPodRule", reflect.TypeOf((*MockInterface)(nil).AddLocalAntreaFlexibleIPAMPodRule), arg0) } +// AddMulticlusterRoute mocks base method +func (m *MockInterface) AddMulticlusterRoute(arg0 *netlink.Route) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddMulticlusterRoute", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddMulticlusterRoute indicates an expected call of AddMulticlusterRoute +func (mr *MockInterfaceMockRecorder) AddMulticlusterRoute(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddMulticlusterRoute", reflect.TypeOf((*MockInterface)(nil).AddMulticlusterRoute), arg0) +} + +// AddMulticlusterWireGuardRule mocks base method +func (m *MockInterface) AddMulticlusterWireGuardRule(arg0 *netlink.Rule) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddMulticlusterWireGuardRule", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddMulticlusterWireGuardRule indicates an expected call of AddMulticlusterWireGuardRule +func (mr *MockInterfaceMockRecorder) AddMulticlusterWireGuardRule(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddMulticlusterWireGuardRule", reflect.TypeOf((*MockInterface)(nil).AddMulticlusterWireGuardRule), arg0) +} + // AddNodePort mocks base method func (m *MockInterface) AddNodePort(arg0 []net.IP, arg1 uint16, arg2 openflow.Protocol) error { m.ctrl.T.Helper() @@ -148,6 +177,20 @@ func (mr *MockInterfaceMockRecorder) DeleteClusterIPRoute(arg0 interface{}) *gom return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteClusterIPRoute", reflect.TypeOf((*MockInterface)(nil).DeleteClusterIPRoute), arg0) } +// DeleteLink mocks base method +func (m *MockInterface) DeleteLink(arg0 netlink.Link) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteLink", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteLink indicates an expected call of DeleteLink +func (mr *MockInterfaceMockRecorder) DeleteLink(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteLink", reflect.TypeOf((*MockInterface)(nil).DeleteLink), arg0) +} + // DeleteLoadBalancer mocks base method func (m *MockInterface) DeleteLoadBalancer(arg0 []string) error { m.ctrl.T.Helper() @@ -176,6 +219,34 @@ func (mr *MockInterfaceMockRecorder) DeleteLocalAntreaFlexibleIPAMPodRule(arg0 i return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteLocalAntreaFlexibleIPAMPodRule", reflect.TypeOf((*MockInterface)(nil).DeleteLocalAntreaFlexibleIPAMPodRule), arg0) } +// DeleteMulticlusterRoute mocks base method +func (m *MockInterface) DeleteMulticlusterRoute(arg0 *netlink.Route) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteMulticlusterRoute", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteMulticlusterRoute indicates an expected call of DeleteMulticlusterRoute +func (mr *MockInterfaceMockRecorder) DeleteMulticlusterRoute(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteMulticlusterRoute", reflect.TypeOf((*MockInterface)(nil).DeleteMulticlusterRoute), arg0) +} + +// DeleteMulticlusterWireGuardRule mocks base method +func (m *MockInterface) DeleteMulticlusterWireGuardRule(arg0 *netlink.Rule) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteMulticlusterWireGuardRule", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteMulticlusterWireGuardRule indicates an expected call of DeleteMulticlusterWireGuardRule +func (mr *MockInterfaceMockRecorder) DeleteMulticlusterWireGuardRule(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteMulticlusterWireGuardRule", reflect.TypeOf((*MockInterface)(nil).DeleteMulticlusterWireGuardRule), arg0) +} + // DeleteNodePort mocks base method func (m *MockInterface) DeleteNodePort(arg0 []net.IP, arg1 uint16, arg2 openflow.Protocol) error { m.ctrl.T.Helper() @@ -232,6 +303,21 @@ func (mr *MockInterfaceMockRecorder) Initialize(arg0, arg1 interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Initialize", reflect.TypeOf((*MockInterface)(nil).Initialize), arg0, arg1) } +// LinkByName mocks base method +func (m *MockInterface) LinkByName(arg0 string) (netlink.Link, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LinkByName", arg0) + ret0, _ := ret[0].(netlink.Link) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// LinkByName indicates an expected call of LinkByName +func (mr *MockInterfaceMockRecorder) LinkByName(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LinkByName", reflect.TypeOf((*MockInterface)(nil).LinkByName), arg0) +} + // MigrateRoutesToGw mocks base method func (m *MockInterface) MigrateRoutesToGw(arg0 string) error { m.ctrl.T.Helper() diff --git a/pkg/agent/types/annotations.go b/pkg/agent/types/annotations.go index de340332544..db850140b3a 100644 --- a/pkg/agent/types/annotations.go +++ b/pkg/agent/types/annotations.go @@ -26,4 +26,8 @@ const ( // ServiceExternalIPPoolAnnotationKey is the key of the Service annotation that specifies the Service's desired external IP pool. ServiceExternalIPPoolAnnotationKey string = "service.antrea.io/external-ip-pool" + + MulticlusterWireGuardPublicAnnotationKey string = "multicluster.antrea.io/wireguard-public-key" + + MulticlusterWireGuardAnnotationPort string = "multicluster.antrea.io/wireguard-port" ) diff --git a/pkg/agent/util/netlink/netlink_linux.go b/pkg/agent/util/netlink/netlink_linux.go index 9db8047fda1..5338b6f9bca 100644 --- a/pkg/agent/util/netlink/netlink_linux.go +++ b/pkg/agent/util/netlink/netlink_linux.go @@ -38,6 +38,10 @@ type Interface interface { NeighDel(neigh *netlink.Neigh) error + RuleAdd(rule *netlink.Rule) error + + RuleDel(rule *netlink.Rule) error + LinkByName(name string) (netlink.Link, error) LinkSetNsFd(link netlink.Link, fd int) error @@ -45,4 +49,6 @@ type Interface interface { LinkSetMTU(link netlink.Link, mtu int) error LinkSetUp(link netlink.Link) error + + LinkDel(link netlink.Link) error } diff --git a/pkg/agent/util/netlink/testing/mock_netlink_linux.go b/pkg/agent/util/netlink/testing/mock_netlink_linux.go index 7931125e2dc..e2b8c16f5c2 100644 --- a/pkg/agent/util/netlink/testing/mock_netlink_linux.go +++ b/pkg/agent/util/netlink/testing/mock_netlink_linux.go @@ -1,4 +1,4 @@ -// Copyright 2022 Antrea Authors +// Copyright 2023 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -106,6 +106,20 @@ func (mr *MockInterfaceMockRecorder) LinkByName(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LinkByName", reflect.TypeOf((*MockInterface)(nil).LinkByName), arg0) } +// LinkDel mocks base method +func (m *MockInterface) LinkDel(arg0 netlink.Link) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LinkDel", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// LinkDel indicates an expected call of LinkDel +func (mr *MockInterfaceMockRecorder) LinkDel(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LinkDel", reflect.TypeOf((*MockInterface)(nil).LinkDel), arg0) +} + // LinkSetMTU mocks base method func (m *MockInterface) LinkSetMTU(arg0 netlink.Link, arg1 int) error { m.ctrl.T.Helper() @@ -248,3 +262,31 @@ func (mr *MockInterfaceMockRecorder) RouteReplace(arg0 interface{}) *gomock.Call mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RouteReplace", reflect.TypeOf((*MockInterface)(nil).RouteReplace), arg0) } + +// RuleAdd mocks base method +func (m *MockInterface) RuleAdd(arg0 *netlink.Rule) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RuleAdd", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// RuleAdd indicates an expected call of RuleAdd +func (mr *MockInterfaceMockRecorder) RuleAdd(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RuleAdd", reflect.TypeOf((*MockInterface)(nil).RuleAdd), arg0) +} + +// RuleDel mocks base method +func (m *MockInterface) RuleDel(arg0 *netlink.Rule) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RuleDel", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// RuleDel indicates an expected call of RuleDel +func (mr *MockInterfaceMockRecorder) RuleDel(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RuleDel", reflect.TypeOf((*MockInterface)(nil).RuleDel), arg0) +} diff --git a/pkg/agent/wireguard/client_linux.go b/pkg/agent/wireguard/client_linux.go index 8054d810731..719504e6684 100644 --- a/pkg/agent/wireguard/client_linux.go +++ b/pkg/agent/wireguard/client_linux.go @@ -37,6 +37,7 @@ import ( "k8s.io/client-go/util/retry" "k8s.io/klog/v2" + mcClientset "antrea.io/antrea/multicluster/pkg/client/clientset/versioned" "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" @@ -66,13 +67,15 @@ type client struct { wgClient wgctrlClient nodeName string k8sClient clientset.Interface + mcClient mcClientset.Interface + mcNamespace string privateKey wgtypes.Key peerPublicKeyByNodeName *sync.Map wireGuardConfig *config.WireGuardConfig gatewayConfig *config.GatewayConfig } -func New(clientSet clientset.Interface, nodeConfig *config.NodeConfig, wireGuardConfig *config.WireGuardConfig) (Interface, error) { +func New(clientSet clientset.Interface, mcClientSet mcClientset.Interface, nodeConfig *config.NodeConfig, wireGuardConfig *config.WireGuardConfig, mcNamespace string) (Interface, error) { wgClient, err := wgctrl.New() if err != nil { return nil, err @@ -84,6 +87,8 @@ func New(clientSet clientset.Interface, nodeConfig *config.NodeConfig, wireGuard wgClient: wgClient, nodeName: nodeConfig.Name, k8sClient: clientSet, + mcClient: mcClientSet, + mcNamespace: mcNamespace, wireGuardConfig: wireGuardConfig, peerPublicKeyByNodeName: &sync.Map{}, gatewayConfig: nodeConfig.GatewayConfig, @@ -91,7 +96,7 @@ func New(clientSet clientset.Interface, nodeConfig *config.NodeConfig, wireGuard return c, nil } -func (client *client) Init() error { +func (client *client) Init(multiCluster bool) error { link := &netlink.Wireguard{LinkAttrs: netlink.LinkAttrs{Name: client.wireGuardConfig.Name, MTU: client.wireGuardConfig.MTU}} err := linkAdd(link) // Ignore existing link as it may have already been created or managed by userspace process. @@ -145,18 +150,36 @@ func (client *client) Init() error { ListenPort: &client.wireGuardConfig.Port, ReplacePeers: false, } - patch, _ := json.Marshal(map[string]interface{}{ - "metadata": map[string]interface{}{ - "annotations": map[string]string{ - types.NodeWireGuardPublicAnnotationKey: client.privateKey.PublicKey().String(), + if !multiCluster { + patch, _ := json.Marshal(map[string]interface{}{ + "metadata": map[string]interface{}{ + "annotations": map[string]string{ + types.NodeWireGuardPublicAnnotationKey: client.privateKey.PublicKey().String(), + }, }, - }, - }) - if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - _, err := client.k8sClient.CoreV1().Nodes().Patch(context.TODO(), client.nodeName, apitypes.MergePatchType, patch, metav1.PatchOptions{}, "status") - return err - }); err != nil { - return fmt.Errorf("error when patching the Node with the '%s' annotation: %w", types.NodeWireGuardPublicAnnotationKey, err) + }) + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + _, err := client.k8sClient.CoreV1().Nodes().Patch(context.TODO(), client.nodeName, apitypes.MergePatchType, patch, metav1.PatchOptions{}, "status") + return err + }); err != nil { + return fmt.Errorf("error when patching the Node with the '%s' annotation: %w", types.NodeWireGuardPublicAnnotationKey, err) + } + } else { + patch, _ := json.Marshal(map[string]interface{}{ + "metadata": map[string]interface{}{ + "annotations": map[string]string{ + types.MulticlusterWireGuardPublicAnnotationKey: client.privateKey.PublicKey().String(), + types.MulticlusterWireGuardAnnotationPort: strconv.Itoa(client.wireGuardConfig.Port), + }, + }, + }) + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + _, err := client.mcClient.MulticlusterV1alpha1().Gateways(client.mcNamespace).Patch(context.TODO(), client.nodeName, apitypes.MergePatchType, patch, + metav1.PatchOptions{}) + return err + }); err != nil { + return fmt.Errorf("error when patching the Gateway with the '%s' annotation: %w", types.NodeWireGuardPublicAnnotationKey, err) + } } return client.wgClient.ConfigureDevice(client.wireGuardConfig.Name, cfg) @@ -193,7 +216,7 @@ func (client *client) RemoveStalePeers(currentPeerPublickeys map[string]string) return nil } -func (client *client) UpdatePeer(nodeName, publicKeyString string, peerNodeIP net.IP, podCIDRs []*net.IPNet) error { +func (client *client) UpdatePeer(nodeName, publicKeyString string, peerNodeIP net.IP, peerNodePort int, podCIDRs []*net.IPNet) error { pubKey, err := wgtypes.ParseKey(publicKeyString) if err != nil { return err @@ -218,7 +241,10 @@ func (client *client) UpdatePeer(nodeName, publicKeyString string, peerNodeIP ne } } } - endpoint := net.JoinHostPort(peerNodeIP.String(), strconv.Itoa(client.wireGuardConfig.Port)) + if peerNodePort == 0 { + peerNodePort = client.wireGuardConfig.Port + } + endpoint := net.JoinHostPort(peerNodeIP.String(), strconv.Itoa(peerNodePort)) endpointUDP, err := net.ResolveUDPAddr("udp", endpoint) if err != nil { return err diff --git a/pkg/agent/wireguard/client_test.go b/pkg/agent/wireguard/client_test.go index 9627467aa25..6285536fc29 100644 --- a/pkg/agent/wireguard/client_test.go +++ b/pkg/agent/wireguard/client_test.go @@ -343,7 +343,7 @@ func Test_UpdatePeer(t *testing.T) { fc.peers[ec.PublicKey] = ec } client.wgClient = fc - err := client.UpdatePeer(tt.inputPeerNodeName, tt.inputPeerNodePublicKey, tt.inputPeerNodeIP, tt.inputPodCIDRs) + err := client.UpdatePeer(tt.inputPeerNodeName, tt.inputPeerNodePublicKey, tt.inputPeerNodeIP, 0, tt.inputPodCIDRs) if tt.expectedError { require.Error(t, err) } else { @@ -363,7 +363,7 @@ func Test_DeletePeer(t *testing.T) { pk1, _ := wgtypes.GeneratePrivateKey() ip1, _, _ := net.ParseCIDR("10.20.30.42/32") t.Run("delete non-existing peer", func(tt *testing.T) { - err := client.UpdatePeer("fake-node-1", pk1.String(), ip1, nil) + err := client.UpdatePeer("fake-node-1", pk1.String(), ip1, 0, nil) require.NoError(tt, err) assert.Len(tt, fc.peers, 1) _, ok := client.peerPublicKeyByNodeName.Load("fake-node-1") @@ -386,7 +386,7 @@ func Test_DeletePeer(t *testing.T) { func Test_New(t *testing.T) { client := fake.NewSimpleClientset() - _, err := New(client, &config.NodeConfig{Name: "test"}, &config.WireGuardConfig{}) + _, err := New(client, nil, &config.NodeConfig{Name: "test"}, &config.WireGuardConfig{}, "") require.NoError(t, err) } @@ -446,7 +446,7 @@ func Test_Init(t *testing.T) { return tt.utilConfigErr } - err := client.Init() + err := client.Init(false) if tt.expectedErr != "" { assert.Equal(t, tt.expectedErr, err.Error()) } else { diff --git a/pkg/agent/wireguard/interface.go b/pkg/agent/wireguard/interface.go index 4b11b33709c..08ac2f10e31 100644 --- a/pkg/agent/wireguard/interface.go +++ b/pkg/agent/wireguard/interface.go @@ -20,11 +20,13 @@ import ( type Interface interface { // Init initializes the WireGuard client and sets up the WireGuard device. - // It will generate a new private key if necessary and update the public key to the Node's annotation. - Init() error + // It will generate a new private key if necessary. + // When "mc" is false, it will update the public key to the Node's annotation. + // Or it will update the public key and port to Multi-cluster Gateway's annotation. + Init(mc bool) error // UpdatePeer updates WireGuard peer by provided public key and Node IPs. // It will create a new WireGuard peer if the specified Node is not present in WireGuard device. - UpdatePeer(nodeName, publicKeyString string, peerNodeIP net.IP, podCIDRs []*net.IPNet) error + UpdatePeer(nodeName, publicKeyString string, peerNodeIP net.IP, peerNodePort int, allowedIPs []*net.IPNet) error // RemoveStalePeers reads existing WireGuard peers from the WireGuard device and deletes those which are not in currentPeerPublickeys. // currentPeerPublickeys is a map of Node names to public keys. It is useful to clean up stale WireGuard peers upon antrea starting. RemoveStalePeers(currentPeerPublickeys map[string]string) error diff --git a/pkg/agent/wireguard/testing/mock_wireguard.go b/pkg/agent/wireguard/testing/mock_wireguard.go new file mode 100644 index 00000000000..4c2951a293c --- /dev/null +++ b/pkg/agent/wireguard/testing/mock_wireguard.go @@ -0,0 +1,36 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package testing + +import "net" + +type MockWireGuardClient struct{} + +func (m *MockWireGuardClient) Init(mc bool) error { + return nil +} + +func (m *MockWireGuardClient) UpdatePeer(nodeName, publicKeyString string, peerNodeIP net.IP, peerNodePort int, allowedIPs []*net.IPNet) error { + return nil +} + +func (m *MockWireGuardClient) RemoveStalePeers(currentPeerPublickeys map[string]string) error { + return nil +} + +func (m *MockWireGuardClient) DeletePeer(nodeName string) error { + return nil +} diff --git a/pkg/config/agent/config.go b/pkg/config/agent/config.go index 03d586eac38..ee91b877d85 100644 --- a/pkg/config/agent/config.go +++ b/pkg/config/agent/config.go @@ -284,6 +284,15 @@ type IPsecConfig struct { AuthenticationMode string `yaml:"authenticationMode,omitempty"` } +type MulticlusterWireGuardConfig struct { + // Enable Multi-cluster WireGuard. WireGuard tunnels will be established between + // member clusters in one ClusterSet. All cross-cluster traffic will go through + // the WireGuard tunnels. + Enable bool `yaml:"enable"` + // The port for the WireGuard to receive traffic. Defaults to 51821. + Port int `yaml:"port"` +} + type MulticlusterConfig struct { // Deprecated and replaced by "enableGateway". Keep the field in MulticlusterConfig to be // compatible with earlier version (<= v1.10) Antrea deployment manifests. @@ -300,6 +309,8 @@ type MulticlusterConfig struct { // clusters directly. This feature also requires Pod CIDRs to be provided in the Multi-cluster Controller // configuration. EnablePodToPodConnectivity bool `yaml:"enablePodToPodConnectivity,omitempty"` + // Antrea Multi-cluster WireGuard tunnel configuration. + WireGuard MulticlusterWireGuardConfig `yaml:"wireGuard,omitempty"` } type ExternalNodeConfig struct {