From 6ba5320c06682933f08edc6952f3d167482b8375 Mon Sep 17 00:00:00 2001 From: Hongliang Liu <75655411+hongliangl@users.noreply.github.com> Date: Fri, 19 Jul 2024 04:44:54 +0800 Subject: [PATCH] Implement the controller for API BGPPolicy This commit implements the controller of API `BGPPolicy`, designed to advertise Service IPs, Egress IPs, and Pod IPs to BGP peers from selected Kubernetes Nodes. According to the spec of `BGPPolicy`, the Node selector is used to select Nodes to which a `BGPPolicy` is applied. Multiple `BGPPolicies` can be applied to the same Node. However, only the oldest `BGPPolicy` will be effective on a Node, with others serving as alternatives. The effective one may be changed in the following cases: - The current effective BGPPolicy is updated and no longer applies to the Node. - The current effective BGPPolicy is deleted. The BGP server instance is only created and started for the effective BGPPolicy on a Node. If the effective BGPPolicy is changed, the corresponding BGP server instance will be terminated by calling the `Stop` method, and a new BGP server instance will be created and started by calling the `Start` method for the new effective BGPPolicy. To create a BGP server instance, ASN, router ID, and listen port must be specified. The ASN and listen port are specified in the spec of the effective BGPPolicy. For router ID, if the Kubernetes cluster is IPv4-only or dual-stack, we use the Node's IPv4 address as the router ID, ensuring uniqueness. If the Kubernetes cluster is IPv6-only, where no Node IPv4 address is available, the router ID can be specified via the Node annotation `node.antrea.io/bgp-router-id`. If the annotation is not present, a router ID is generated by hashing the Node name and is then written to the Node annotation `node.antrea.io/bgp-router-id`. Additionally, the stale BGP server instance will be terminated and a new BGP server instance will be created and started when any of ASN, router ID, or listen port changes. 
The BGP peers are specified in the effective BGPPolicy. A BGP peer is uniquely identified by the combination of its IP address and its ASN. To reconcile the latest BGP peers: - Get the BGP peers to be added and add them by calling the `AddPeer` method of the BGP server instance. - Get the BGP peers to be deleted and delete them by calling the `RemovePeer` method of the BGP server instance. - Get the remaining BGP peers and calculate the updated BGP peers, then update them by calling the `UpdatePeer` method of the BGP server instance. The IPs to be advertised are calculated from the spec of the effective BGPPolicy. Currently, we advertise the IPs and CIDRs to all the BGP peers. To reconcile the latest advertised IPs with all BGP peers: - If the BGP server instance is newly created and started, advertise all the IPs by calling the `AdvertiseRoutes` method. - If the BGP server instance is not newly created and started: - Get the IPs/CIDRs to be added and advertise them by calling the `AdvertiseRoutes` method. - Get the IPs/CIDRs to be removed and withdraw them by calling the `WithdrawRoutes` method. The feature is gated by the alpha `BGPPolicy` feature gate and is only supported on Linux. 
Signed-off-by: Hongliang Liu --- build/charts/antrea/conf/antrea-agent.conf | 4 + .../antrea/templates/agent/clusterrole.yaml | 10 + build/yamls/antrea-aks.yml | 18 +- build/yamls/antrea-eks.yml | 18 +- build/yamls/antrea-gke.yml | 18 +- build/yamls/antrea-ipsec.yml | 18 +- build/yamls/antrea.yml | 18 +- cmd/antrea-agent/agent.go | 18 + docs/feature-gates.md | 11 + pkg/agent/controller/bgp/controller.go | 924 ++++++++ pkg/agent/controller/bgp/controller_test.go | 2043 +++++++++++++++++ pkg/agent/types/annotations.go | 3 + pkg/agent/types/bgppolicy.go | 23 + .../handlers/featuregates/handler_test.go | 1 + pkg/features/antrea_features.go | 14 +- 15 files changed, 3129 insertions(+), 12 deletions(-) create mode 100644 pkg/agent/controller/bgp/controller.go create mode 100644 pkg/agent/controller/bgp/controller_test.go create mode 100644 pkg/agent/types/bgppolicy.go diff --git a/build/charts/antrea/conf/antrea-agent.conf b/build/charts/antrea/conf/antrea-agent.conf index 24eaf7df46b..5d10f890134 100644 --- a/build/charts/antrea/conf/antrea-agent.conf +++ b/build/charts/antrea/conf/antrea-agent.conf @@ -91,6 +91,10 @@ featureGates: # Enable NodeLatencyMonitor to monitor the latency between Nodes. {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "NodeLatencyMonitor" "default" false) }} +# Allow users to initiate BGP process on selected Kubernetes Nodes and advertise Service IPs, Pod IPs and Egress IPs to +# remote BGP peers. +{{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "BGPPolicy" "default" false) }} + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. 
ovsBridge: {{ .Values.ovs.bridgeName | quote }} diff --git a/build/charts/antrea/templates/agent/clusterrole.yaml b/build/charts/antrea/templates/agent/clusterrole.yaml index ef1c43e40e8..be46ac81f8b 100644 --- a/build/charts/antrea/templates/agent/clusterrole.yaml +++ b/build/charts/antrea/templates/agent/clusterrole.yaml @@ -177,6 +177,7 @@ rules: - apiGroups: - crd.antrea.io resources: + - bgppolicies - externalippools - ippools - trafficcontrols @@ -234,3 +235,12 @@ rules: - create - patch - update + - apiGroups: + - "" + resources: + - secrets + resourceNames: + - antrea-bgp-passwords + verbs: + - get + - watch diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index 64564b55682..f4d14c97112 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -3807,6 +3807,10 @@ data: # Enable NodeLatencyMonitor to monitor the latency between Nodes. # NodeLatencyMonitor: false + # Allow users to initiate BGP process on selected Kubernetes Nodes and advertise Service IPs, Pod IPs and Egress IPs to + # remote BGP peers. + # BGPPolicy: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. 
ovsBridge: "br-int" @@ -4445,6 +4449,7 @@ rules: - apiGroups: - crd.antrea.io resources: + - bgppolicies - externalippools - ippools - trafficcontrols @@ -4502,6 +4507,15 @@ rules: - create - patch - update + - apiGroups: + - "" + resources: + - secrets + resourceNames: + - antrea-bgp-passwords + verbs: + - get + - watch --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole @@ -5110,7 +5124,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 71bf05ff579aa9bea7b360669c5e2ce2830ca88dc4ab54480638ce006eaeaf11 + checksum/config: cce7d6644fb552607ebeda9bf30a5fafa871dd4382afc609500fcb493b61768c labels: app: antrea component: antrea-agent @@ -5348,7 +5362,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 71bf05ff579aa9bea7b360669c5e2ce2830ca88dc4ab54480638ce006eaeaf11 + checksum/config: cce7d6644fb552607ebeda9bf30a5fafa871dd4382afc609500fcb493b61768c labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index 4cf276c9961..66996e408cc 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -3807,6 +3807,10 @@ data: # Enable NodeLatencyMonitor to monitor the latency between Nodes. # NodeLatencyMonitor: false + # Allow users to initiate BGP process on selected Kubernetes Nodes and advertise Service IPs, Pod IPs and Egress IPs to + # remote BGP peers. + # BGPPolicy: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. 
ovsBridge: "br-int" @@ -4445,6 +4449,7 @@ rules: - apiGroups: - crd.antrea.io resources: + - bgppolicies - externalippools - ippools - trafficcontrols @@ -4502,6 +4507,15 @@ rules: - create - patch - update + - apiGroups: + - "" + resources: + - secrets + resourceNames: + - antrea-bgp-passwords + verbs: + - get + - watch --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole @@ -5110,7 +5124,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 71bf05ff579aa9bea7b360669c5e2ce2830ca88dc4ab54480638ce006eaeaf11 + checksum/config: cce7d6644fb552607ebeda9bf30a5fafa871dd4382afc609500fcb493b61768c labels: app: antrea component: antrea-agent @@ -5349,7 +5363,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 71bf05ff579aa9bea7b360669c5e2ce2830ca88dc4ab54480638ce006eaeaf11 + checksum/config: cce7d6644fb552607ebeda9bf30a5fafa871dd4382afc609500fcb493b61768c labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 3e37bf206f5..d417c790481 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -3807,6 +3807,10 @@ data: # Enable NodeLatencyMonitor to monitor the latency between Nodes. # NodeLatencyMonitor: false + # Allow users to initiate BGP process on selected Kubernetes Nodes and advertise Service IPs, Pod IPs and Egress IPs to + # remote BGP peers. + # BGPPolicy: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. 
ovsBridge: "br-int" @@ -4445,6 +4449,7 @@ rules: - apiGroups: - crd.antrea.io resources: + - bgppolicies - externalippools - ippools - trafficcontrols @@ -4502,6 +4507,15 @@ rules: - create - patch - update + - apiGroups: + - "" + resources: + - secrets + resourceNames: + - antrea-bgp-passwords + verbs: + - get + - watch --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole @@ -5110,7 +5124,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 91ff2b609519e4aaead6ab850252a49bbe674dec17f6f239c4d0fa6c7b5705f6 + checksum/config: e30c52c9fcb04d362d018e846cf72dc633c5e891e02b3ebb87fab4d7ee08e15a labels: app: antrea component: antrea-agent @@ -5346,7 +5360,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 91ff2b609519e4aaead6ab850252a49bbe674dec17f6f239c4d0fa6c7b5705f6 + checksum/config: e30c52c9fcb04d362d018e846cf72dc633c5e891e02b3ebb87fab4d7ee08e15a labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index c17048997b1..de2bd229103 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -3820,6 +3820,10 @@ data: # Enable NodeLatencyMonitor to monitor the latency between Nodes. # NodeLatencyMonitor: false + # Allow users to initiate BGP process on selected Kubernetes Nodes and advertise Service IPs, Pod IPs and Egress IPs to + # remote BGP peers. + # BGPPolicy: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. 
ovsBridge: "br-int" @@ -4458,6 +4462,7 @@ rules: - apiGroups: - crd.antrea.io resources: + - bgppolicies - externalippools - ippools - trafficcontrols @@ -4515,6 +4520,15 @@ rules: - create - patch - update + - apiGroups: + - "" + resources: + - secrets + resourceNames: + - antrea-bgp-passwords + verbs: + - get + - watch --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole @@ -5123,7 +5137,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 2d75956786eb552eaba94f89dfa5c6bab570bf662b82449e9af31a57ca138750 + checksum/config: 73a49a9a8508cc8fb94eb2c770bb3589e68d9623327231943cba60a48716568a checksum/ipsec-secret: d0eb9c52d0cd4311b6d252a951126bf9bea27ec05590bed8a394f0f792dcb2a4 labels: app: antrea @@ -5405,7 +5419,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 2d75956786eb552eaba94f89dfa5c6bab570bf662b82449e9af31a57ca138750 + checksum/config: 73a49a9a8508cc8fb94eb2c770bb3589e68d9623327231943cba60a48716568a labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index 97fd9640a66..ddc7ce59a80 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -3807,6 +3807,10 @@ data: # Enable NodeLatencyMonitor to monitor the latency between Nodes. # NodeLatencyMonitor: false + # Allow users to initiate BGP process on selected Kubernetes Nodes and advertise Service IPs, Pod IPs and Egress IPs to + # remote BGP peers. + # BGPPolicy: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. 
ovsBridge: "br-int" @@ -4445,6 +4449,7 @@ rules: - apiGroups: - crd.antrea.io resources: + - bgppolicies - externalippools - ippools - trafficcontrols @@ -4502,6 +4507,15 @@ rules: - create - patch - update + - apiGroups: + - "" + resources: + - secrets + resourceNames: + - antrea-bgp-passwords + verbs: + - get + - watch --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole @@ -5110,7 +5124,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: ebc0be79b0fc65db51609f5c9185ca8a0533e265811d14c687f577cf93497a58 + checksum/config: 20130c4a5dbfeec75182bc3053288f64c06d0350b34c86675ac88d5961c47853 labels: app: antrea component: antrea-agent @@ -5346,7 +5360,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: ebc0be79b0fc65db51609f5c9185ca8a0533e265811d14c687f577cf93497a58 + checksum/config: 20130c4a5dbfeec75182bc3053288f64c06d0350b34c86675ac88d5961c47853 labels: app: antrea component: antrea-controller diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 4f7099b6ae0..5bd503571f1 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -38,6 +38,7 @@ import ( "antrea.io/antrea/pkg/agent/cniserver" "antrea.io/antrea/pkg/agent/cniserver/ipam" "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/controller/bgp" "antrea.io/antrea/pkg/agent/controller/egress" "antrea.io/antrea/pkg/agent/controller/ipseccertificate" "antrea.io/antrea/pkg/agent/controller/l7flowexporter" @@ -743,6 +744,23 @@ func run(o *Options) error { } } + if features.DefaultFeatureGate.Enabled(features.BGPPolicy) { + bgpPolicyInformer := crdInformerFactory.Crd().V1alpha1().BGPPolicies() + bgpController, err := 
bgp.NewBGPPolicyController(nodeInformer, + serviceInformer, + egressInformer, + bgpPolicyInformer, + endpointSliceInformer, + o.enableEgress, + k8sClient, + nodeConfig, + networkConfig) + if err != nil { + return err + } + go bgpController.Run(ctx) + } + if features.DefaultFeatureGate.Enabled(features.TrafficControl) { tcController := trafficcontrol.NewTrafficControlController(ofClient, ifaceStore, diff --git a/docs/feature-gates.md b/docs/feature-gates.md index 8efb4388ed9..5f130c3f39b 100644 --- a/docs/feature-gates.md +++ b/docs/feature-gates.md @@ -59,6 +59,7 @@ edit the Agent configuration in the | `EgressSeparateSubnet` | Agent | `false` | Alpha | v1.15 | N/A | N/A | No | | | `NodeNetworkPolicy` | Agent | `false` | Alpha | v1.15 | N/A | N/A | Yes | | | `L7FlowExporter` | Agent | `false` | Alpha | v1.15 | N/A | N/A | Yes | | +| `BGPPolicy` | Agent | `false` | Alpha | v2.1 | N/A | N/A | No | | ## Description and Requirements of Features @@ -435,3 +436,13 @@ Refer to this [document](network-flow-visibility.md#l7-visibility) for more info #### Requirements for this Feature - Linux Nodes only. + +### BGPPolicy + +`BGPPolicy` allows users to initiate BGP process on selected Kubernetes Nodes and advertise Service IPs (e.g., +ClusterIPs, ExternalIPs, LoadBalancerIPs), Pod IPs and Egress IPs to remote BGP peers, providing a flexible mechanism +for integrating Kubernetes clusters with external BGP-enabled networks. + +#### Requirements for this Feature + +- Linux Nodes only. diff --git a/pkg/agent/controller/bgp/controller.go b/pkg/agent/controller/bgp/controller.go new file mode 100644 index 00000000000..4eb2a44fefb --- /dev/null +++ b/pkg/agent/controller/bgp/controller.go @@ -0,0 +1,924 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bgp + +import ( + "context" + "encoding/json" + "fmt" + "hash/fnv" + "net" + "reflect" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + apitypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + coreinformers "k8s.io/client-go/informers/core/v1" + discoveryinformers "k8s.io/client-go/informers/discovery/v1" + "k8s.io/client-go/kubernetes" + corelisters "k8s.io/client-go/listers/core/v1" + discoverylisters "k8s.io/client-go/listers/discovery/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + utilnet "k8s.io/utils/net" + "k8s.io/utils/ptr" + "k8s.io/utils/strings/slices" + + "antrea.io/antrea/pkg/agent/bgp" + "antrea.io/antrea/pkg/agent/bgp/gobgp" + "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/types" + "antrea.io/antrea/pkg/apis/crd/v1alpha1" + "antrea.io/antrea/pkg/apis/crd/v1beta1" + crdinformersv1a1 "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha1" + crdinformersv1b1 "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1beta1" + crdlistersv1a1 "antrea.io/antrea/pkg/client/listers/crd/v1alpha1" + crdlistersv1b1 "antrea.io/antrea/pkg/client/listers/crd/v1beta1" + "antrea.io/antrea/pkg/util/env" +) + +const ( + controllerName = "BGPPolicyController" + // How long to wait before retrying the processing of a BGPPolicy change. 
+ minRetryDelay = 5 * time.Second + maxRetryDelay = 300 * time.Second + // Disable resyncing. + resyncPeriod time.Duration = 0 +) + +const ( + ipv4Suffix = "/32" + ipv6Suffix = "/128" +) + +const dummyKey = "dummyKey" + +type bgpPolicyState struct { + // The local BGP server. + bgpServer bgp.Interface + // The port on which the local BGP server listens. + listenPort int32 + // The AS number used by the local BGP server. + localASN int32 + // The router ID used by the local BGP server. + routerID string + // routes stores all BGP routers advertised to BGP peers. + routes sets.Set[bgp.Route] + // peerConfigs is a map that stores configurations of BGP peers. The map keys are the concatenated strings of BGP + // peer IP address and ASN (e.g., "192.168.77.100-65000", "2001::1-65000"). + peerConfigs map[string]bgp.PeerConfig +} + +type Controller struct { + nodeInformer cache.SharedIndexInformer + nodeLister corelisters.NodeLister + nodeListerSynced cache.InformerSynced + + serviceInformer cache.SharedIndexInformer + serviceLister corelisters.ServiceLister + serviceListerSynced cache.InformerSynced + + egressInformer cache.SharedIndexInformer + egressLister crdlistersv1b1.EgressLister + egressListerSynced cache.InformerSynced + + bgpPolicyInformer cache.SharedIndexInformer + bgpPolicyLister crdlistersv1a1.BGPPolicyLister + bgpPolicyListerSynced cache.InformerSynced + + endpointSliceInformer cache.SharedIndexInformer + endpointSliceLister discoverylisters.EndpointSliceLister + endpointSliceListerSynced cache.InformerSynced + + secretInformer cache.SharedIndexInformer + + bgpPolicyState *bgpPolicyState + + k8sClient kubernetes.Interface + bgpPeerPasswords map[string]string + bgpPeerPasswordsMutex sync.RWMutex + + nodeName string + enabledIPv4 bool + enabledIPv6 bool + podIPv4CIDR string + podIPv6CIDR string + nodeIPv4Addr string + + egressEnabled bool + + newBGPServerFn func(globalConfig *bgp.GlobalConfig) bgp.Interface + + queue workqueue.RateLimitingInterface +} + +func 
NewBGPPolicyController(nodeInformer coreinformers.NodeInformer, + serviceInformer coreinformers.ServiceInformer, + egressInformer crdinformersv1b1.EgressInformer, + bgpPolicyInformer crdinformersv1a1.BGPPolicyInformer, + endpointSliceInformer discoveryinformers.EndpointSliceInformer, + egressEnabled bool, + k8sClient kubernetes.Interface, + nodeConfig *config.NodeConfig, + networkConfig *config.NetworkConfig) (*Controller, error) { + c := &Controller{ + nodeInformer: nodeInformer.Informer(), + nodeLister: nodeInformer.Lister(), + nodeListerSynced: nodeInformer.Informer().HasSynced, + serviceInformer: serviceInformer.Informer(), + serviceLister: serviceInformer.Lister(), + serviceListerSynced: serviceInformer.Informer().HasSynced, + bgpPolicyInformer: bgpPolicyInformer.Informer(), + bgpPolicyLister: bgpPolicyInformer.Lister(), + bgpPolicyListerSynced: bgpPolicyInformer.Informer().HasSynced, + endpointSliceInformer: endpointSliceInformer.Informer(), + endpointSliceLister: endpointSliceInformer.Lister(), + endpointSliceListerSynced: endpointSliceInformer.Informer().HasSynced, + k8sClient: k8sClient, + bgpPeerPasswords: make(map[string]string), + nodeName: nodeConfig.Name, + enabledIPv4: networkConfig.IPv4Enabled, + enabledIPv6: networkConfig.IPv6Enabled, + podIPv4CIDR: nodeConfig.PodIPv4CIDR.String(), + podIPv6CIDR: nodeConfig.PodIPv6CIDR.String(), + nodeIPv4Addr: nodeConfig.NodeIPv4Addr.IP.String(), + egressEnabled: egressEnabled, + newBGPServerFn: func(globalConfig *bgp.GlobalConfig) bgp.Interface { + return gobgp.NewGoBGPServer(globalConfig) + }, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "bgpPolicy"), + } + c.bgpPolicyInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.addBGPPolicy, + UpdateFunc: c.updateBGPPolicy, + DeleteFunc: c.deleteBGPPolicy, + }, + resyncPeriod, + ) + c.serviceInformer.AddEventHandlerWithResyncPeriod( + 
cache.ResourceEventHandlerFuncs{ + AddFunc: c.addService, + UpdateFunc: c.updateService, + DeleteFunc: c.deleteService, + }, + resyncPeriod, + ) + c.endpointSliceInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.addEndpointSlice, + UpdateFunc: c.updateEndpointSlice, + DeleteFunc: c.deleteEndpointSlice, + }, + resyncPeriod, + ) + if c.egressEnabled { + c.egressInformer = egressInformer.Informer() + c.egressLister = egressInformer.Lister() + c.egressListerSynced = egressInformer.Informer().HasSynced + c.egressInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.addEgress, + UpdateFunc: c.updateEgress, + DeleteFunc: c.deleteEgress, + }, + resyncPeriod, + ) + } + c.nodeInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.addNode, + UpdateFunc: c.updateNode, + DeleteFunc: nil, + }, + resyncPeriod, + ) + + c.secretInformer = coreinformers.NewFilteredSecretInformer(k8sClient, + env.GetAntreaNamespace(), + resyncPeriod, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("metadata.name", types.BGPPolicySecretName).String() + }) + c.secretInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: c.addSecret, + UpdateFunc: c.updateSecret, + DeleteFunc: c.deleteSecret, + }) + + return c, nil +} + +func (c *Controller) Run(ctx context.Context) { + defer c.queue.ShutDown() + + klog.InfoS("Starting", "controllerName", controllerName) + defer klog.InfoS("Shutting down", "controllerName", controllerName) + + cacheSyncs := []cache.InformerSynced{ + c.nodeListerSynced, + c.serviceListerSynced, + c.bgpPolicyListerSynced, + c.endpointSliceListerSynced, + } + if c.egressEnabled { + cacheSyncs = append(cacheSyncs, c.egressListerSynced) + } + if !cache.WaitForNamedCacheSync(controllerName, ctx.Done(), cacheSyncs...) 
{ + return + } + + go wait.UntilWithContext(ctx, c.worker, time.Second) + + <-ctx.Done() +} + +func (c *Controller) worker(ctx context.Context) { + for c.processNextWorkItem(ctx) { + } +} + +func (c *Controller) processNextWorkItem(ctx context.Context) bool { + _, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(dummyKey) + + if err := c.syncBGPPolicy(ctx); err == nil { + // If no error occurs we Forget this item, so it does not get queued again until another change happens. + c.queue.Forget(dummyKey) + } else { + // Put the item back on the work queue to handle any transient errors. + c.queue.AddRateLimited(dummyKey) + klog.ErrorS(err, "Syncing BGPPolicy failed, requeue") + } + return true +} + +func (c *Controller) getEffectiveBGPPolicy() *v1alpha1.BGPPolicy { + allPolicies, _ := c.bgpPolicyLister.List(labels.Everything()) + var oldestPolicy *v1alpha1.BGPPolicy + for _, policy := range allPolicies { + if c.matchesCurrentNode(policy) { + if oldestPolicy == nil || policy.CreationTimestamp.Before(&oldestPolicy.CreationTimestamp) { + oldestPolicy = policy + } + } + } + return oldestPolicy +} + +func (c *Controller) syncBGPPolicy(ctx context.Context) error { + ctx, cancel := context.WithTimeoutCause(ctx, 60*time.Second, fmt.Errorf("BGPPolicy took too long to sync")) + defer cancel() + + startTime := time.Now() + defer func() { + klog.InfoS("Finished syncing BGPPolicy", "durationTime", time.Since(startTime)) + }() + + // Get the oldest BGPPolicy applied to the current Node as the effective BGPPolicy. + effectivePolicy := c.getEffectiveBGPPolicy() + + // When the effective BGPPolicy is nil, it means that there is no available BGPPolicy. + if effectivePolicy == nil { + // If the BGPPolicy state is nil, just return. + if c.bgpPolicyState == nil { + return nil + } + + // If the BGPPolicy state is not nil, stop the BGP server and reset the state to nil, then return. 
+ if err := c.bgpPolicyState.bgpServer.Stop(ctx); err != nil { + return err + } + c.bgpPolicyState = nil + return nil + } + + klog.V(2).InfoS("Syncing BGPPolicy", "BGPPolicy", klog.KObj(effectivePolicy)) + // Retrieve the listen port, local AS number and router ID from the effective BGPPolicy, and update them to the + // current state. + routerID, err := c.getRouterID() + if err != nil { + return err + } + listenPort := *effectivePolicy.Spec.ListenPort + localASN := effectivePolicy.Spec.LocalASN + + // If the BGPPolicy state is nil, a new BGP server should be started, initialize the BGPPolicy state to store the + // new BGP server, listen port, local ASN, and router ID. + // If the BGPPolicy is not nil, any of the listen port, local AS number, or router ID have changed, stop the current + // BGP server first and reset the BGPPolicy state to nil; then start a new BGP server and initialize the BGPPolicy + // state to store the new BGP server, listen port, local ASN, and router ID. + needUpdateBGPServer := c.bgpPolicyState == nil || + c.bgpPolicyState.listenPort != listenPort || + c.bgpPolicyState.localASN != localASN || + c.bgpPolicyState.routerID != routerID + + if needUpdateBGPServer { + if c.bgpPolicyState != nil { + // Stop the current BGP server. + if err := c.bgpPolicyState.bgpServer.Stop(ctx); err != nil { + return fmt.Errorf("failed to stop current BGP server: %w", err) + } + // Reset the BGPPolicy state to nil. + c.bgpPolicyState = nil + } + + // Create a new BGP server. + bgpServer := c.newBGPServerFn(&bgp.GlobalConfig{ + ASN: uint32(localASN), + RouterID: routerID, + ListenPort: listenPort, + }) + + // Start the new BGP server. + if err := bgpServer.Start(ctx); err != nil { + return fmt.Errorf("failed to start BGP server: %w", err) + } + + // Initialize the BGPPolicy state to store the new BGP server, listen port, local ASN, and router ID. 
+ c.bgpPolicyState = &bgpPolicyState{ + bgpServer: bgpServer, + routerID: routerID, + listenPort: listenPort, + localASN: localASN, + routes: make(sets.Set[bgp.Route]), + peerConfigs: make(map[string]bgp.PeerConfig), + } + } + + // Reconcile BGP peers. + if err := c.reconcileBGPPeers(ctx, effectivePolicy.Spec.BGPPeers); err != nil { + return err + } + + // Reconcile BGP advertisements. + if err := c.reconcileBGPAdvertisements(ctx, effectivePolicy.Spec.Advertisements); err != nil { + return err + } + + return nil +} + +func (c *Controller) reconcileBGPPeers(ctx context.Context, bgpPeers []v1alpha1.BGPPeer) error { + curPeerConfigs := c.getPeerConfigs(bgpPeers) + prePeerConfigs := c.bgpPolicyState.peerConfigs + prePeerKeys := sets.KeySet(prePeerConfigs) + curPeerKeys := sets.KeySet(curPeerConfigs) + + peerToAddKeys := curPeerKeys.Difference(prePeerKeys) + peerToUpdateKeys := sets.New[string]() + for peerKey := range prePeerKeys.Intersection(curPeerKeys) { + prevPeerConfig := prePeerConfigs[peerKey] + curPeerConfig := curPeerConfigs[peerKey] + if !reflect.DeepEqual(prevPeerConfig, curPeerConfig) { + peerToUpdateKeys.Insert(peerKey) + } + } + peerToDeleteKeys := prePeerKeys.Difference(curPeerKeys) + + bgpServer := c.bgpPolicyState.bgpServer + for key := range peerToAddKeys { + peerConfig := curPeerConfigs[key] + if err := bgpServer.AddPeer(ctx, peerConfig); err != nil { + return err + } + c.bgpPolicyState.peerConfigs[key] = peerConfig + } + for key := range peerToUpdateKeys { + peerConfig := curPeerConfigs[key] + if err := bgpServer.UpdatePeer(ctx, peerConfig); err != nil { + return err + } + c.bgpPolicyState.peerConfigs[key] = peerConfig + } + for key := range peerToDeleteKeys { + peerConfig := prePeerConfigs[key] + if err := bgpServer.RemovePeer(ctx, peerConfig); err != nil { + return err + } + delete(c.bgpPolicyState.peerConfigs, key) + } + + return nil +} + +func (c *Controller) reconcileBGPAdvertisements(ctx context.Context, bgpAdvertisements 
v1alpha1.Advertisements) error { + curRoutes, err := c.getRoutes(bgpAdvertisements) + if err != nil { + return err + } + preRoutes := c.bgpPolicyState.routes + routesToAdvertise := curRoutes.Difference(preRoutes) + routesToWithdraw := preRoutes.Difference(curRoutes) + + bgpServer := c.bgpPolicyState.bgpServer + for route := range routesToAdvertise { + if err := bgpServer.AdvertiseRoutes(ctx, []bgp.Route{route}); err != nil { + return err + } + c.bgpPolicyState.routes.Insert(route) + } + for route := range routesToWithdraw { + if err := bgpServer.WithdrawRoutes(ctx, []bgp.Route{route}); err != nil { + return err + } + c.bgpPolicyState.routes.Delete(route) + } + + return nil +} + +func hashNodeNameToIP(s string) string { + h := fnv.New32a() // Create a new FNV hash + h.Write([]byte(s)) + hashValue := h.Sum32() // Get the 32-bit hash + + // Convert the hash to a 4-byte slice + ip := make(net.IP, 4) + ip[0] = byte(hashValue >> 24) + ip[1] = byte(hashValue >> 16) + ip[2] = byte(hashValue >> 8) + ip[3] = byte(hashValue) + + return ip.String() +} + +func (c *Controller) getRouterID() (string, error) { + // According to RFC 4271: + // BGP Identifier: + // This 4-octet unsigned integer indicates the BGP Identifier of + // the sender. A given BGP speaker sets the value of its BGP + // Identifier to an IP address that is assigned to that BGP + // speaker. The value of the BGP Identifier is determined upon + // startup and is the same for every local interface and BGP peer. + // + // In goBGP, only an IPv4 address can be used as the BGP Identifier (BGP router ID). + // For IPv4-only or dual-stack Kubernetes clusters, the Node's IPv4 address is used as the BGP router ID, ensuring + // uniqueness. + // For IPv6-only Kubernetes clusters without a Node IPv4 address, the router ID could be specified in the Node + // annotation `node.antrea.io/bgp-router-id`. 
If the annotation is not present, an IPv4 address will be generated by + // hashing the Node name and updated to the Node annotation `node.antrea.io/bgp-router-id`. + + if c.enabledIPv4 { + return c.nodeIPv4Addr, nil + } + + nodeObj, err := c.nodeLister.Get(c.nodeName) + if err != nil { + return "", fmt.Errorf("failed to get Node object: %w", err) + } + + var exists bool + var routerID string + routerID, exists = nodeObj.GetAnnotations()[types.NodeBGPRouterIDAnnotationKey] + if !exists { + routerID = hashNodeNameToIP(c.nodeName) + patch, _ := json.Marshal(map[string]interface{}{ + "metadata": map[string]interface{}{ + "annotations": map[string]string{ + types.NodeBGPRouterIDAnnotationKey: routerID, + }, + }, + }) + if _, err := c.k8sClient.CoreV1().Nodes().Patch(context.TODO(), c.nodeName, apitypes.MergePatchType, patch, metav1.PatchOptions{}, "status"); err != nil { + return "", fmt.Errorf("failed to patch BGP router ID to Node annotation %s: %w", types.NodeBGPRouterIDAnnotationKey, err) + } + } else if !utilnet.IsIPv4String(routerID) { + return "", fmt.Errorf("BGP router ID should be an IPv4 address string") + } + return routerID, nil +} + +func (c *Controller) getRoutes(advertisements v1alpha1.Advertisements) (sets.Set[bgp.Route], error) { + allRoutes := sets.New[bgp.Route]() + + if advertisements.Service != nil { + c.addServiceRoutes(advertisements.Service, allRoutes) + } + if c.egressEnabled && advertisements.Egress != nil { + c.addEgressRoutes(allRoutes) + } + if advertisements.Pod != nil { + c.addPodRoutes(allRoutes) + } + + return allRoutes, nil +} + +func (c *Controller) addServiceRoutes(advertisement *v1alpha1.ServiceAdvertisement, allRoutes sets.Set[bgp.Route]) { + ipTypes := sets.New(advertisement.IPTypes...) 
+ services, _ := c.serviceLister.List(labels.Everything()) + + var serviceIPs []string + for _, svc := range services { + internalLocal := svc.Spec.InternalTrafficPolicy != nil && *svc.Spec.InternalTrafficPolicy == corev1.ServiceInternalTrafficPolicyLocal + externalLocal := svc.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyLocal + var hasLocalEndpoints bool + if internalLocal || externalLocal { + hasLocalEndpoints = c.hasLocalEndpoints(svc) + } + if ipTypes.Has(v1alpha1.ServiceIPTypeClusterIP) { + if internalLocal && hasLocalEndpoints || !internalLocal { + for _, clusterIP := range svc.Spec.ClusterIPs { + serviceIPs = append(serviceIPs, clusterIP) + } + } + } + if ipTypes.Has(v1alpha1.ServiceIPTypeExternalIP) { + if externalLocal && hasLocalEndpoints || !externalLocal { + for _, externalIP := range svc.Spec.ExternalIPs { + serviceIPs = append(serviceIPs, externalIP) + } + } + } + if ipTypes.Has(v1alpha1.ServiceIPTypeLoadBalancerIP) && svc.Spec.Type == corev1.ServiceTypeLoadBalancer { + if externalLocal && hasLocalEndpoints || !externalLocal { + serviceIPs = append(serviceIPs, getIngressIPs(svc)...) 
+ } + } + } + + for _, ip := range serviceIPs { + if c.enabledIPv4 && utilnet.IsIPv4String(ip) { + allRoutes.Insert(bgp.Route{Prefix: ip + ipv4Suffix}) + } + if c.enabledIPv6 && utilnet.IsIPv6String(ip) { + allRoutes.Insert(bgp.Route{Prefix: ip + ipv6Suffix}) + } + } +} + +func (c *Controller) addEgressRoutes(allRoutes sets.Set[bgp.Route]) { + egresses, _ := c.egressLister.List(labels.Everything()) + for _, eg := range egresses { + if eg.Status.EgressNode != c.nodeName { + continue + } + ip := eg.Status.EgressIP + if c.enabledIPv4 && utilnet.IsIPv4String(ip) { + allRoutes.Insert(bgp.Route{Prefix: ip + ipv4Suffix}) + } + if c.enabledIPv6 && utilnet.IsIPv6String(ip) { + allRoutes.Insert(bgp.Route{Prefix: ip + ipv6Suffix}) + } + } +} + +func (c *Controller) addPodRoutes(allRoutes sets.Set[bgp.Route]) { + if c.enabledIPv4 { + allRoutes.Insert(bgp.Route{Prefix: c.podIPv4CIDR}) + } + if c.enabledIPv6 { + allRoutes.Insert(bgp.Route{Prefix: c.podIPv6CIDR}) + } +} + +func (c *Controller) hasLocalEndpoints(svc *corev1.Service) bool { + labelSelector := labels.Set{discovery.LabelServiceName: svc.GetName()}.AsSelector() + items, _ := c.endpointSliceLister.EndpointSlices(svc.GetNamespace()).List(labelSelector) + for _, eps := range items { + for _, ep := range eps.Endpoints { + if ep.NodeName != nil && *ep.NodeName == c.nodeName { + return true + } + } + } + return false +} + +func (c *Controller) getPeerConfigs(peers []v1alpha1.BGPPeer) map[string]bgp.PeerConfig { + c.bgpPeerPasswordsMutex.RLock() + defer c.bgpPeerPasswordsMutex.RUnlock() + + peerConfigs := make(map[string]bgp.PeerConfig) + for i := range peers { + if c.enabledIPv4 && utilnet.IsIPv4String(peers[i].Address) || + c.enabledIPv6 && utilnet.IsIPv6String(peers[i].Address) { + peerKey := generateBGPPeerKey(peers[i].Address, peers[i].ASN) + + var password string + if p, exists := c.bgpPeerPasswords[peerKey]; exists { + password = p + } + + peerConfigs[peerKey] = bgp.PeerConfig{ + BGPPeer: &peers[i], + Password: 
password, + } + } + } + return peerConfigs +} + +func generateBGPPeerKey(address string, asn int32) string { + return fmt.Sprintf("%s-%d", address, asn) +} + +func (c *Controller) addBGPPolicy(obj interface{}) { + bgpPolicy := obj.(*v1alpha1.BGPPolicy) + if !c.matchesCurrentNode(bgpPolicy) { + return + } + klog.V(2).InfoS("Processing BGPPolicy ADD event", "BGPPolicy", klog.KObj(bgpPolicy)) + c.queue.Add(dummyKey) +} + +func (c *Controller) updateBGPPolicy(oldObj, obj interface{}) { + oldBGPPolicy := oldObj.(*v1alpha1.BGPPolicy) + policy := obj.(*v1alpha1.BGPPolicy) + if !c.matchesCurrentNode(policy) && !c.matchesCurrentNode(oldBGPPolicy) { + return + } + if policy.GetGeneration() != oldBGPPolicy.GetGeneration() { + klog.V(2).InfoS("Processing BGPPolicy UPDATE event", "BGPPolicy", klog.KObj(policy)) + c.queue.Add(dummyKey) + } +} + +func (c *Controller) deleteBGPPolicy(obj interface{}) { + bgpPolicy := obj.(*v1alpha1.BGPPolicy) + if !c.matchesCurrentNode(bgpPolicy) { + return + } + klog.V(2).InfoS("Processing BGPPolicy DELETE event", "BGPPolicy", klog.KObj(bgpPolicy)) + c.queue.Add(dummyKey) +} + +func getIngressIPs(svc *corev1.Service) []string { + var ips []string + for _, ingress := range svc.Status.LoadBalancer.Ingress { + if ingress.IP != "" { + ips = append(ips, ingress.IP) + } + } + return ips +} + +func (c *Controller) matchesCurrentNode(bgpPolicy *v1alpha1.BGPPolicy) bool { + node, _ := c.nodeLister.Get(c.nodeName) + if node == nil { + return false + } + return matchesNode(node, bgpPolicy) +} + +func matchesNode(node *corev1.Node, bgpPolicy *v1alpha1.BGPPolicy) bool { + nodeSelector, _ := metav1.LabelSelectorAsSelector(&bgpPolicy.Spec.NodeSelector) + return nodeSelector.Matches(labels.Set(node.Labels)) +} + +func matchesService(svc *corev1.Service, bgpPolicy *v1alpha1.BGPPolicy) bool { + ipTypeMap := sets.New(bgpPolicy.Spec.Advertisements.Service.IPTypes...) 
+ if ipTypeMap.Has(v1alpha1.ServiceIPTypeClusterIP) && len(svc.Spec.ClusterIPs) != 0 || + ipTypeMap.Has(v1alpha1.ServiceIPTypeExternalIP) && len(svc.Spec.ExternalIPs) != 0 || + ipTypeMap.Has(v1alpha1.ServiceIPTypeLoadBalancerIP) && len(getIngressIPs(svc)) != 0 { + return true + } + return false +} + +func (c *Controller) hasAffectedPolicyByService(svc *corev1.Service) bool { + allPolicies, _ := c.bgpPolicyLister.List(labels.Everything()) + for _, policy := range allPolicies { + if policy.Spec.Advertisements.Service == nil || !c.matchesCurrentNode(policy) { + continue + } + if matchesService(svc, policy) { + return true + } + } + return false +} + +func (c *Controller) addService(obj interface{}) { + svc := obj.(*corev1.Service) + if c.hasAffectedPolicyByService(svc) { + klog.V(2).InfoS("Processing Service ADD event", "Service", klog.KObj(svc)) + c.queue.Add(dummyKey) + } +} + +func (c *Controller) updateService(oldObj, obj interface{}) { + oldSvc := oldObj.(*corev1.Service) + svc := obj.(*corev1.Service) + + if slices.Equal(oldSvc.Spec.ClusterIPs, svc.Spec.ClusterIPs) && + slices.Equal(oldSvc.Spec.ExternalIPs, svc.Spec.ExternalIPs) && + slices.Equal(getIngressIPs(oldSvc), getIngressIPs(svc)) && + oldSvc.Spec.ExternalTrafficPolicy == svc.Spec.ExternalTrafficPolicy && + ptr.Equal(oldSvc.Spec.InternalTrafficPolicy, svc.Spec.InternalTrafficPolicy) { + return + } + if c.hasAffectedPolicyByService(oldSvc) || c.hasAffectedPolicyByService(svc) { + klog.V(2).InfoS("Processing Service UPDATE event", "Service", klog.KObj(svc)) + c.queue.Add(dummyKey) + } +} + +func (c *Controller) deleteService(obj interface{}) { + svc := obj.(*corev1.Service) + if c.hasAffectedPolicyByService(svc) { + klog.V(2).InfoS("Processing Service DELETE event", "Service", klog.KObj(svc)) + c.queue.Add(dummyKey) + } +} + +func noLocalTrafficPolicy(svc *corev1.Service) bool { + internalTrafficCluster := svc.Spec.InternalTrafficPolicy == nil || *svc.Spec.InternalTrafficPolicy == 
corev1.ServiceInternalTrafficPolicyCluster + if svc.Spec.Type == corev1.ServiceTypeClusterIP { + return internalTrafficCluster + } + externalTrafficCluster := svc.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyTypeCluster + return internalTrafficCluster && externalTrafficCluster +} + +func (c *Controller) addEndpointSlice(obj interface{}) { + eps := obj.(*discovery.EndpointSlice) + svc, _ := c.serviceLister.Services(eps.GetNamespace()).Get(eps.GetLabels()[discovery.LabelServiceName]) + if svc == nil { + return + } + // Events of EndpointSlices for Services without a `Local` traffic policy are ignored, as the Service IPs will + // always be advertised. + if noLocalTrafficPolicy(svc) { + return + } + if c.hasAffectedPolicyByService(svc) { + klog.V(2).InfoS("Processing EndpointSlice ADD event", "EndpointSlice", klog.KObj(eps)) + c.queue.Add(dummyKey) + } +} + +func (c *Controller) updateEndpointSlice(_, obj interface{}) { + eps := obj.(*discovery.EndpointSlice) + svc, _ := c.serviceLister.Services(eps.GetNamespace()).Get(eps.GetLabels()[discovery.LabelServiceName]) + if svc == nil { + return + } + // Events of EndpointSlices for Services without a `Local` traffic policy are ignored, as the Service IPs will + // always be advertised. + if noLocalTrafficPolicy(svc) { + return + } + if c.hasAffectedPolicyByService(svc) { + klog.V(2).InfoS("Processing EndpointSlice UPDATE event", "EndpointSlice", klog.KObj(eps)) + c.queue.Add(dummyKey) + } +} + +func (c *Controller) deleteEndpointSlice(obj interface{}) { + eps := obj.(*discovery.EndpointSlice) + svc, _ := c.serviceLister.Services(eps.GetNamespace()).Get(eps.GetLabels()[discovery.LabelServiceName]) + if svc == nil { + return + } + // Events of EndpointSlices for Services without a `Local` traffic policy are ignored, as the Service IPs will + // always be advertised. 
+ if noLocalTrafficPolicy(svc) { + return + } + if c.hasAffectedPolicyByService(svc) { + klog.V(2).InfoS("Processing EndpointSlice DELETE event", "EndpointSlice", klog.KObj(eps)) + c.queue.Add(dummyKey) + } +} + +func (c *Controller) hasAffectedPolicyByEgress() bool { + allPolicies, _ := c.bgpPolicyLister.List(labels.Everything()) + for _, policy := range allPolicies { + if !c.matchesCurrentNode(policy) { + continue + } + if policy.Spec.Advertisements.Egress != nil { + return true + } + } + return false +} + +func (c *Controller) addEgress(obj interface{}) { + eg := obj.(*v1beta1.Egress) + if eg.Status.EgressNode != c.nodeName { + return + } + if c.hasAffectedPolicyByEgress() { + klog.V(2).InfoS("Processing Egress ADD event", "Egress", klog.KObj(eg)) + c.queue.Add(dummyKey) + } +} + +func (c *Controller) updateEgress(oldObj, obj interface{}) { + oldEg := oldObj.(*v1beta1.Egress) + eg := obj.(*v1beta1.Egress) + if oldEg.Status.EgressNode != c.nodeName && eg.Status.EgressNode != c.nodeName { + return + } + if oldEg.Status.EgressIP == eg.Status.EgressIP && oldEg.Status.EgressNode == eg.Status.EgressNode { + return + } + if c.hasAffectedPolicyByEgress() { + klog.V(2).InfoS("Processing Egress UPDATE event", "Egress", klog.KObj(eg)) + c.queue.Add(dummyKey) + } +} + +func (c *Controller) deleteEgress(obj interface{}) { + eg := obj.(*v1beta1.Egress) + if eg.Status.EgressNode != c.nodeName { + return + } + if c.hasAffectedPolicyByEgress() { + klog.V(2).InfoS("Processing Egress DELETE event", "Egress", klog.KObj(eg)) + c.queue.Add(dummyKey) + } +} + +func (c *Controller) hasAffectedPolicyByNode(node *corev1.Node) bool { + allPolicies, _ := c.bgpPolicyLister.List(labels.Everything()) + for _, policy := range allPolicies { + if matchesNode(node, policy) { + return true + } + } + return false +} + +func (c *Controller) addNode(obj interface{}) { + node := obj.(*corev1.Node) + if node.GetName() != c.nodeName { + return + } + if c.hasAffectedPolicyByNode(node) { + 
klog.V(2).InfoS("Processing Node UPDATE event", "Node", klog.KObj(node)) + c.queue.Add(dummyKey) + } +} + +func (c *Controller) updateNode(oldObj, obj interface{}) { + oldNode := oldObj.(*corev1.Node) + node := obj.(*corev1.Node) + if node.GetName() != c.nodeName { + return + } + if reflect.DeepEqual(node.GetLabels(), oldNode.GetLabels()) && + reflect.DeepEqual(node.GetAnnotations(), oldNode.GetAnnotations()) { + return + } + if c.hasAffectedPolicyByNode(oldNode) || c.hasAffectedPolicyByNode(node) { + klog.V(2).InfoS("Processing Node UPDATE event", "Node", klog.KObj(node)) + c.queue.Add(dummyKey) + } +} + +func (c *Controller) addSecret(obj interface{}) { + secret := obj.(*corev1.Secret) + c.updateBGPPeerPasswords(secret) + c.queue.Add(dummyKey) +} + +func (c *Controller) updateSecret(_, obj interface{}) { + secret := obj.(*corev1.Secret) + c.updateBGPPeerPasswords(secret) + c.queue.Add(dummyKey) +} + +func (c *Controller) deleteSecret(_ interface{}) { + c.updateBGPPeerPasswords(nil) + c.queue.Add(dummyKey) +} + +func (c *Controller) updateBGPPeerPasswords(secret *corev1.Secret) { + c.bgpPeerPasswordsMutex.Lock() + defer c.bgpPeerPasswordsMutex.Unlock() + + c.bgpPeerPasswords = make(map[string]string) + if secret.Data != nil { + for k, v := range secret.Data { + c.bgpPeerPasswords[k] = string(v) + } + } +} diff --git a/pkg/agent/controller/bgp/controller_test.go b/pkg/agent/controller/bgp/controller_test.go new file mode 100644 index 00000000000..6d1f084dc35 --- /dev/null +++ b/pkg/agent/controller/bgp/controller_test.go @@ -0,0 +1,2043 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bgp + +import ( + "context" + "fmt" + "reflect" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + corev1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/informers" + coreinformers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" + utilnet "k8s.io/utils/net" + "k8s.io/utils/ptr" + + "antrea.io/antrea/pkg/agent/bgp" + bgptest "antrea.io/antrea/pkg/agent/bgp/testing" + "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/types" + "antrea.io/antrea/pkg/apis/crd/v1alpha1" + crdv1b1 "antrea.io/antrea/pkg/apis/crd/v1beta1" + fakeversioned "antrea.io/antrea/pkg/client/clientset/versioned/fake" + crdinformers "antrea.io/antrea/pkg/client/informers/externalversions" + "antrea.io/antrea/pkg/util/ip" +) + +const ( + namespaceDefault = "default" + namespaceKubeSystem = "kube-system" +) + +var ( + podIPv4CIDR = ip.MustParseCIDR("10.10.0.0/24") + podIPv6CIDR = ip.MustParseCIDR("fec0:10:10::/64") + nodeIPv4Addr = ip.MustParseCIDR("192.168.77.100/24") + + testNodeConfig = &config.NodeConfig{ + PodIPv4CIDR: podIPv4CIDR, + PodIPv6CIDR: podIPv6CIDR, + NodeIPv4Addr: nodeIPv4Addr, + Name: localNodeName, + } + + peer1ASN = int32(65531) + peer1AuthPassword = "bgp-peer1" // #nosec G101 + 
ipv4Peer1Addr = "192.168.77.251" + ipv6Peer1Addr = "fec0::196:168:77:251" + ipv4Peer1 = generateBGPPeer(ipv4Peer1Addr, peer1ASN, 179, 120) + ipv6Peer1 = generateBGPPeer(ipv6Peer1Addr, peer1ASN, 179, 120) + ipv4Peer1Config = generateBGPPeerConfig(&ipv4Peer1, peer1AuthPassword) + ipv6Peer1Config = generateBGPPeerConfig(&ipv6Peer1, peer1AuthPassword) + + peer2ASN = int32(65532) + peer2AuthPassword = "bgp-peer2" // #nosec G101 + ipv4Peer2Addr = "192.168.77.252" + ipv6Peer2Addr = "fec0::196:168:77:252" + ipv4Peer2 = generateBGPPeer(ipv4Peer2Addr, peer2ASN, 179, 120) + ipv6Peer2 = generateBGPPeer(ipv6Peer2Addr, peer2ASN, 179, 120) + ipv4Peer2Config = generateBGPPeerConfig(&ipv4Peer2, peer2AuthPassword) + ipv6Peer2Config = generateBGPPeerConfig(&ipv6Peer2, peer2AuthPassword) + + updatedIPv4Peer2 = generateBGPPeer(ipv4Peer2Addr, peer2ASN, 179, 60) + updatedIPv6Peer2 = generateBGPPeer(ipv6Peer2Addr, peer2ASN, 179, 60) + updatedIPv4Peer2Config = generateBGPPeerConfig(&updatedIPv4Peer2, peer2AuthPassword) + updatedIPv6Peer2Config = generateBGPPeerConfig(&updatedIPv6Peer2, peer2AuthPassword) + + peer3ASN = int32(65533) + peer3AuthPassword = "bgp-peer3" // #nosec G101 + ipv4Peer3Addr = "192.168.77.253" + ipv6Peer3Addr = "fec0::196:168:77:253" + ipv4Peer3 = generateBGPPeer(ipv4Peer3Addr, peer3ASN, 179, 120) + ipv6Peer3 = generateBGPPeer(ipv6Peer3Addr, peer3ASN, 179, 120) + ipv4Peer3Config = generateBGPPeerConfig(&ipv4Peer3, peer3AuthPassword) + ipv6Peer3Config = generateBGPPeerConfig(&ipv6Peer3, peer3AuthPassword) + + nodeLabels1 = map[string]string{"node": "control-plane"} + nodeLabels2 = map[string]string{"os": "linux"} + nodeLabels3 = map[string]string{"node": "control-plane", "os": "linux"} + nodeAnnotations1 = map[string]string{types.NodeBGPRouterIDAnnotationKey: "192.168.77.100"} + nodeAnnotations2 = map[string]string{types.NodeBGPRouterIDAnnotationKey: "10.10.0.100"} + + localNodeName = "local" + node = generateNode(localNodeName, nodeLabels1, nodeAnnotations1) + + 
ipv4EgressIP1 = "192.168.77.200" + ipv6EgressIP1 = "fec0::192:168:77:200" + ipv4EgressIP2 = "192.168.77.201" + ipv6EgressIP2 = "fec0::192:168:77:2001" + + ipv4Egress1 = generateEgress("eg1-4", ipv4EgressIP1, localNodeName) + ipv6Egress1 = generateEgress("eg1-6", ipv6EgressIP1, localNodeName) + ipv4Egress2 = generateEgress("eg2-4", ipv4EgressIP2, "test-remote-node") + ipv6Egress2 = generateEgress("eg2-6", ipv6EgressIP2, "test-remote-node") + + bgpPolicyName1 = "bp-1" + bgpPolicyName2 = "bp-2" + bgpPolicyName3 = "bp-3" + bgpPolicyName4 = "bp-4" + + creationTimestamp = metav1.Now() + creationTimestampAdd1s = metav1.NewTime(creationTimestamp.Add(time.Second)) + creationTimestampAdd2s = metav1.NewTime(creationTimestamp.Add(2 * time.Second)) + creationTimestampAdd3s = metav1.NewTime(creationTimestamp.Add(3 * time.Second)) + + clusterIPv4 = "10.96.10.10" + externalIPv4 = "192.168.77.100" + loadBalancerIPv4 = "192.168.77.150" + endpointIPv4 = "10.10.0.10" + clusterIPv6 = "fec0::10:96:10:10" + externalIPv6 = "fec0::192:168:77:100" + loadBalancerIPv6 = "fec0::192:168:77:150" + endpointIPv6 = "fec0::10:10:0:10" + + ipv4ClusterIPName1 = "clusterip-4" + ipv4ClusterIPName2 = "clusterip-4-local" + ipv6ClusterIPName1 = "clusterip-6" + ipv6ClusterIPName2 = "clusterip-6-local" + ipv4LoadBalancerName = "loadbalancer-4" + ipv6LoadBalancerName = "loadbalancer-6" + + endpointSliceSuffix = rand.String(5) + ipv4ClusterIP1 = generateService(ipv4ClusterIPName1, corev1.ServiceTypeClusterIP, clusterIPv4, externalIPv4, "", false, false) + ipv4ClusterIP1Eps = generateEndpointSlice(ipv4ClusterIPName1, endpointSliceSuffix, false, false, endpointIPv4) + ipv4ClusterIP2 = generateService(ipv4ClusterIPName2, corev1.ServiceTypeClusterIP, clusterIPv4, externalIPv4, "", true, true) + ipv4ClusterIP2Eps = generateEndpointSlice(ipv4ClusterIPName2, endpointSliceSuffix, false, false, endpointIPv4) + + ipv6ClusterIP1 = generateService(ipv6ClusterIPName1, corev1.ServiceTypeClusterIP, clusterIPv6, externalIPv6, 
"", false, false) + ipv6ClusterIP1Eps = generateEndpointSlice(ipv6ClusterIPName1, endpointSliceSuffix, false, false, endpointIPv6) + ipv6ClusterIP2 = generateService(ipv6ClusterIPName2, corev1.ServiceTypeClusterIP, clusterIPv6, externalIPv6, "", true, true) + ipv6ClusterIP2Eps = generateEndpointSlice(ipv6ClusterIPName2, endpointSliceSuffix, false, false, endpointIPv6) + + ipv4LoadBalancer = generateService(ipv4LoadBalancerName, corev1.ServiceTypeLoadBalancer, clusterIPv4, externalIPv4, loadBalancerIPv4, false, false) + ipv4LoadBalancerEps = generateEndpointSlice(ipv4LoadBalancerName, endpointSliceSuffix, false, false, endpointIPv4) + ipv6LoadBalancer = generateService(ipv6LoadBalancerName, corev1.ServiceTypeLoadBalancer, clusterIPv6, externalIPv6, loadBalancerIPv6, false, false) + ipv6LoadBalancerEps = generateEndpointSlice(ipv6LoadBalancerName, endpointSliceSuffix, false, false, endpointIPv6) + + bgpPeerPasswords = map[string]string{ + generateBGPPeerKey(ipv4Peer1Addr, peer1ASN): peer1AuthPassword, + generateBGPPeerKey(ipv6Peer1Addr, peer1ASN): peer1AuthPassword, + generateBGPPeerKey(ipv4Peer2Addr, peer2ASN): peer2AuthPassword, + generateBGPPeerKey(ipv6Peer2Addr, peer2ASN): peer2AuthPassword, + generateBGPPeerKey(ipv4Peer3Addr, peer3ASN): peer3AuthPassword, + generateBGPPeerKey(ipv6Peer3Addr, peer3ASN): peer3AuthPassword, + } +) + +type fakeController struct { + *Controller + mockController *gomock.Controller + mockBGPServer *bgptest.MockInterface + crdClient *fakeversioned.Clientset + crdInformerFactory crdinformers.SharedInformerFactory + client *fake.Clientset + informerFactory informers.SharedInformerFactory +} + +func (c *fakeController) startInformers(stopCh chan struct{}) { + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + c.crdInformerFactory.Start(stopCh) + c.crdInformerFactory.WaitForCacheSync(stopCh) +} + +func newFakeController(t *testing.T, objects []runtime.Object, crdObjects []runtime.Object, ipv4Enabled, ipv6Enabled 
bool) *fakeController { + ctrl := gomock.NewController(t) + mockBGPServer := bgptest.NewMockInterface(ctrl) + + client := fake.NewSimpleClientset(objects...) + crdClient := fakeversioned.NewSimpleClientset(crdObjects...) + + crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, 0) + informerFactory := informers.NewSharedInformerFactory(client, 0) + + nodeInformer := informerFactory.Core().V1().Nodes() + serviceInformer := informerFactory.Core().V1().Services() + egressInformer := crdInformerFactory.Crd().V1beta1().Egresses() + endpointSliceInformer := informerFactory.Discovery().V1().EndpointSlices() + bgpPolicyInformer := crdInformerFactory.Crd().V1alpha1().BGPPolicies() + + bgpController, _ := NewBGPPolicyController(nodeInformer, + serviceInformer, + egressInformer, + bgpPolicyInformer, + endpointSliceInformer, + true, + client, + testNodeConfig, + &config.NetworkConfig{ + IPv4Enabled: ipv4Enabled, + IPv6Enabled: ipv6Enabled, + }) + bgpController.egressEnabled = true + bgpController.newBGPServerFn = func(_ *bgp.GlobalConfig) bgp.Interface { + return mockBGPServer + } + + return &fakeController{ + Controller: bgpController, + mockController: ctrl, + mockBGPServer: mockBGPServer, + crdClient: crdClient, + crdInformerFactory: crdInformerFactory, + client: client, + informerFactory: informerFactory, + } +} + +func TestBGPPolicyAdd(t *testing.T) { + testCases := []struct { + name string + ipv4Enabled bool + ipv6Enabled bool + bpsToAdd []runtime.Object + objects []runtime.Object + crdObjects []runtime.Object + existingState *bgpPolicyState + expectedState *bgpPolicyState + expectedCalls func(mockBGPServer *bgptest.MockInterfaceMockRecorder) + expectedError string + }{ + { + name: "IPv4, as effective BGPPolicy, advertise ClusterIP", + ipv4Enabled: true, + bpsToAdd: []runtime.Object{generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + true, + false, + true, + true, + false, + []v1alpha1.BGPPeer{ipv4Peer1}), + }, + 
objects: []runtime.Object{ + ipv4ClusterIP1, + ipv4ClusterIP1Eps, + node, + }, + expectedState: generateBGPPolicyState(179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(clusterIPv4)}, + []bgp.PeerConfig{ipv4Peer1Config}, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(gomock.Any()) + mockBGPServer.AddPeer(gomock.Any(), ipv4Peer1Config) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(clusterIPv4)}}) + }, + }, + { + name: "IPv6, as effective BGPPolicy, advertise ExternalIP", + ipv6Enabled: true, + bpsToAdd: []runtime.Object{generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + false, + true, + true, + true, + false, + []v1alpha1.BGPPeer{ipv6Peer1})}, + objects: []runtime.Object{ + ipv6ClusterIP1, + ipv6ClusterIP1Eps, + node, + }, + expectedState: generateBGPPolicyState(179, + 65000, + "192.168.77.100", + []string{ipStrToPrefix(externalIPv6)}, + []bgp.PeerConfig{ipv6Peer1Config}, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(gomock.Any()) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer1Config) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(externalIPv6)}}) + }, + }, + { + name: "IPv4 & IPv6, as effective BGPPolicy, advertise LoadBalancerIP", + ipv4Enabled: true, + ipv6Enabled: true, + bpsToAdd: []runtime.Object{generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + false, + false, + true, + false, + false, + []v1alpha1.BGPPeer{ipv4Peer1, ipv6Peer1})}, + objects: []runtime.Object{ + ipv4LoadBalancer, + ipv4LoadBalancerEps, + ipv6LoadBalancer, + ipv6LoadBalancerEps, + node, + }, + expectedState: generateBGPPolicyState(179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(loadBalancerIPv4), ipStrToPrefix(loadBalancerIPv6)}, + []bgp.PeerConfig{ipv4Peer1Config, ipv6Peer1Config}, + ), + expectedCalls: 
func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(gomock.Any()) + mockBGPServer.AddPeer(gomock.Any(), ipv4Peer1Config) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer1Config) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(loadBalancerIPv4)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(loadBalancerIPv6)}}) + }, + }, + { + name: "IPv4, as effective BGPPolicy, advertise EgressIP", + ipv4Enabled: true, + bpsToAdd: []runtime.Object{generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + true, + true, + true, + true, + false, + []v1alpha1.BGPPeer{ipv4Peer1})}, + objects: []runtime.Object{node}, + crdObjects: []runtime.Object{ + ipv4Egress1, + ipv4Egress2, + }, + expectedState: generateBGPPolicyState(179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(ipv4EgressIP1)}, + []bgp.PeerConfig{ipv4Peer1Config}, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(gomock.Any()) + mockBGPServer.AddPeer(gomock.Any(), ipv4Peer1Config) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(ipv4EgressIP1)}}) + }, + }, + { + name: "IPv6, as effective BGPPolicy, advertise Pod CIDR", + ipv6Enabled: true, + bpsToAdd: []runtime.Object{generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + true, + true, + true, + true, + true, + []v1alpha1.BGPPeer{ipv6Peer1})}, + objects: []runtime.Object{node}, + expectedState: generateBGPPolicyState(179, + 65000, + "192.168.77.100", + []string{podIPv6CIDR.String()}, + []bgp.PeerConfig{ipv6Peer1Config}, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(gomock.Any()) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer1Config) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: podIPv6CIDR.String()}}) + }, + }, + { + name: "IPv4 & IPv6, as effective 
BGPPolicy, not advertise any Service IP due to no local Endpoint", + ipv4Enabled: true, + ipv6Enabled: true, + bpsToAdd: []runtime.Object{generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 1179, + 65001, + true, + true, + true, + false, + false, + []v1alpha1.BGPPeer{ipv4Peer1, ipv6Peer1})}, + objects: []runtime.Object{ + ipv4ClusterIP2, + ipv4ClusterIP2Eps, + ipv6ClusterIP2, + ipv6ClusterIP2Eps, + node, + }, + expectedState: generateBGPPolicyState(1179, + 65001, + nodeIPv4Addr.IP.String(), + nil, + []bgp.PeerConfig{ipv4Peer1Config, ipv6Peer1Config}, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(gomock.Any()) + mockBGPServer.AddPeer(gomock.Any(), ipv4Peer1Config) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer1Config) + }, + }, + { + name: "IPv4, as alternative BGPPolicy", + ipv4Enabled: true, + bpsToAdd: []runtime.Object{generateBGPPolicy(bgpPolicyName2, + creationTimestamp, // As the effective BGPPolicy because the creationTimestamp is the oldest. 
+ nodeLabels1, + 179, + 65000, + true, + false, + false, + false, + false, + []v1alpha1.BGPPeer{ipv4Peer1}), + generateBGPPolicy(bgpPolicyName1, + creationTimestampAdd1s, + nodeLabels1, + 179, + 65000, + true, + false, + false, + false, + false, + []v1alpha1.BGPPeer{ipv4Peer1})}, + objects: []runtime.Object{ipv4ClusterIP1, ipv4ClusterIP1Eps, node}, + existingState: generateBGPPolicyState(179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(clusterIPv4)}, + []bgp.PeerConfig{ipv4Peer1Config}, + ), + expectedState: generateBGPPolicyState(179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(clusterIPv4)}, + []bgp.PeerConfig{ipv4Peer1Config}, + ), + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + c := newFakeController(t, tt.objects, append(tt.crdObjects, tt.bpsToAdd...), tt.ipv4Enabled, tt.ipv6Enabled) + + stopCh := make(chan struct{}) + defer close(stopCh) + ctx := context.Background() + c.startInformers(stopCh) + + // Fake the BGPPolicy state and the passwords of BGP peers. + c.bgpPolicyState = tt.existingState + if c.bgpPolicyState != nil { + c.bgpPolicyState.bgpServer = c.mockBGPServer + } + c.bgpPeerPasswords = bgpPeerPasswords + + // Wait for the dummy event triggered by BGPPolicy add events. + waitAndGetDummyEvent(t, c) + if tt.expectedCalls != nil { + tt.expectedCalls(c.mockBGPServer.EXPECT()) + } + if tt.expectedError != "" { + assert.EqualError(t, c.syncBGPPolicy(ctx), tt.expectedError) + } else { + assert.NoError(t, c.syncBGPPolicy(ctx)) + } + // Done with the dummy event. 
+ doneDummyEvent(t, c) + checkBGPPolicyState(t, tt.expectedState, c.bgpPolicyState) + }) + } +} + +func TestBGPPolicyUpdate(t *testing.T) { + effectiveBP := generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + true, + false, + true, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, + ipv4Peer2, + ipv6Peer1, + ipv6Peer2, + }) + effectiveBPState := generateBGPPolicyState(179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(clusterIPv4), + ipStrToPrefix(clusterIPv6), + ipStrToPrefix(loadBalancerIPv4), + ipStrToPrefix(loadBalancerIPv6), + podIPv4CIDR.String(), + podIPv6CIDR.String(), + }, + []bgp.PeerConfig{ipv4Peer1Config, + ipv6Peer1Config, + ipv4Peer2Config, + ipv6Peer2Config, + }, + ) + alternativeBP := generateBGPPolicy(bgpPolicyName2, + creationTimestampAdd1s, + nodeLabels1, + 1179, + 65000, + true, + false, + true, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, + ipv4Peer2, + ipv6Peer1, + ipv6Peer2, + }) + unrelatedBP := generateBGPPolicy(bgpPolicyName3, + creationTimestampAdd2s, + nodeLabels2, + 179, + 65000, + true, + false, + true, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, + ipv4Peer2, + ipv6Peer1, + ipv6Peer2, + }) + objects := []runtime.Object{ + ipv4ClusterIP2, + ipv4ClusterIP2Eps, + ipv6ClusterIP2, + ipv6ClusterIP2Eps, + ipv4LoadBalancer, + ipv4LoadBalancerEps, + ipv6LoadBalancer, + ipv6LoadBalancerEps, + node, + } + crdObjects := []runtime.Object{ipv4Egress1, + ipv4Egress2, + ipv6Egress1, + ipv6Egress2, + effectiveBP, + alternativeBP, + unrelatedBP, + } + testCases := []struct { + name string + bpToUpdate *v1alpha1.BGPPolicy + existingState *bgpPolicyState + expectedState *bgpPolicyState + expectedCalls func(mockBGPServer *bgptest.MockInterfaceMockRecorder) + expectedError string + }{ + { + name: "Effective BGPPolicy, update NodeSelector (not applied to current Node), an alternative takes effect", + bpToUpdate: generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels2, + 179, + 65000, + 
true, + false, + true, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, + ipv4Peer2, + ipv6Peer1, + ipv6Peer2, + }), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Stop(gomock.Any()) + mockBGPServer.Start(gomock.Any()) + mockBGPServer.AddPeer(gomock.Any(), ipv4Peer1Config) + mockBGPServer.AddPeer(gomock.Any(), ipv4Peer2Config) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer1Config) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer2Config) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(clusterIPv4)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(clusterIPv6)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(loadBalancerIPv4)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(loadBalancerIPv6)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: podIPv4CIDR.String()}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: podIPv6CIDR.String()}}) + }, + expectedState: generateBGPPolicyState(1179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(clusterIPv4), + ipStrToPrefix(clusterIPv6), + ipStrToPrefix(loadBalancerIPv4), + ipStrToPrefix(loadBalancerIPv6), + podIPv4CIDR.String(), + podIPv6CIDR.String(), + }, + []bgp.PeerConfig{ipv4Peer1Config, + ipv6Peer1Config, + ipv4Peer2Config, + ipv6Peer2Config, + }, + ), + }, + { + name: "Effective BGPPolicy, update NodeSelector (not applied to current Node), failed to stop current BGP server", + bpToUpdate: generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels2, + 179, + 65000, + true, + false, + true, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, + ipv4Peer2, + ipv6Peer1, + ipv6Peer2, + }), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Stop(gomock.Any()).Return(fmt.Errorf("failed to stop")) + }, + expectedState: deepCopyBGPPolicyState(effectiveBPState), 
+ expectedError: "failed to stop current BGP server: failed to stop", + }, + { + name: "Effective BGPPolicy, update Advertisements", + bpToUpdate: generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + false, + true, + false, + true, + false, + []v1alpha1.BGPPeer{ipv4Peer1, + ipv4Peer2, + ipv6Peer1, + ipv6Peer2, + }), + expectedState: generateBGPPolicyState(179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(externalIPv4), + ipStrToPrefix(externalIPv6), + ipStrToPrefix(ipv4EgressIP1), + ipStrToPrefix(ipv6EgressIP1), + }, + []bgp.PeerConfig{ipv4Peer1Config, + ipv6Peer1Config, + ipv4Peer2Config, + ipv6Peer2Config, + }, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(externalIPv4)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(ipv4EgressIP1)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(externalIPv6)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(ipv6EgressIP1)}}) + + mockBGPServer.WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(clusterIPv4)}}) + mockBGPServer.WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(loadBalancerIPv4)}}) + mockBGPServer.WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: podIPv4CIDR.String()}}) + mockBGPServer.WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(clusterIPv6)}}) + mockBGPServer.WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(loadBalancerIPv6)}}) + mockBGPServer.WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: podIPv6CIDR.String()}}) + }, + }, + { + name: "Effective BGPPolicy, update LocalASN and Advertisements", + bpToUpdate: generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65001, + false, + true, + false, + true, + false, + []v1alpha1.BGPPeer{ipv4Peer1, + ipv4Peer2, + 
ipv6Peer1, + ipv6Peer2, + }), + expectedState: generateBGPPolicyState(179, + 65001, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(externalIPv4), + ipStrToPrefix(externalIPv6), + ipStrToPrefix(ipv4EgressIP1), + ipStrToPrefix(ipv6EgressIP1), + }, + []bgp.PeerConfig{ipv4Peer1Config, + ipv6Peer1Config, + ipv4Peer2Config, + ipv6Peer2Config, + }, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(gomock.Any()) + mockBGPServer.Stop(gomock.Any()) + mockBGPServer.AddPeer(gomock.Any(), ipv4Peer1Config) + mockBGPServer.AddPeer(gomock.Any(), ipv4Peer2Config) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer1Config) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer2Config) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(externalIPv4)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(ipv4EgressIP1)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(externalIPv6)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(ipv6EgressIP1)}}) + }, + }, + { + name: "Effective BGPPolicy, update ListenPort", + bpToUpdate: generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 1179, + 65000, + true, + false, + true, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, + ipv4Peer2, + ipv6Peer1, + ipv6Peer2, + }), + expectedState: generateBGPPolicyState(1179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(clusterIPv4), + ipStrToPrefix(clusterIPv6), + ipStrToPrefix(loadBalancerIPv4), + ipStrToPrefix(loadBalancerIPv6), + podIPv4CIDR.String(), + podIPv6CIDR.String(), + }, + []bgp.PeerConfig{ipv4Peer1Config, + ipv6Peer1Config, + ipv4Peer2Config, + ipv6Peer2Config, + }, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(gomock.Any()) + mockBGPServer.Stop(gomock.Any()) + mockBGPServer.AddPeer(gomock.Any(), ipv4Peer1Config) + 
mockBGPServer.AddPeer(gomock.Any(), ipv4Peer2Config) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer1Config) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer2Config) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(clusterIPv4)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(loadBalancerIPv4)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: podIPv4CIDR.String()}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(clusterIPv6)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(loadBalancerIPv6)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: podIPv6CIDR.String()}}) + }, + }, + { + name: "Effective BGPPolicy, update BGPPeers", + bpToUpdate: generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + true, + false, + true, + false, + true, + []v1alpha1.BGPPeer{updatedIPv4Peer2, + updatedIPv6Peer2, + ipv4Peer3, + ipv6Peer3}), + expectedState: generateBGPPolicyState(179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(clusterIPv4), + ipStrToPrefix(clusterIPv6), + ipStrToPrefix(loadBalancerIPv4), + ipStrToPrefix(loadBalancerIPv6), + podIPv4CIDR.String(), + podIPv6CIDR.String(), + }, + []bgp.PeerConfig{updatedIPv4Peer2Config, + updatedIPv6Peer2Config, + ipv4Peer3Config, + ipv6Peer3Config, + }, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.AddPeer(gomock.Any(), ipv4Peer3Config) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer3Config) + mockBGPServer.RemovePeer(gomock.Any(), ipv4Peer1Config) + mockBGPServer.RemovePeer(gomock.Any(), ipv6Peer1Config) + mockBGPServer.UpdatePeer(gomock.Any(), updatedIPv4Peer2Config) + mockBGPServer.UpdatePeer(gomock.Any(), updatedIPv6Peer2Config) + }, + }, + { + name: "Unrelated BGPPolicy, update NodeSelector (applied to current Node)", + bpToUpdate: generateBGPPolicy(bgpPolicyName3, + 
creationTimestampAdd2s, + nodeLabels1, + 179, + 65000, + true, + false, + true, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, + ipv4Peer2, + ipv6Peer1, + ipv6Peer2, + }), + existingState: deepCopyBGPPolicyState(effectiveBPState), + expectedState: deepCopyBGPPolicyState(effectiveBPState), + }, + { + name: "Alternative BGPPolicy, update Advertisements, LocalASN, ListenPort and BGPPeers", + bpToUpdate: generateBGPPolicy(bgpPolicyName2, + creationTimestampAdd1s, + nodeLabels1, + 1179, + 65001, + false, + false, + true, + false, + false, + []v1alpha1.BGPPeer{ipv4Peer1, + updatedIPv4Peer2, + ipv6Peer1, + updatedIPv6Peer2, + }), + existingState: deepCopyBGPPolicyState(effectiveBPState), + expectedState: deepCopyBGPPolicyState(effectiveBPState), + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + c := newFakeController(t, objects, crdObjects, true, true) + + stopCh := make(chan struct{}) + defer close(stopCh) + ctx := context.Background() + c.startInformers(stopCh) + + // Wait for the dummy event triggered by BGPPolicy add events, and mark it done directly + // since we fake the expected state. + waitAndGetDummyEvent(t, c) + doneDummyEvent(t, c) + + // Fake the BGPPolicy state the passwords of BGP peers. + c.bgpPolicyState = deepCopyBGPPolicyState(effectiveBPState) + c.bgpPolicyState.bgpServer = c.mockBGPServer + c.bgpPeerPasswords = bgpPeerPasswords + + tt.bpToUpdate.Generation += 1 + _, err := c.crdClient.CrdV1alpha1().BGPPolicies().Update(context.TODO(), tt.bpToUpdate, metav1.UpdateOptions{}) + require.NoError(t, err) + + // Wait for the dummy event triggered by BGPPolicy update events. + waitAndGetDummyEvent(t, c) + + if tt.expectedCalls != nil { + tt.expectedCalls(c.mockBGPServer.EXPECT()) + } + if tt.expectedError != "" { + assert.EqualError(t, c.syncBGPPolicy(ctx), tt.expectedError) + } else { + assert.NoError(t, c.syncBGPPolicy(ctx)) + } + // Done with the dummy event. 
+ doneDummyEvent(t, c) + checkBGPPolicyState(t, tt.expectedState, c.bgpPolicyState) + }) + } +} + +func TestBGPPolicyDelete(t *testing.T) { + bp1 := generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + false, + false, + true, + false, + false, + []v1alpha1.BGPPeer{ + ipv4Peer1, + ipv6Peer1, + }) + bp1State := generateBGPPolicyState(179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ + ipStrToPrefix(loadBalancerIPv4), + ipStrToPrefix(loadBalancerIPv6), + }, + []bgp.PeerConfig{ + ipv4Peer1Config, + ipv6Peer1Config, + }, + ) + bp2 := generateBGPPolicy(bgpPolicyName2, + creationTimestampAdd1s, + nodeLabels1, + 179, + 65000, + false, + true, + false, + false, + false, + []v1alpha1.BGPPeer{ + ipv4Peer2, + ipv6Peer2, + }) + bp2State := generateBGPPolicyState(179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ + ipStrToPrefix(externalIPv4), + ipStrToPrefix(externalIPv6), + }, + []bgp.PeerConfig{ + ipv4Peer2Config, + ipv6Peer2Config}, + ) + bp3 := generateBGPPolicy(bgpPolicyName3, + creationTimestampAdd1s, + nodeLabels1, + 1179, + 65000, + false, + true, + false, + false, + false, + []v1alpha1.BGPPeer{ + ipv4Peer2, + ipv6Peer2, + }) + bp3State := generateBGPPolicyState(1179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ + ipStrToPrefix(externalIPv4), + ipStrToPrefix(externalIPv6), + }, + []bgp.PeerConfig{ + ipv4Peer2Config, + ipv6Peer2Config}, + ) + objects := []runtime.Object{ + ipv4LoadBalancer, + ipv4LoadBalancerEps, + ipv6LoadBalancer, + ipv6LoadBalancerEps, + node, + } + testCases := []struct { + name string + bpToDelete string + crdObjects []runtime.Object + existingState *bgpPolicyState + expectedState *bgpPolicyState + expectedCalls func(mockBGPServer *bgptest.MockInterfaceMockRecorder) + }{ + { + name: "Delete effective BGPPolicy and there is no alternative one", + bpToDelete: bgpPolicyName1, + crdObjects: []runtime.Object{bp1}, + existingState: deepCopyBGPPolicyState(bp1State), + expectedState: nil, + expectedCalls: 
func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Stop(gomock.Any()) + }, + }, + { + name: "Delete effective BGPPolicy and there is an alternative one, not need to start new BGP server", + bpToDelete: bgpPolicyName1, + crdObjects: []runtime.Object{bp1, bp2}, + existingState: deepCopyBGPPolicyState(bp1State), + expectedState: deepCopyBGPPolicyState(bp2State), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.RemovePeer(gomock.Any(), ipv4Peer1Config) + mockBGPServer.RemovePeer(gomock.Any(), ipv6Peer1Config) + mockBGPServer.AddPeer(gomock.Any(), ipv4Peer2Config) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer2Config) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(externalIPv4)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(externalIPv6)}}) + mockBGPServer.WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(loadBalancerIPv4)}}) + mockBGPServer.WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(loadBalancerIPv6)}}) + }, + }, + { + name: "Delete effective BGPPolicy and there is an alternative one, need to start new BGP server", + bpToDelete: bgpPolicyName1, + crdObjects: []runtime.Object{bp1, bp3}, + existingState: deepCopyBGPPolicyState(bp1State), + expectedState: deepCopyBGPPolicyState(bp3State), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Stop(gomock.Any()) + mockBGPServer.Start(gomock.Any()) + mockBGPServer.AddPeer(gomock.Any(), ipv4Peer2Config) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer2Config) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(externalIPv4)}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(externalIPv6)}}) + }, + }, + { + name: "Delete an alternative BGPPolicy", + bpToDelete: bgpPolicyName2, + crdObjects: []runtime.Object{bp1, bp2}, + existingState: 
deepCopyBGPPolicyState(bp1State), + expectedState: deepCopyBGPPolicyState(bp1State), + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + c := newFakeController(t, objects, tt.crdObjects, true, true) + + stopCh := make(chan struct{}) + defer close(stopCh) + ctx := context.Background() + c.startInformers(stopCh) + + // Wait for the dummy event triggered by BGPPolicy add events, and mark it done. + waitAndGetDummyEvent(t, c) + doneDummyEvent(t, c) + + // Fake the BGPPolicy state and the passwords of BGP peers. + c.bgpPolicyState = tt.existingState + c.bgpPolicyState.bgpServer = c.mockBGPServer + c.bgpPeerPasswords = bgpPeerPasswords + + err := c.crdClient.CrdV1alpha1().BGPPolicies().Delete(context.TODO(), tt.bpToDelete, metav1.DeleteOptions{}) + require.NoError(t, err) + + // Wait for the dummy event triggered by BGPPolicy delete events. + waitAndGetDummyEvent(t, c) + + if tt.expectedCalls != nil { + tt.expectedCalls(c.mockBGPServer.EXPECT()) + } + assert.NoError(t, c.syncBGPPolicy(ctx)) + // Done with the dummy event. 
+ doneDummyEvent(t, c) + checkBGPPolicyState(t, tt.expectedState, c.bgpPolicyState) + }) + } +} + +func TestNodeUpdate(t *testing.T) { + bp1 := generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + false, + false, + false, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, ipv6Peer1}) + bp1State := generateBGPPolicyState(179, + 65000, + nodeIPv4Addr.IP.String(), + []string{podIPv4CIDR.String(), podIPv6CIDR.String()}, + []bgp.PeerConfig{ipv4Peer1Config, ipv6Peer1Config}) + bp2 := generateBGPPolicy(bgpPolicyName2, + creationTimestampAdd1s, + nodeLabels2, + 1179, + 65000, + false, + false, + false, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, ipv6Peer1}) + bp2State := generateBGPPolicyState(1179, + 65000, + nodeIPv4Addr.IP.String(), + []string{podIPv4CIDR.String(), podIPv6CIDR.String()}, + []bgp.PeerConfig{ipv4Peer1Config, ipv6Peer1Config}) + bp3 := generateBGPPolicy(bgpPolicyName3, + creationTimestampAdd2s, + nodeLabels3, + 179, + 65000, + false, + false, + false, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, ipv6Peer1}) + crdObjects := []runtime.Object{ + bp1, + bp2, + bp3, + } + testCases := []struct { + name string + ipv4Enabled bool + ipv6Enabled bool + node *corev1.Node + updatedNode *corev1.Node + existingState *bgpPolicyState + expectedState *bgpPolicyState + expectedCalls func(mockBGPServer *bgptest.MockInterfaceMockRecorder) + }{ + { + name: "Update labels, a BGPPolicy is added to alternatives", + ipv4Enabled: true, + ipv6Enabled: true, + node: generateNode(localNodeName, nodeLabels1, nodeAnnotations1), + updatedNode: generateNode(localNodeName, nodeLabels3, nodeAnnotations1), + existingState: deepCopyBGPPolicyState(bp1State), + expectedState: deepCopyBGPPolicyState(bp1State), + }, + { + name: "Update labels, a BGPPolicy is removed from alternatives", + ipv4Enabled: true, + ipv6Enabled: true, + node: generateNode(localNodeName, nodeLabels3, nodeAnnotations1), + updatedNode: generateNode(localNodeName, nodeLabels1, 
nodeAnnotations1), + existingState: deepCopyBGPPolicyState(bp1State), + expectedState: deepCopyBGPPolicyState(bp1State), + }, + { + name: "Update labels, effective BGPPolicy is updated to another one", + ipv4Enabled: true, + ipv6Enabled: true, + node: generateNode(localNodeName, nodeLabels1, nodeAnnotations1), + updatedNode: generateNode(localNodeName, nodeLabels2, nodeAnnotations1), + existingState: deepCopyBGPPolicyState(bp1State), + expectedState: deepCopyBGPPolicyState(bp2State), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(gomock.Any()) + mockBGPServer.Stop(gomock.Any()) + mockBGPServer.AddPeer(gomock.Any(), ipv4Peer1Config) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer1Config) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: podIPv4CIDR.String()}}) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: podIPv6CIDR.String()}}) + }, + }, + { + name: "Update labels, effective BGPPolicy is updated to empty", + ipv4Enabled: true, + ipv6Enabled: true, + node: generateNode(localNodeName, nodeLabels1, nodeAnnotations1), + updatedNode: generateNode(localNodeName, nil, nodeAnnotations1), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Stop(gomock.Any()) + }, + existingState: deepCopyBGPPolicyState(bp1State), + }, + { + name: "IPv6 only, update annotations, effective BGPPolicy router ID is updated", + ipv6Enabled: true, + node: generateNode(localNodeName, nodeLabels1, nodeAnnotations1), + updatedNode: generateNode(localNodeName, nodeLabels1, nodeAnnotations2), + existingState: generateBGPPolicyState(179, + 65000, + "192.168.77.100", + []string{podIPv6CIDR.String()}, + []bgp.PeerConfig{ipv6Peer1Config}), + expectedState: generateBGPPolicyState(179, + 65000, + "10.10.0.100", + []string{podIPv6CIDR.String()}, + []bgp.PeerConfig{ipv6Peer1Config}), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + 
mockBGPServer.Start(gomock.Any()) + mockBGPServer.Stop(gomock.Any()) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer1Config) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: podIPv6CIDR.String()}}) + }, + }, + { + name: "IPv6 only, remove annotations, router ID is generated from Node name ", + ipv6Enabled: true, + node: generateNode(localNodeName, nodeLabels1, nodeAnnotations1), + updatedNode: generateNode(localNodeName, nodeLabels1, nil), + existingState: generateBGPPolicyState(179, + 65000, + "192.168.77.100", + []string{podIPv6CIDR.String()}, + []bgp.PeerConfig{ipv6Peer1Config}), + expectedState: generateBGPPolicyState(179, + 65000, + "156.67.103.8", + []string{podIPv6CIDR.String()}, + []bgp.PeerConfig{ipv6Peer1Config}), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(gomock.Any()) + mockBGPServer.Stop(gomock.Any()) + mockBGPServer.AddPeer(gomock.Any(), ipv6Peer1Config) + mockBGPServer.AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: podIPv6CIDR.String()}}) + }, + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + c := newFakeController(t, nil, crdObjects, tt.ipv4Enabled, tt.ipv6Enabled) + + stopCh := make(chan struct{}) + defer close(stopCh) + ctx := context.Background() + c.startInformers(stopCh) + + // Fake the passwords of BGP peers. + c.bgpPeerPasswords = bgpPeerPasswords + + // Initializing BGPPolicy objects will not trigger a dummy event because the local Node object has not been + // initialized and synced yet. The dummy event will be trigger by adding the local Node object. + _, err := c.client.CoreV1().Nodes().Create(context.TODO(), tt.node, metav1.CreateOptions{}) + require.NoError(t, err) + + // Wait for the dummy event triggered by Node add event. + waitAndGetDummyEvent(t, c) + doneDummyEvent(t, c) + + // Fake the BGPPolicy state. 
+ c.bgpPolicyState = tt.existingState + c.bgpPolicyState.bgpServer = c.mockBGPServer + + _, err = c.client.CoreV1().Nodes().Update(context.TODO(), tt.updatedNode, metav1.UpdateOptions{}) + require.NoError(t, err) + + // Wait for the dummy event triggered by Node update events. + waitAndGetDummyEvent(t, c) + + if tt.expectedCalls != nil { + tt.expectedCalls(c.mockBGPServer.EXPECT()) + } + assert.NoError(t, c.syncBGPPolicy(ctx)) + // Done with the dummy event. + doneDummyEvent(t, c) + checkBGPPolicyState(t, tt.expectedState, c.bgpPolicyState) + if !tt.ipv4Enabled && tt.ipv6Enabled { + updatedNode, err := c.client.CoreV1().Nodes().Get(context.TODO(), localNodeName, metav1.GetOptions{}) + require.NoError(t, err) + require.NotNil(t, updatedNode.Annotations) + assert.Equal(t, tt.expectedState.routerID, updatedNode.Annotations[types.NodeBGPRouterIDAnnotationKey]) + } + }) + } +} + +func TestServiceLifecycle(t *testing.T) { + bp := generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + true, + true, + true, + false, + false, + []v1alpha1.BGPPeer{ipv4Peer1}) + c := newFakeController(t, []runtime.Object{node}, []runtime.Object{bp}, true, false) + mockBGPServer := c.mockBGPServer + + stopCh := make(chan struct{}) + defer close(stopCh) + ctx := context.Background() + c.startInformers(stopCh) + + // Fake the passwords of BGP peers. + c.bgpPeerPasswords = bgpPeerPasswords + + // Wait for the dummy event triggered by BGPPolicy add events. + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().Start(gomock.Any()) + mockBGPServer.EXPECT().AddPeer(gomock.Any(), ipv4Peer1Config) + require.NoError(t, c.syncBGPPolicy(ctx)) + // Done with the dummy event. + doneDummyEvent(t, c) + + // Create a Service configured with both `internalTrafficPolicy` and `externalTrafficPolicy` set to `Local`. 
+ loadBalancer := generateService(ipv4LoadBalancerName, corev1.ServiceTypeLoadBalancer, "10.96.10.10", "192.168.77.100", "192.168.77.150", true, true) + _, err := c.client.CoreV1().Services(namespaceDefault).Create(context.TODO(), loadBalancer, metav1.CreateOptions{}) + require.NoError(t, err) + + // Add an EndpointSlice without Endpoint IP for the Service. This could be happened at the moment after a Service is + // just created. + endpointSlice := generateEndpointSlice(ipv4LoadBalancerName, endpointSliceSuffix, true, false, "") + _, err = c.client.DiscoveryV1().EndpointSlices(namespaceDefault).Create(context.TODO(), endpointSlice, metav1.CreateOptions{}) + require.NoError(t, err) + + // Since both `internalTrafficPolicy` and `externalTrafficPolicy` are `Local` and no local Endpoint, no Service IP + // will be advertised. + waitAndGetDummyEvent(t, c) + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) + + // Update the EndpointSlice with a local Endpoint IP. + endpointSlice = generateEndpointSlice(ipv4LoadBalancerName, endpointSliceSuffix, true, false, "10.10.0.2") + _, err = c.client.DiscoveryV1().EndpointSlices(namespaceDefault).Update(context.TODO(), endpointSlice, metav1.UpdateOptions{}) + require.NoError(t, err) + + // Since there is a local Endpoint IP and both `internalTrafficPolicy` and `externalTrafficPolicy` are `Local`, the + // ClusterIP, externalIP, and LoadBalancerIP will be advertised. + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: "10.96.10.10/32"}}) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: "192.168.77.100/32"}}) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: "192.168.77.150/32"}}) + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) + + // Update externalIP and LoadBalancerIP of the Service. Additionally, update both `externalTrafficPolicy` and + // `internalTrafficPolicy` to `Cluster`. 
+ updatedLoadBalancer := generateService(ipv4LoadBalancerName, corev1.ServiceTypeLoadBalancer, "10.96.10.10", "192.168.77.101", "192.168.77.151", false, false) + _, err = c.client.CoreV1().Services(namespaceDefault).Update(context.TODO(), updatedLoadBalancer, metav1.UpdateOptions{}) + require.NoError(t, err) + + // The stale externalIP and LoadBalancerIP will be withdrawn. The new externalIP and LoadBalancerIP will be advertised. + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: "192.168.77.151/32"}}) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: "192.168.77.101/32"}}) + mockBGPServer.EXPECT().WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: "192.168.77.100/32"}}) + mockBGPServer.EXPECT().WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: "192.168.77.150/32"}}) + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) + + // Update `externalTrafficPolicy` of the Service from `Cluster` to `Local`. + updatedLoadBalancer = generateService(ipv4LoadBalancerName, corev1.ServiceTypeLoadBalancer, "10.96.10.10", "192.168.77.101", "192.168.77.151", false, true) + _, err = c.client.CoreV1().Services(namespaceDefault).Update(context.TODO(), updatedLoadBalancer, metav1.UpdateOptions{}) + require.NoError(t, err) + + // Update the EndpointSlice with a remote Endpoint. + endpointSlice = generateEndpointSlice(ipv4LoadBalancerName, endpointSliceSuffix, false, false, "10.10.0.3") + _, err = c.client.DiscoveryV1().EndpointSlices(namespaceDefault).Update(context.TODO(), endpointSlice, metav1.UpdateOptions{}) + require.NoError(t, err) + + // Since there is no local Endpoint and `externalTrafficPolicy` is `Local`, the externalIP and LoadBalancerIP will be + // withdrawn. 
+ waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: "192.168.77.101/32"}}) + mockBGPServer.EXPECT().WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: "192.168.77.151/32"}}) + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) + + // Update `internalTrafficPolicy` of the Service from `Cluster` to `Local`. + updatedLoadBalancer = generateService(ipv4LoadBalancerName, corev1.ServiceTypeLoadBalancer, "10.96.10.10", "192.168.77.101", "192.168.77.151", true, true) + _, err = c.client.CoreV1().Services(namespaceDefault).Update(context.TODO(), updatedLoadBalancer, metav1.UpdateOptions{}) + require.NoError(t, err) + + // Since there is no local Endpoint and `internalTrafficPolicy` is `Local`, the ClusterIP will be withdrawn. + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: "10.96.10.10/32"}}) + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) + + // Update `externalTrafficPolicy` of the Service from `Local` to `Cluster`. + updatedLoadBalancer = generateService(ipv4LoadBalancerName, corev1.ServiceTypeLoadBalancer, "10.96.10.10", "192.168.77.101", "192.168.77.151", true, false) + _, err = c.client.CoreV1().Services(namespaceDefault).Update(context.TODO(), updatedLoadBalancer, metav1.UpdateOptions{}) + require.NoError(t, err) + + // Since `externalTrafficPolicy` is `Cluster`, the ClusterIP will be advertised even if there is no local Endpoint. + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: "192.168.77.101/32"}}) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: "192.168.77.151/32"}}) + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) + + // Delete the Service. 
+ err = c.client.CoreV1().Services(namespaceDefault).Delete(context.TODO(), updatedLoadBalancer.Name, metav1.DeleteOptions{}) + require.NoError(t, err) + + // Since the Service is deleted, all corresponding Service IPs will be withdrawn. + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: "192.168.77.101/32"}}) + mockBGPServer.EXPECT().WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: "192.168.77.151/32"}}) + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) +} + +func TestEgressLifecycle(t *testing.T) { + bp := generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + false, + false, + false, + true, + false, + []v1alpha1.BGPPeer{ipv4Peer1}) + c := newFakeController(t, []runtime.Object{node}, []runtime.Object{bp}, true, false) + mockBGPServer := c.mockBGPServer + + stopCh := make(chan struct{}) + defer close(stopCh) + ctx := context.Background() + c.startInformers(stopCh) + + // Fake the passwords of BGP peers. + c.bgpPeerPasswords = bgpPeerPasswords + + // Wait for the dummy event triggered by BGPPolicy add events,. + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().Start(gomock.Any()) + mockBGPServer.EXPECT().AddPeer(gomock.Any(), ipv4Peer1Config) + require.NoError(t, c.syncBGPPolicy(ctx)) + // Done with the dummy event. + doneDummyEvent(t, c) + + // Create an Egress. + egress := generateEgress("eg1-4", "192.168.77.200", localNodeName) + _, err := c.crdClient.CrdV1beta1().Egresses().Create(context.TODO(), egress, metav1.CreateOptions{}) + require.NoError(t, err) + + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), gomock.InAnyOrder([]bgp.Route{{Prefix: "192.168.77.200/32"}})) + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) + + // Update the Egress. 
+ updatedEgress := generateEgress("eg1-4", "192.168.77.201", localNodeName) + _, err = c.crdClient.CrdV1beta1().Egresses().Update(context.TODO(), updatedEgress, metav1.UpdateOptions{}) + require.NoError(t, err) + + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), gomock.InAnyOrder([]bgp.Route{{Prefix: "192.168.77.201/32"}})) + mockBGPServer.EXPECT().WithdrawRoutes(gomock.Any(), gomock.InAnyOrder([]bgp.Route{{Prefix: "192.168.77.200/32"}})) + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) + + // Update the Egress. + updatedEgress = generateEgress("eg1-4", "192.168.77.201", "remote") + _, err = c.crdClient.CrdV1beta1().Egresses().Update(context.TODO(), updatedEgress, metav1.UpdateOptions{}) + require.NoError(t, err) + + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().WithdrawRoutes(gomock.Any(), gomock.InAnyOrder([]bgp.Route{{Prefix: "192.168.77.201/32"}})) + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) + + // Update the Egress. + updatedEgress = generateEgress("eg1-4", "192.168.77.201", localNodeName) + _, err = c.crdClient.CrdV1beta1().Egresses().Update(context.TODO(), updatedEgress, metav1.UpdateOptions{}) + require.NoError(t, err) + + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), gomock.InAnyOrder([]bgp.Route{{Prefix: "192.168.77.201/32"}})) + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) + + // Delete the Egress. 
+ err = c.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), updatedEgress.Name, metav1.DeleteOptions{}) + require.NoError(t, err) + + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().WithdrawRoutes(gomock.Any(), gomock.InAnyOrder([]bgp.Route{{Prefix: "192.168.77.201/32"}})) + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) +} + +func TestBGPSecretUpdate(t *testing.T) { + bp := generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + false, + false, + false, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, ipv4Peer2, ipv4Peer3}) + c := newFakeController(t, []runtime.Object{node}, []runtime.Object{bp}, true, false) + + c.secretInformer = coreinformers.NewFilteredSecretInformer(c.client, + namespaceKubeSystem, + resyncPeriod, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("metadata.name", types.BGPPolicySecretName).String() + }) + c.secretInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: c.addSecret, + UpdateFunc: c.updateSecret, + DeleteFunc: c.deleteSecret, + }) + mockBGPServer := c.mockBGPServer + + stopCh := make(chan struct{}) + defer close(stopCh) + ctx := context.Background() + c.startInformers(stopCh) + go c.secretInformer.Run(stopCh) + + // Create the Secret. + secret := generateSecret(bgpPeerPasswords) + _, err := c.client.CoreV1().Secrets(namespaceKubeSystem).Create(context.TODO(), secret, metav1.CreateOptions{}) + require.NoError(t, err) + + require.Eventually(t, func() bool { + c.bgpPeerPasswordsMutex.RLock() + defer c.bgpPeerPasswordsMutex.RUnlock() + if reflect.DeepEqual(c.bgpPeerPasswords, bgpPeerPasswords) { + return true + } + return false + }, 5*time.Second, 10*time.Millisecond) + + // Wait for the dummy event triggered by BGPPolicy add events. 
+ waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().Start(gomock.Any()) + mockBGPServer.EXPECT().AddPeer(gomock.Any(), ipv4Peer1Config) + mockBGPServer.EXPECT().AddPeer(gomock.Any(), ipv4Peer2Config) + mockBGPServer.EXPECT().AddPeer(gomock.Any(), ipv4Peer3Config) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: podIPv4CIDR.String()}}) + require.NoError(t, c.syncBGPPolicy(ctx)) + // Done with the dummy event. + doneDummyEvent(t, c) + + // Update the Secret. + updatedBGPPeerPasswords := map[string]string{ + generateBGPPeerKey(ipv4Peer1Addr, peer1ASN): "updated-" + peer1AuthPassword, + generateBGPPeerKey(ipv4Peer2Addr, peer2ASN): peer2AuthPassword, + generateBGPPeerKey(ipv4Peer3Addr, peer3ASN): "updated-" + peer3AuthPassword, + } + updatedSecret := generateSecret(updatedBGPPeerPasswords) + _, err = c.client.CoreV1().Secrets(namespaceKubeSystem).Update(context.TODO(), updatedSecret, metav1.UpdateOptions{}) + require.NoError(t, err) + require.Eventually(t, func() bool { + c.bgpPeerPasswordsMutex.RLock() + defer c.bgpPeerPasswordsMutex.RUnlock() + if reflect.DeepEqual(c.bgpPeerPasswords, updatedBGPPeerPasswords) { + return true + } + return false + }, 5*time.Second, 10*time.Millisecond) + + // Wait for the dummy event triggered by Secret update event, and mark it done. + waitAndGetDummyEvent(t, c) + updatedIPv4Peer1Config := ipv4Peer1Config + updatedIPv4Peer3Config := ipv4Peer3Config + updatedIPv4Peer1Config.Password = "updated-" + peer1AuthPassword + updatedIPv4Peer3Config.Password = "updated-" + peer3AuthPassword + mockBGPServer.EXPECT().UpdatePeer(gomock.Any(), updatedIPv4Peer1Config) + mockBGPServer.EXPECT().UpdatePeer(gomock.Any(), updatedIPv4Peer3Config) + require.NoError(t, c.syncBGPPolicy(ctx)) + // Done with the dummy event. 
+ doneDummyEvent(t, c) +} + +func TestSyncBGPPolicyFailures(t *testing.T) { + bp1 := generateBGPPolicy(bgpPolicyName1, + creationTimestamp, + nodeLabels1, + 179, + 65000, + false, + false, + true, + false, + false, + []v1alpha1.BGPPeer{ipv4Peer2}) + bp2 := generateBGPPolicy(bgpPolicyName2, + creationTimestampAdd1s, + nodeLabels1, + 1179, + 65000, + false, + false, + false, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1}) + bp3 := generateBGPPolicy(bgpPolicyName3, + creationTimestampAdd2s, + nodeLabels1, + 1179, + 65000, + false, + true, + false, + false, + false, + []v1alpha1.BGPPeer{ipv4Peer2}) + bp4 := generateBGPPolicy(bgpPolicyName4, + creationTimestampAdd3s, + nodeLabels1, + 1179, + 65000, + false, + false, + true, + false, + false, + []v1alpha1.BGPPeer{updatedIPv4Peer2}) + objects := []runtime.Object{ + ipv4LoadBalancer, + ipv4LoadBalancerEps, + ipv6LoadBalancer, + ipv6LoadBalancerEps, + node, + } + crdObjects := []runtime.Object{ + bp1, + bp2, + bp3, + bp4, + } + + c := newFakeController(t, objects, crdObjects, true, false) + mockBGPServer := c.mockBGPServer + + stopCh := make(chan struct{}) + defer close(stopCh) + ctx := context.Background() + c.startInformers(stopCh) + + // Wait for the dummy event triggered by BGPPolicy ADD events. + waitAndGetDummyEvent(t, c) + + // Fake the passwords of BGP peers. + c.bgpPeerPasswords = bgpPeerPasswords + + mockBGPServer.EXPECT().Start(gomock.Any()) + mockBGPServer.EXPECT().AddPeer(gomock.Any(), ipv4Peer2Config) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(loadBalancerIPv4)}}) + require.NoError(t, c.syncBGPPolicy(ctx)) + // Done with the dummy event. + doneDummyEvent(t, c) + + checkBGPPolicyState(t, generateBGPPolicyState(179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(loadBalancerIPv4)}, + []bgp.PeerConfig{ipv4Peer2Config}), + c.bgpPolicyState) + + // Delete the effective BGPPolicy bp1, and BGPPolicy bp2 will be the effective one. 
+ require.NoError(t, c.crdClient.CrdV1alpha1().BGPPolicies().Delete(context.TODO(), bp1.Name, metav1.DeleteOptions{})) + + waitAndGetDummyEvent(t, c) + + // The listen port of BGPPolicy bp2 (1179) differs from that of BGPPolicy bp1 (179), so the current BGP server should be stopped. + // Mock a failure in stopping the current BGP server. + mockBGPServer.EXPECT().Stop(gomock.Any()).Return(fmt.Errorf("failed reason")) + require.EqualError(t, c.syncBGPPolicy(ctx), "failed to stop current BGP server: failed reason") + checkBGPPolicyState(t, generateBGPPolicyState(179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(loadBalancerIPv4)}, + []bgp.PeerConfig{ipv4Peer2Config}), + c.bgpPolicyState) + + // Mock the retry. Stop the current BGP server successfully, but fail in starting a new BGP server. + mockBGPServer.EXPECT().Stop(gomock.Any()) + mockBGPServer.EXPECT().Start(gomock.Any()).Return(fmt.Errorf("failed reason")) + require.EqualError(t, c.syncBGPPolicy(ctx), "failed to start BGP server: failed reason") + checkBGPPolicyState(t, nil, c.bgpPolicyState) + + // Mock the retry. Start BGP server successfully, but fail in adding BGP peer. + mockBGPServer.EXPECT().Start(gomock.Any()) + mockBGPServer.EXPECT().AddPeer(gomock.Any(), ipv4Peer1Config).Return(fmt.Errorf("failed to add BGP peer")) + require.EqualError(t, c.syncBGPPolicy(ctx), "failed to add BGP peer") + checkBGPPolicyState(t, generateBGPPolicyState(1179, + 65000, + nodeIPv4Addr.IP.String(), + []string{}, + []bgp.PeerConfig{}), + c.bgpPolicyState) + + // Mock the retry. Add the BGP peer successfully, but fail in advertising routes. + mockBGPServer.EXPECT().AddPeer(gomock.Any(), ipv4Peer1Config) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: podIPv4CIDR.String()}}).Return(fmt.Errorf("failed to advertise routes")) + require.EqualError(t, c.syncBGPPolicy(ctx), "failed to advertise routes") + // Done with the dummy event. 
+ doneDummyEvent(t, c) + checkBGPPolicyState(t, generateBGPPolicyState( + 1179, + 65000, + nodeIPv4Addr.IP.String(), + []string{}, + []bgp.PeerConfig{ipv4Peer1Config}), + c.bgpPolicyState) + + // Delete the effective BGPPolicy bp2, and BGPPolicy bp3 will be the effective one. The BGP server doesn't need to + // be updated. The peers and routes will be reconciled according to the existing BGPPolicy state. + require.NoError(t, c.crdClient.CrdV1alpha1().BGPPolicies().Delete(context.TODO(), bp2.Name, metav1.DeleteOptions{})) + + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().AddPeer(gomock.Any(), ipv4Peer2Config) + mockBGPServer.EXPECT().RemovePeer(gomock.Any(), ipv4Peer1Config) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(externalIPv4)}}) + + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) + checkBGPPolicyState(t, generateBGPPolicyState(1179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(externalIPv4)}, + []bgp.PeerConfig{ipv4Peer2Config}), + c.bgpPolicyState) + + // Delete the effective BGPPolicy bp3, and BGPPolicy bp4 will be the effective one. The BGP server doesn't need to + // be updated. The peers and routes will be reconciled according to the existing BGPPolicy state. 
+ require.NoError(t, c.crdClient.CrdV1alpha1().BGPPolicies().Delete(context.TODO(), bp3.Name, metav1.DeleteOptions{})) + + waitAndGetDummyEvent(t, c) + mockBGPServer.EXPECT().UpdatePeer(gomock.Any(), updatedIPv4Peer2Config) + mockBGPServer.EXPECT().WithdrawRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(externalIPv4)}}) + mockBGPServer.EXPECT().AdvertiseRoutes(gomock.Any(), []bgp.Route{{Prefix: ipStrToPrefix(loadBalancerIPv4)}}) + + require.NoError(t, c.syncBGPPolicy(ctx)) + doneDummyEvent(t, c) + checkBGPPolicyState(t, generateBGPPolicyState(1179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(loadBalancerIPv4)}, + []bgp.PeerConfig{updatedIPv4Peer2Config}), + c.bgpPolicyState) +} + +func generateBGPPolicyState(listenPort int32, + localASN int32, + routerID string, + prefixes []string, + peerConfigs []bgp.PeerConfig) *bgpPolicyState { + routes := sets.New[bgp.Route]() + peerConfigMap := make(map[string]bgp.PeerConfig) + for _, prefix := range prefixes { + routes.Insert(bgp.Route{Prefix: prefix}) + } + for _, peerConfig := range peerConfigs { + peerKey := generateBGPPeerKey(peerConfig.Address, peerConfig.ASN) + peerConfigMap[peerKey] = peerConfig + } + return &bgpPolicyState{ + listenPort: listenPort, + localASN: localASN, + routerID: routerID, + routes: routes, + peerConfigs: peerConfigMap, + } +} + +func deepCopyBGPPolicyState(in *bgpPolicyState) *bgpPolicyState { + peerConfigMap := make(map[string]bgp.PeerConfig) + for _, peerConfig := range in.peerConfigs { + peerKey := generateBGPPeerKey(peerConfig.Address, peerConfig.ASN) + peerConfigMap[peerKey] = peerConfig + } + + return &bgpPolicyState{ + listenPort: in.listenPort, + localASN: in.localASN, + routerID: in.routerID, + routes: in.routes.Union(nil), + peerConfigs: peerConfigMap, + } +} + +func checkBGPPolicyState(t *testing.T, expected, got *bgpPolicyState) { + require.Equal(t, expected != nil, got != nil) + if expected != nil { + assert.Equal(t, expected.listenPort, got.listenPort) + 
assert.Equal(t, expected.localASN, got.localASN) + assert.Equal(t, expected.routerID, got.routerID) + assert.Equal(t, expected.routes, got.routes) + assert.Equal(t, expected.peerConfigs, got.peerConfigs) + } +} + +func generateBGPPolicy(name string, + creationTimestamp metav1.Time, + nodeSelector map[string]string, + listenPort int32, + localASN int32, + advertiseClusterIP bool, + advertiseExternalIP bool, + advertiseLoadBalancerIP bool, + advertiseEgressIP bool, + advertisePodCIDR bool, + externalPeers []v1alpha1.BGPPeer) *v1alpha1.BGPPolicy { + var advertisement v1alpha1.Advertisements + advertisement.Service = &v1alpha1.ServiceAdvertisement{} + if advertiseClusterIP { + advertisement.Service.IPTypes = append(advertisement.Service.IPTypes, v1alpha1.ServiceIPTypeClusterIP) + } + if advertiseExternalIP { + advertisement.Service.IPTypes = append(advertisement.Service.IPTypes, v1alpha1.ServiceIPTypeExternalIP) + } + if advertiseLoadBalancerIP { + advertisement.Service.IPTypes = append(advertisement.Service.IPTypes, v1alpha1.ServiceIPTypeLoadBalancerIP) + } + if advertiseEgressIP { + advertisement.Egress = &v1alpha1.EgressAdvertisement{} + } + + if advertisePodCIDR { + advertisement.Pod = &v1alpha1.PodAdvertisement{} + } + return &v1alpha1.BGPPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: "test-uid", + CreationTimestamp: creationTimestamp, + }, + Spec: v1alpha1.BGPPolicySpec{ + NodeSelector: metav1.LabelSelector{MatchLabels: nodeSelector}, + LocalASN: localASN, + ListenPort: &listenPort, + Advertisements: advertisement, + BGPPeers: externalPeers, + }, + } +} + +func generateService(name string, + svcType corev1.ServiceType, + clusterIP string, + externalIP string, + LoadBalancerIP string, + internalTrafficPolicyLocal bool, + externalTrafficPolicyLocal bool) *corev1.Service { + itp := corev1.ServiceInternalTrafficPolicyCluster + if internalTrafficPolicyLocal { + itp = corev1.ServiceInternalTrafficPolicyLocal + } + etp := 
corev1.ServiceExternalTrafficPolicyCluster + if externalTrafficPolicyLocal { + etp = corev1.ServiceExternalTrafficPolicyLocal + } + var externalIPs []string + if externalIP != "" { + externalIPs = append(externalIPs, externalIP) + } + + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespaceDefault, + UID: "test-uid", + }, + Spec: corev1.ServiceSpec{ + Type: svcType, + ClusterIP: clusterIP, + Ports: []corev1.ServicePort{{ + Name: "p80", + Port: 80, + Protocol: corev1.ProtocolTCP, + }}, + ClusterIPs: []string{clusterIP}, + ExternalIPs: externalIPs, + InternalTrafficPolicy: &itp, + ExternalTrafficPolicy: etp, + }, + } + if LoadBalancerIP != "" { + ingress := []corev1.LoadBalancerIngress{{IP: LoadBalancerIP}} + svc.Status.LoadBalancer.Ingress = ingress + } + return svc +} + +func generateEgress(name string, ip string, nodeName string) *crdv1b1.Egress { + return &crdv1b1.Egress{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: "test-uid", + }, + Spec: crdv1b1.EgressSpec{ + EgressIP: ip, + }, + Status: crdv1b1.EgressStatus{ + EgressIP: ip, + EgressNode: nodeName, + }, + } +} + +func generateNode(name string, labels, annotations map[string]string) *corev1.Node { + return &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: "test-uid", + Labels: labels, + Annotations: annotations, + }, + } +} + +func generateEndpointSlice(svcName string, + suffix string, + isLocal bool, + isIPv6 bool, + endpointIP string) *discovery.EndpointSlice { + addrType := discovery.AddressTypeIPv4 + if isIPv6 { + addrType = discovery.AddressTypeIPv6 + } + var nodeName *string + if isLocal { + nodeName = &localNodeName + } + protocol := corev1.ProtocolTCP + endpointSlice := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", svcName, suffix), + Namespace: namespaceDefault, + UID: "test-uid", + Labels: map[string]string{ + discovery.LabelServiceName: svcName, + }, + }, + AddressType: addrType, + } + if 
endpointIP != "" { + endpointSlice.Endpoints = []discovery.Endpoint{{ + Addresses: []string{ + endpointIP, + }, + Conditions: discovery.EndpointConditions{ + Ready: ptr.To(true), + }, + Hostname: nodeName, + NodeName: nodeName, + }} + endpointSlice.Ports = []discovery.EndpointPort{{ + Name: ptr.To("p80"), + Port: ptr.To(int32(80)), + Protocol: &protocol, + }} + } + return endpointSlice +} + +func generateBGPPeer(ip string, asn, port, gracefulRestartTimeSeconds int32) v1alpha1.BGPPeer { + return v1alpha1.BGPPeer{ + Address: ip, + Port: &port, + ASN: asn, + MultihopTTL: ptr.To(int32(1)), + GracefulRestartTimeSeconds: &gracefulRestartTimeSeconds, + } +} + +func generateBGPPeerConfig(peerConfig *v1alpha1.BGPPeer, password string) bgp.PeerConfig { + return bgp.PeerConfig{ + BGPPeer: peerConfig, + Password: password, + } +} + +func generateSecret(rawData map[string]string) *corev1.Secret { + data := make(map[string][]byte) + for k, v := range rawData { + data[k] = []byte(v) + } + return &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: types.BGPPolicySecretName, + Namespace: namespaceKubeSystem, + UID: "test-uid", + }, + Type: corev1.SecretTypeOpaque, + Data: data, + } +} + +func ipStrToPrefix(ipStr string) string { + if utilnet.IsIPv4String(ipStr) { + return ipStr + ipv4Suffix + } else if utilnet.IsIPv6String(ipStr) { + return ipStr + ipv6Suffix + } + return "" +} + +func waitAndGetDummyEvent(t *testing.T, c *fakeController) { + require.Eventually(t, func() bool { + return c.queue.Len() == 1 + }, 5*time.Second, 10*time.Millisecond) + c.queue.Get() +} +func doneDummyEvent(t *testing.T, c *fakeController) { + c.queue.Done(dummyKey) +} diff --git a/pkg/agent/types/annotations.go b/pkg/agent/types/annotations.go index b5b63e1a8d8..cc74150f280 100644 --- a/pkg/agent/types/annotations.go +++ b/pkg/agent/types/annotations.go @@ -27,6 +27,9 @@ const ( // NodeMaxEgressIPsAnnotationKey represents the key of maximum Egress IP number in the Annotations of the Node. 
NodeMaxEgressIPsAnnotationKey string = "node.antrea.io/max-egress-ips" + // NodeBGPRouterIDAnnotationKey represents the key of the Node's BGP router ID in the Annotations of the Node. + NodeBGPRouterIDAnnotationKey string = "node.antrea.io/bgp-router-id" + // ServiceExternalIPPoolAnnotationKey is the key of the Service annotation that specifies the Service's desired external IP pool. ServiceExternalIPPoolAnnotationKey string = "service.antrea.io/external-ip-pool" diff --git a/pkg/agent/types/bgppolicy.go b/pkg/agent/types/bgppolicy.go new file mode 100644 index 00000000000..8480797841a --- /dev/null +++ b/pkg/agent/types/bgppolicy.go @@ -0,0 +1,23 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package types + +// BGPPolicySecretName is the name of the Kubernetes Secret used to store BGP peer passwords. Each entry in the Secret +// uses a key which is a concatenated string of BGP peer IP address and ASN. The value is the password for that BGP peer. 
+// +// Examples of keys: +// - "192.168.77.100-65000" +// - "2001:db8::1-65000" +const BGPPolicySecretName = "antrea-bgp-passwords" // #nosec G101 diff --git a/pkg/apiserver/handlers/featuregates/handler_test.go b/pkg/apiserver/handlers/featuregates/handler_test.go index adb630a1324..4475fcd2c47 100644 --- a/pkg/apiserver/handlers/featuregates/handler_test.go +++ b/pkg/apiserver/handlers/featuregates/handler_test.go @@ -55,6 +55,7 @@ func Test_getGatesResponse(t *testing.T) { {Component: "agent", Name: "AntreaIPAM", Status: "Disabled", Version: "ALPHA"}, {Component: "agent", Name: "AntreaPolicy", Status: "Disabled", Version: "BETA"}, {Component: "agent", Name: "AntreaProxy", Status: "Enabled", Version: "GA"}, + {Component: "agent", Name: "BGPPolicy", Status: "Disabled", Version: "ALPHA"}, {Component: "agent", Name: "CleanupStaleUDPSvcConntrack", Status: cleanupStaleUDPSvcConntrackStatus, Version: "BETA"}, {Component: "agent", Name: "Egress", Status: egressStatus, Version: "BETA"}, {Component: "agent", Name: "EgressSeparateSubnet", Status: "Disabled", Version: "ALPHA"}, diff --git a/pkg/features/antrea_features.go b/pkg/features/antrea_features.go index 05cb51a9df5..e6dd9c9570e 100644 --- a/pkg/features/antrea_features.go +++ b/pkg/features/antrea_features.go @@ -163,6 +163,11 @@ const ( // alpha: v2.1 // Enable the NodeLatencyMonitor feature. NodeLatencyMonitor featuregate.Feature = "NodeLatencyMonitor" + + // alpha: v2.1 + // Allow users to initiate a BGP process on selected Kubernetes Nodes and advertise Service IPs, Pod IPs and Egress + // IPs to remote BGP peers. 
+ BGPPolicy featuregate.Feature = "BGPPolicy" ) var ( @@ -179,6 +184,7 @@ var ( DefaultAntreaFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ AntreaPolicy: {Default: true, PreRelease: featuregate.Beta}, AntreaProxy: {Default: true, PreRelease: featuregate.GA}, + BGPPolicy: {Default: false, PreRelease: featuregate.Alpha}, Egress: {Default: true, PreRelease: featuregate.Beta}, EndpointSlice: {Default: true, PreRelease: featuregate.GA}, TopologyAwareHints: {Default: true, PreRelease: featuregate.Beta}, @@ -213,6 +219,7 @@ var ( AntreaIPAM, AntreaPolicy, AntreaProxy, + BGPPolicy, CleanupStaleUDPSvcConntrack, Egress, EndpointSlice, @@ -267,8 +274,11 @@ var ( // can have different FeatureSpecs between Linux and Windows, we should // still define a separate defaultAntreaFeatureGates map for Windows. unsupportedFeaturesOnWindows = map[featuregate.Feature]struct{}{ - Egress: {}, - AntreaIPAM: {}, + Egress: {}, + AntreaIPAM: {}, + // BGPPolicy feature is not validated on Windows yet. This can be removed + // in the future if it's fully tested on Windows. + BGPPolicy: {}, Multicast: {}, SecondaryNetwork: {}, ServiceExternalIP: {},