diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index 7b2a494de01..65e2f942612 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -1093,6 +1093,22 @@ rules: --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole +metadata: + labels: + app: antrea + name: antrea-cluster-identity-reader +rules: +- apiGroups: + - "" + resourceNames: + - antrea-cluster-identity + resources: + - configmaps + verbs: + - get +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole metadata: labels: app: antrea @@ -1159,6 +1175,7 @@ rules: - "" resourceNames: - antrea-ca + - antrea-cluster-identity resources: - configmaps verbs: @@ -1299,6 +1316,14 @@ metadata: namespace: kube-system --- apiVersion: v1 +kind: ConfigMap +metadata: + labels: + app: antrea + name: antrea-cluster-identity + namespace: kube-system +--- +apiVersion: v1 data: antrea-agent.conf: | # FeatureGates is a map of feature names to bools that enable or disable experimental features. diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index 3970306c5c1..05f663c0945 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -1093,6 +1093,22 @@ rules: --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole +metadata: + labels: + app: antrea + name: antrea-cluster-identity-reader +rules: +- apiGroups: + - "" + resourceNames: + - antrea-cluster-identity + resources: + - configmaps + verbs: + - get +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole metadata: labels: app: antrea @@ -1159,6 +1175,7 @@ rules: - "" resourceNames: - antrea-ca + - antrea-cluster-identity resources: - configmaps verbs: @@ -1299,6 +1316,14 @@ metadata: namespace: kube-system --- apiVersion: v1 +kind: ConfigMap +metadata: + labels: + app: antrea + name: antrea-cluster-identity + namespace: kube-system +--- +apiVersion: v1 data: antrea-agent.conf: | # FeatureGates is a map of feature names to bools that enable or disable experimental features. diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 519503cefa7..7e6740818d0 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -1093,6 +1093,22 @@ rules: --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole +metadata: + labels: + app: antrea + name: antrea-cluster-identity-reader +rules: +- apiGroups: + - "" + resourceNames: + - antrea-cluster-identity + resources: + - configmaps + verbs: + - get +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole metadata: labels: app: antrea @@ -1159,6 +1175,7 @@ rules: - "" resourceNames: - antrea-ca + - antrea-cluster-identity resources: - configmaps verbs: @@ -1299,6 +1316,14 @@ metadata: namespace: kube-system --- apiVersion: v1 +kind: ConfigMap +metadata: + labels: + app: antrea + name: antrea-cluster-identity + namespace: kube-system +--- +apiVersion: v1 data: antrea-agent.conf: | # FeatureGates is a map of feature names to bools that enable or disable experimental features. diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index ac996a1627f..861c1364e11 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -1093,6 +1093,22 @@ rules: --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole +metadata: + labels: + app: antrea + name: antrea-cluster-identity-reader +rules: +- apiGroups: + - "" + resourceNames: + - antrea-cluster-identity + resources: + - configmaps + verbs: + - get +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole metadata: labels: app: antrea @@ -1159,6 +1175,7 @@ rules: - "" resourceNames: - antrea-ca + - antrea-cluster-identity resources: - configmaps verbs: @@ -1299,6 +1316,14 @@ metadata: namespace: kube-system --- apiVersion: v1 +kind: ConfigMap +metadata: + labels: + app: antrea + name: antrea-cluster-identity + namespace: kube-system +--- +apiVersion: v1 data: antrea-agent.conf: | # FeatureGates is a map of feature names to bools that enable or disable experimental features. diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index 8d833cb9ac8..0d8474c48a3 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -1093,6 +1093,22 @@ rules: --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole +metadata: + labels: + app: antrea + name: antrea-cluster-identity-reader +rules: +- apiGroups: + - "" + resourceNames: + - antrea-cluster-identity + resources: + - configmaps + verbs: + - get +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole metadata: labels: app: antrea @@ -1159,6 +1175,7 @@ rules: - "" resourceNames: - antrea-ca + - antrea-cluster-identity resources: - configmaps verbs: @@ -1299,6 +1316,14 @@ metadata: namespace: kube-system --- apiVersion: v1 +kind: ConfigMap +metadata: + labels: + app: antrea + name: antrea-cluster-identity + namespace: kube-system +--- +apiVersion: v1 data: antrea-agent.conf: | # FeatureGates is a map of feature names to bools that enable or disable experimental features. diff --git a/build/yamls/base/cluster-identity-reader.yml b/build/yamls/base/cluster-identity-reader.yml new file mode 100644 index 00000000000..d2d93f54859 --- /dev/null +++ b/build/yamls/base/cluster-identity-reader.yml @@ -0,0 +1,13 @@ +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: antrea-cluster-identity-reader +rules: + - apiGroups: + - "" + resources: + - configmaps + resourceNames: + - antrea-cluster-identity + verbs: + - get diff --git a/build/yamls/base/controller-rbac.yml b/build/yamls/base/controller-rbac.yml index 358c928252d..778edb522b5 100644 --- a/build/yamls/base/controller-rbac.yml +++ b/build/yamls/base/controller-rbac.yml @@ -79,6 +79,7 @@ rules: - configmaps resourceNames: - antrea-ca + - antrea-cluster-identity verbs: - get - update diff --git a/build/yamls/base/controller.yml b/build/yamls/base/controller.yml index 4b706957c52..ae988e41a76 100644 --- a/build/yamls/base/controller.yml +++ b/build/yamls/base/controller.yml @@ -16,6 +16,11 @@ kind: ConfigMap metadata: name: antrea-ca --- +apiVersion: v1 +kind: ConfigMap +metadata: + name: antrea-cluster-identity +--- apiVersion: apiregistration.k8s.io/v1 kind: APIService metadata: diff --git a/build/yamls/base/kustomization.yml b/build/yamls/base/kustomization.yml index 1ad4681ac6d..277feef078d 100644 --- a/build/yamls/base/kustomization.yml +++ b/build/yamls/base/kustomization.yml @@ -6,6 +6,7 @@ resources: - controller.yml - agent-rbac.yml - agent.yml +- cluster-identity-reader.yml configMapGenerator: - files: - conf/antrea-controller.conf diff --git a/build/yamls/flow-aggregator.yml b/build/yamls/flow-aggregator.yml index 31f3cac4ab3..840264d20c8 100644 --- a/build/yamls/flow-aggregator.yml +++ b/build/yamls/flow-aggregator.yml @@ -103,6 +103,21 @@ subjects: name: antrea-agent namespace: kube-system --- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + labels: + app: flow-aggregator + name: flow-aggregator-cluster-id-reader +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: antrea-cluster-identity-reader +subjects: +- kind: ServiceAccount + name: flow-aggregator + namespace: flow-aggregator +--- apiVersion: v1 kind: ConfigMap metadata: @@ -127,14 +142,22 @@ data: # Provide the transport protocol for the flow aggregator collecting process, which is tls, tcp or udp. #aggregatorTransportProtocol: "tls" - # Provide DNS name or IP address of flow aggregator for generating TLS certificate. It must match the flowCollectorAddr parameter in the antrea-agent config. + # Provide DNS name or IP address of flow aggregator for generating TLS certificate. It must match + # the flowCollectorAddr parameter in the antrea-agent config. #flowAggregatorAddress: "flow-aggregator.flow-aggregator.svc" + + # Provide the 32-bit Observation Domain ID which will uniquely identify this instance of the flow + # aggregator to an external flow collector. If omitted, an Observation Domain ID will be generated + # from the persistent cluster UUID generated by Antrea. Failing that (e.g. because the cluster UUID + # is not available), a value will be randomly generated, which may vary across restarts of the flow + # aggregator. + #observationDomainID: kind: ConfigMap metadata: annotations: {} labels: app: flow-aggregator - name: flow-aggregator-configmap-ccfdtmg954 + name: flow-aggregator-configmap-hf78268hm6 namespace: flow-aggregator --- apiVersion: v1 @@ -199,5 +222,5 @@ spec: serviceAccountName: flow-aggregator volumes: - configMap: - name: flow-aggregator-configmap-ccfdtmg954 + name: flow-aggregator-configmap-hf78268hm6 name: flow-aggregator-config diff --git a/build/yamls/flow-aggregator/base/conf/flow-aggregator.conf b/build/yamls/flow-aggregator/base/conf/flow-aggregator.conf index 3f21d756ca3..fbf4721e3a9 100644 --- a/build/yamls/flow-aggregator/base/conf/flow-aggregator.conf +++ b/build/yamls/flow-aggregator/base/conf/flow-aggregator.conf @@ -11,5 +11,13 @@ # Provide the transport protocol for the flow aggregator collecting process, which is tls, tcp or udp. #aggregatorTransportProtocol: "tls" -# Provide DNS name or IP address of flow aggregator for generating TLS certificate. It must match the flowCollectorAddr parameter in the antrea-agent config. +# Provide DNS name or IP address of flow aggregator for generating TLS certificate. It must match +# the flowCollectorAddr parameter in the antrea-agent config. #flowAggregatorAddress: "flow-aggregator.flow-aggregator.svc" + +# Provide the 32-bit Observation Domain ID which will uniquely identify this instance of the flow +# aggregator to an external flow collector. If omitted, an Observation Domain ID will be generated +# from the persistent cluster UUID generated by Antrea. Failing that (e.g. because the cluster UUID +# is not available), a value will be randomly generated, which may vary across restarts of the flow +# aggregator. +#observationDomainID: diff --git a/build/yamls/flow-aggregator/base/flow-aggregator.yml b/build/yamls/flow-aggregator/base/flow-aggregator.yml index 3f38b29e7f8..4f349168ea7 100644 --- a/build/yamls/flow-aggregator/base/flow-aggregator.yml +++ b/build/yamls/flow-aggregator/base/flow-aggregator.yml @@ -77,6 +77,19 @@ roleRef: name: flow-exporter-role apiGroup: rbac.authorization.k8s.io --- +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: flow-aggregator-cluster-id-reader +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: antrea-cluster-identity-reader +subjects: + - kind: ServiceAccount + name: flow-aggregator + namespace: flow-aggregator +--- apiVersion: v1 kind: Service metadata: diff --git a/cmd/antrea-controller/controller.go b/cmd/antrea-controller/controller.go index fea5323c0c6..0d2f71deed7 100644 --- a/cmd/antrea-controller/controller.go +++ b/cmd/antrea-controller/controller.go @@ -35,6 +35,7 @@ import ( "github.com/vmware-tanzu/antrea/pkg/apiserver/openapi" "github.com/vmware-tanzu/antrea/pkg/apiserver/storage" crdinformers "github.com/vmware-tanzu/antrea/pkg/client/informers/externalversions" + "github.com/vmware-tanzu/antrea/pkg/clusteridentity" "github.com/vmware-tanzu/antrea/pkg/controller/metrics" "github.com/vmware-tanzu/antrea/pkg/controller/networkpolicy" "github.com/vmware-tanzu/antrea/pkg/controller/networkpolicy/store" @@ -47,6 +48,7 @@ import ( "github.com/vmware-tanzu/antrea/pkg/monitor" "github.com/vmware-tanzu/antrea/pkg/signals" "github.com/vmware-tanzu/antrea/pkg/util/cipher" + "github.com/vmware-tanzu/antrea/pkg/util/env" "github.com/vmware-tanzu/antrea/pkg/version" ) @@ -106,6 +108,12 @@ func run(o *Options) error { traceflowInformer := crdInformerFactory.Ops().V1alpha1().Traceflows() cgInformer := crdInformerFactory.Core().V1alpha2().ClusterGroups() + clusterIdentityAllocator := clusteridentity.NewClusterIdentityAllocator( + env.GetAntreaNamespace(), + clusteridentity.DefaultClusterIdentityConfigMapName, + client, + ) + // Create Antrea object storage. addressGroupStore := store.NewAddressGroupStore() appliedToGroupStore := store.NewAppliedToGroupStore() @@ -196,6 +204,8 @@ func run(o *Options) error { informerFactory.Start(stopCh) crdInformerFactory.Start(stopCh) + go clusterIdentityAllocator.Run(stopCh) + go controllerMonitor.Run(stopCh) go networkPolicyController.Run(stopCh) diff --git a/cmd/flow-aggregator/config.go b/cmd/flow-aggregator/config.go index 9093f8b026c..bffab605c9a 100644 --- a/cmd/flow-aggregator/config.go +++ b/cmd/flow-aggregator/config.go @@ -33,4 +33,10 @@ type FlowAggregatorConfig struct { // Provide DNS name or IP address of flow aggregator for generating TLS certificate. // Defaults to "flow-aggregator.flow-aggregator.svc" flowAggregatorAddress string `yaml:"flowAggregatorAddress,omitempty"` + // Provide the 32-bit Observation Domain ID which will uniquely identify this instance of the flow + // aggregator to an external flow collector. If omitted, an Observation Domain ID will be generated + // from the persistent cluster UUID generated by Antrea. Failing that (e.g. because the cluster UUID + // is not available), a value will be randomly generated, which may vary across restarts of the flow + // aggregator. + ObservationDomainID *uint32 `yaml:"observationDomainID,omitempty"` } diff --git a/cmd/flow-aggregator/flow-aggregator.go b/cmd/flow-aggregator/flow-aggregator.go index b40b0804a19..c8c14b64384 100644 --- a/cmd/flow-aggregator/flow-aggregator.go +++ b/cmd/flow-aggregator/flow-aggregator.go @@ -16,15 +16,57 @@ package main import ( "fmt" + "hash/fnv" + "time" + "github.com/google/uuid" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/klog" + "github.com/vmware-tanzu/antrea/pkg/clusteridentity" aggregator "github.com/vmware-tanzu/antrea/pkg/flowaggregator" "github.com/vmware-tanzu/antrea/pkg/signals" ) +// genObservationDomainID generates an IPFIX Observation Domain ID when one is not provided by the +// user through the flow aggregator configuration. It will first try to generate one +// deterministically based on the cluster UUID (if available, with a timeout of 10s). Otherwise, it +// will generate a random one. The cluster UUID should be available if Antrea is deployed to the +// cluster ahead of the flow aggregator, which is the expectation since when deploying flow +// aggregator as a Pod, networking needs to be configured by the CNI plugin. +func genObservationDomainID(k8sClient kubernetes.Interface) uint32 { + const retryInterval = time.Second + const timeout = 10 * time.Second + const defaultAntreaNamespace = "kube-system" + + clusterIdentityProvider := clusteridentity.NewClusterIdentityProvider( + defaultAntreaNamespace, + clusteridentity.DefaultClusterIdentityConfigMapName, + k8sClient, + ) + var clusterUUID uuid.UUID + if err := wait.PollImmediate(retryInterval, timeout, func() (bool, error) { + var err error + if clusterUUID, err = clusterIdentityProvider.Get(); err != nil { + return false, nil + } else { + return true, nil + } + }); err != nil { + klog.Warningf( + "Unable to retrieve cluster UUID after %v (does ConfigMap '%s/%s' exist?); will generate a random observation domain ID", + timeout, defaultAntreaNamespace, clusteridentity.DefaultClusterIdentityConfigMapName, + ) + clusterUUID = uuid.New() + } + h := fnv.New32() + h.Write(clusterUUID[:]) + observationDomainID := h.Sum32() + return observationDomainID +} + func run(o *Options) error { klog.Infof("Flow aggregator starting...") // Set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will @@ -36,7 +78,24 @@ func run(o *Options) error { if err != nil { return fmt.Errorf("error when creating K8s client: %v", err) } - flowAggregator := aggregator.NewFlowAggregator(o.externalFlowCollectorAddr, o.externalFlowCollectorProto, o.exportInterval, o.aggregatorTransportProtocol, o.flowAggregatorAddress, k8sClient) + + var observationDomainID uint32 + if o.config.ObservationDomainID != nil { + observationDomainID = *o.config.ObservationDomainID + } else { + observationDomainID = genObservationDomainID(k8sClient) + } + klog.Infof("Flow aggregator Observation Domain ID: %d", observationDomainID) + + flowAggregator := aggregator.NewFlowAggregator( + o.externalFlowCollectorAddr, + o.externalFlowCollectorProto, + o.exportInterval, + o.aggregatorTransportProtocol, + o.flowAggregatorAddress, + k8sClient, + observationDomainID, + ) err = flowAggregator.InitCollectingProcess() if err != nil { return fmt.Errorf("error when creating collecting process: %v", err) diff --git a/pkg/apiserver/certificate/cacert_controller.go b/pkg/apiserver/certificate/cacert_controller.go index 0e35fdc8a2c..c5559090e31 100644 --- a/pkg/apiserver/certificate/cacert_controller.go +++ b/pkg/apiserver/certificate/cacert_controller.go @@ -80,13 +80,7 @@ type CACertController struct { var _ dynamiccertificates.Listener = &CACertController{} func GetCAConfigMapNamespace() string { - namespace := env.GetPodNamespace() - if namespace != "" { - return namespace - } - - klog.Warningf("Failed to get Pod Namespace from environment. Using \"%s\" as the CA ConfigMap Namespace", defaultAntreaNamespace) - return defaultAntreaNamespace + return env.GetAntreaNamespace() } func newCACertController(caContentProvider dynamiccertificates.CAContentProvider, diff --git a/pkg/apiserver/certificate/certificate.go b/pkg/apiserver/certificate/certificate.go index 213ff6c7a61..3d52164800f 100644 --- a/pkg/apiserver/certificate/certificate.go +++ b/pkg/apiserver/certificate/certificate.go @@ -54,18 +54,11 @@ const ( CACertFile = "ca.crt" TLSCertFile = "tls.crt" TLSKeyFile = "tls.key" - - defaultAntreaNamespace = "kube-system" ) // GetAntreaServerNames returns the DNS names that the TLS certificate will be signed with. func GetAntreaServerNames() []string { - namespace := env.GetPodNamespace() - if namespace == "" { - klog.Warningf("Failed to get Pod Namespace from environment. Using \"%s\" as the Antrea Service Namespace", defaultAntreaNamespace) - namespace = defaultAntreaNamespace - } - + namespace := env.GetAntreaNamespace() antreaServerName := "antrea." + namespace + ".svc" // TODO: Although antrea-agent and kube-aggregator only verify the server name "antrea..svc", // We should add the whole FQDN "antrea..svc." as an alternate DNS name when diff --git a/pkg/clusteridentity/clusteridentity.go b/pkg/clusteridentity/clusteridentity.go new file mode 100644 index 00000000000..51d3b10e44c --- /dev/null +++ b/pkg/clusteridentity/clusteridentity.go @@ -0,0 +1,175 @@ +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clusteridentity + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/klog" +) + +const ( + DefaultClusterIdentityConfigMapName = "antrea-cluster-identity" + uuidConfigMapKey = "uuid" +) + +// ClusterIdentityAllocator ensures that the antrea-cluster-identity ConfigMap is populated +// correctly, with a valid UUID. It is meant to be used by the Antrea Controller. +type ClusterIdentityAllocator struct { + clusterIdentityConfigMapNamespace string + clusterIdentityConfigMapName string + k8sClient clientset.Interface +} + +// NewClusterIdentityAllocator creates a ClusterIdentityAllocator object +func NewClusterIdentityAllocator( + clusterIdentityConfigMapNamespace string, + clusterIdentityConfigMapName string, + k8sClient clientset.Interface, +) *ClusterIdentityAllocator { + return &ClusterIdentityAllocator{ + clusterIdentityConfigMapNamespace: clusterIdentityConfigMapNamespace, + clusterIdentityConfigMapName: clusterIdentityConfigMapName, + k8sClient: k8sClient, + } +} + +func (a *ClusterIdentityAllocator) updateConfigMapIfNeeded() error { + configMap, err := a.k8sClient.CoreV1().ConfigMaps(a.clusterIdentityConfigMapNamespace).Get(context.TODO(), a.clusterIdentityConfigMapName, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("error when getting '%s/%s' ConfigMap: %v", a.clusterIdentityConfigMapNamespace, a.clusterIdentityConfigMapName, err) + } + + // returns a triplet consisting of the cluster UUID, a boolean indicating if the UUID needs + // to be written to the ConfigMap, and an error if applicable + inspectUUID := func() (uuid.UUID, bool, error) { + clusterUUIDStr, ok := configMap.Data[uuidConfigMapKey] + if ok && clusterUUIDStr != "" { + clusterUUID, err := uuid.Parse(clusterUUIDStr) + if err != nil { + return uuid.Nil, false, fmt.Errorf("cluster already has UUID '%s' but it is not valid: %v", clusterUUIDStr, err) + } + return clusterUUID, false, nil + } + + // generate a new random UUID + clusterUUID := uuid.New() + + return clusterUUID, true, nil + } + + clusterUUID, clusterUUIDNeedsUpdate, err := inspectUUID() + if err != nil { + return err + } + if !clusterUUIDNeedsUpdate { + klog.Infof("Existing cluster UUID: %v", clusterUUID) + return nil + } + + configMap.Data = map[string]string{ + uuidConfigMapKey: clusterUUID.String(), + } + if _, err := a.k8sClient.CoreV1().ConfigMaps(a.clusterIdentityConfigMapNamespace).Update(context.TODO(), configMap, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("error when updating '%s/%s' ConfigMap with new cluster identity: %v", a.clusterIdentityConfigMapNamespace, a.clusterIdentityConfigMapName, err) + } + klog.Infof("New cluster UUID: %v", clusterUUID) + return nil +} + +// Run will ensure that the antrea-cluster-identity ConfigMap is up-to-date. It is meant to be +// called asynchronously in its own goroutine, and will keep retrying in case of error, using an +// exponential backoff mechanism. +func (a *ClusterIdentityAllocator) Run(stopCh <-chan struct{}) { + // Exponential backoff, starting at 100ms with a factor of 2. A "steps" value of 8 means we + // will increase the backoff duration at most 8 times, so the max duration is (100ms * // + // 2^8), which is about 25s. + retry := wait.Backoff{ + Steps: 8, + Duration: 100 * time.Millisecond, + Factor: 2.0, + Jitter: 0.0, + } + + for { + err := a.updateConfigMapIfNeeded() + if err == nil { + return + } + sleepDuration := retry.Step() + klog.Errorf("Cannot validate or update cluster UUID because of the following error, will retry in %v: %v", sleepDuration, err) + select { + case <-stopCh: + return + case <-time.After(sleepDuration): + continue + } + } +} + +// ClusterIdentityProvider is an interface used to retrieve the cluster identity information (UUID), +// as provided by the user or generated by the Antrea Controller. +type ClusterIdentityProvider interface { + Get() (uuid.UUID, error) +} + +type clusterIdentityProvider struct { + clusterIdentityConfigMapNamespace string + clusterIdentityConfigMapName string + k8sClient clientset.Interface +} + +// NewClusterIdentityProvider returns a new object implementing the ClusterIdentityProvider +// interface. +func NewClusterIdentityProvider( + clusterIdentityConfigMapNamespace string, + clusterIdentityConfigMapName string, + k8sClient clientset.Interface, +) *clusterIdentityProvider { + return &clusterIdentityProvider{ + clusterIdentityConfigMapNamespace: clusterIdentityConfigMapNamespace, + clusterIdentityConfigMapName: clusterIdentityConfigMapName, + k8sClient: k8sClient, + } +} + +// Get will retrieve the cluster identity (UUID) stored in the antrea-cluster-identity ConfigMap. In +// case of error, clients are invited to retry as the information may not be available yet. +func (p *clusterIdentityProvider) Get() (uuid.UUID, error) { + configMap, err := p.k8sClient.CoreV1().ConfigMaps(p.clusterIdentityConfigMapNamespace).Get(context.TODO(), p.clusterIdentityConfigMapName, metav1.GetOptions{}) + if err != nil { + return uuid.Nil, fmt.Errorf("error when getting '%s/%s' ConfigMap: %v", p.clusterIdentityConfigMapNamespace, p.clusterIdentityConfigMapName, err) + } + + getUUID := func() (uuid.UUID, error) { + clusterUUIDStr, ok := configMap.Data[uuidConfigMapKey] + if !ok || clusterUUIDStr == "" { + return uuid.Nil, fmt.Errorf("cluster UUID has not been set yet") + } + clusterUUID, err := uuid.Parse(clusterUUIDStr) + if err != nil { + return uuid.Nil, fmt.Errorf("cluster UUID cannot be parsed") + } + return clusterUUID, nil + } + + return getUUID() +} diff --git a/pkg/clusteridentity/clusteridentity_test.go b/pkg/clusteridentity/clusteridentity_test.go new file mode 100644 index 00000000000..e5da781b42e --- /dev/null +++ b/pkg/clusteridentity/clusteridentity_test.go @@ -0,0 +1,113 @@ +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clusteridentity + +import ( + "context" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" +) + +const ( + antreaNamespace = "kube-system" + runTimeout = 2 * time.Second +) + +var ( + clusterUUID = uuid.New() + + idConfigMap = &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: antreaNamespace, + Name: DefaultClusterIdentityConfigMapName, + }, + Data: map[string]string{ + uuidConfigMapKey: clusterUUID.String(), + }, + } + + idConfigMapEmpty = &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: antreaNamespace, + Name: DefaultClusterIdentityConfigMapName, + }, + Data: map[string]string{}, + } +) + +func TestClusterIdentityAllocatorNew(t *testing.T) { + client := fake.NewSimpleClientset(idConfigMapEmpty) + allocator := NewClusterIdentityAllocator(antreaNamespace, DefaultClusterIdentityConfigMapName, client) + require.NoError(t, allocator.updateConfigMapIfNeeded()) + + provider := NewClusterIdentityProvider(antreaNamespace, DefaultClusterIdentityConfigMapName, client) + actualUUID, err := provider.Get() + require.NoError(t, err, "Error when retrieving cluster identity") + assert.NotEqual(t, uuid.Nil, actualUUID) +} + +func TestClusterIdentityAllocatorExisting(t *testing.T) { + client := fake.NewSimpleClientset(idConfigMap) + allocator := NewClusterIdentityAllocator(antreaNamespace, DefaultClusterIdentityConfigMapName, client) + require.NoError(t, allocator.updateConfigMapIfNeeded()) + + provider := NewClusterIdentityProvider(antreaNamespace, DefaultClusterIdentityConfigMapName, client) + actualUUID, err := provider.Get() + require.NoError(t, err, "Error when retrieving cluster identity") + assert.Equal(t, clusterUUID, actualUUID) +} + +func TestClusterIdentityProviderMissingConfigMap(t *testing.T) { + client := fake.NewSimpleClientset() + provider := NewClusterIdentityProvider(antreaNamespace, DefaultClusterIdentityConfigMapName, client) + _, err := provider.Get() + assert.Error(t, err, "Cluster identity should not be available") +} + +func runWrapper(ctx context.Context, allocator *ClusterIdentityAllocator) error { + stopCh := make(chan struct{}) + doneCh := make(chan struct{}) + defer close(stopCh) + go func() { + allocator.Run(stopCh) + close(doneCh) + }() + select { + case <-doneCh: // success + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func TestClusterIdentityAllocatorRun(t *testing.T) { + client := fake.NewSimpleClientset(idConfigMapEmpty) + allocator := NewClusterIdentityAllocator(antreaNamespace, DefaultClusterIdentityConfigMapName, client) + ctx, cancel := context.WithTimeout(context.Background(), runTimeout) + defer cancel() + require.NoError(t, runWrapper(ctx, allocator), "Cluster identity could not be updated") + + provider := NewClusterIdentityProvider(antreaNamespace, DefaultClusterIdentityConfigMapName, client) + actualUUID, err := provider.Get() + require.NoError(t, err, "Error when retrieving cluster identity") + assert.NotEqual(t, uuid.Nil, actualUUID) +} diff --git a/pkg/controller/networkpolicy/validate.go b/pkg/controller/networkpolicy/validate.go index e72b19f5f04..0514c95d535 100644 --- a/pkg/controller/networkpolicy/validate.go +++ b/pkg/controller/networkpolicy/validate.go @@ -74,8 +74,6 @@ var ( reservedTierNames = sets.NewString("baseline", "application", "platform", "networkops", "securityops", "emergency") ) -const defaultControllerNamespace = "kube-system" - // RegisterAntreaPolicyValidator registers an Antrea-native policy validator // to the resource registry. A new validator must be registered by calling // this function before the Run phase of the APIServer. @@ -602,11 +600,7 @@ func (t *tierValidator) updateValidate(curObj, oldObj interface{}, userInfo auth curTier := curObj.(*secv1alpha1.Tier) oldTier := oldObj.(*secv1alpha1.Tier) // Retrieve antrea-controller's Namespace - namespace := env.GetPodNamespace() - if namespace == "" { - // antrea-controller by default is created in the kube-system Namespace - namespace = defaultControllerNamespace - } + namespace := env.GetAntreaNamespace() // Allow exception of Tier Priority updates performed by the antrea-controller if serviceaccount.MatchesUsername(namespace, env.GetAntreaControllerServiceAccount(), userInfo.Username) { return "", true diff --git a/pkg/flowaggregator/flowaggregator.go b/pkg/flowaggregator/flowaggregator.go index 91705e63439..3fb0f3c6793 100644 --- a/pkg/flowaggregator/flowaggregator.go +++ b/pkg/flowaggregator/flowaggregator.go @@ -16,7 +16,6 @@ package flowaggregator import ( "fmt" - "hash/fnv" "time" "github.com/vmware/go-ipfix/pkg/collector" @@ -155,9 +154,18 @@ type flowAggregator struct { set ipfix.IPFIXSet flowAggregatorAddress string k8sClient kubernetes.Interface + observationDomainID uint32 } -func NewFlowAggregator(externalFlowCollectorAddr string, externalFlowCollectorProto string, exportInterval time.Duration, aggregatorTransportProtocol AggregatorTransportProtocol, flowAggregatorAddress string, k8sClient kubernetes.Interface) *flowAggregator { +func NewFlowAggregator( + externalFlowCollectorAddr string, + externalFlowCollectorProto string, + exportInterval time.Duration, + aggregatorTransportProtocol AggregatorTransportProtocol, + flowAggregatorAddress string, + k8sClient kubernetes.Interface, + observationDomainID uint32, +) *flowAggregator { registry := ipfix.NewIPFIXRegistry() registry.LoadRegistry() fa := &flowAggregator{ @@ -173,17 +181,11 @@ func NewFlowAggregator(externalFlowCollectorAddr string, externalFlowCollectorPr ipfix.NewSet(false), flowAggregatorAddress, k8sClient, + observationDomainID, } return fa } -func (fa *flowAggregator) genObservationID() (uint32, error) { - // TODO: Change to use cluster UUID to generate observation ID once it's available - h := fnv.New32() - h.Write([]byte(fa.flowAggregatorAddress)) - return h.Sum32(), nil -} - func (fa *flowAggregator) InitCollectingProcess() error { var err error var cpInput collector.CollectorInput @@ -249,10 +251,6 @@ func (fa *flowAggregator) InitAggregationProcess() error { } func (fa *flowAggregator) initExportingProcess() error { - obsID, err := fa.genObservationID() - if err != nil { - return fmt.Errorf("cannot generate observation ID for flow aggregator: %v", err) - } // TODO: This code can be further simplified by changing the go-ipfix API to accept // externalFlowCollectorAddr and externalFlowCollectorProto instead of net.Addr input. var expInput exporter.ExporterInput @@ -261,7 +259,7 @@ func (fa *flowAggregator) initExportingProcess() error { expInput = exporter.ExporterInput{ CollectorAddress: fa.externalFlowCollectorAddr, CollectorProtocol: fa.externalFlowCollectorProto, - ObservationDomainID: obsID, + ObservationDomainID: fa.observationDomainID, TempRefTimeout: 0, PathMTU: 0, IsEncrypted: false, @@ -271,7 +269,7 @@ func (fa *flowAggregator) initExportingProcess() error { expInput = exporter.ExporterInput{ CollectorAddress: fa.externalFlowCollectorAddr, CollectorProtocol: fa.externalFlowCollectorProto, - ObservationDomainID: obsID, + ObservationDomainID: fa.observationDomainID, TempRefTimeout: 1800, PathMTU: 0, IsEncrypted: false, diff --git a/pkg/flowaggregator/flowaggregator_test.go b/pkg/flowaggregator/flowaggregator_test.go index 51caf870563..f93925634a3 100644 --- a/pkg/flowaggregator/flowaggregator_test.go +++ b/pkg/flowaggregator/flowaggregator_test.go @@ -28,8 +28,9 @@ import ( ) const ( - testTemplateID = uint16(256) - testExportInterval = 60 * time.Second + testTemplateID = uint16(256) + testExportInterval = 60 * time.Second + testObservationDomainID = 0xabcd ) // TODO: We will add another test for sendDataRecord when we support adding multiple records to single set. @@ -56,6 +57,7 @@ func TestFlowAggregator_sendTemplateSet(t *testing.T) { ipfixtest.NewMockIPFIXSet(ctrl), "", nil, + testObservationDomainID, } // Following consists of all elements that are in ianaInfoElements and antreaInfoElements (globals) diff --git a/pkg/util/env/env.go b/pkg/util/env/env.go index 9cad9d94c0b..1e981db6ff9 100644 --- a/pkg/util/env/env.go +++ b/pkg/util/env/env.go @@ -29,6 +29,8 @@ const ( svcAcctNameEnvKey = "SERVICEACCOUNT_NAME" antreaCloudEKSEnvKey = "ANTREA_CLOUD_EKS" + + defaultAntreaNamespace = "kube-system" ) // GetNodeName returns the node's name used in Kubernetes, based on the priority: @@ -89,7 +91,19 @@ func getBoolEnvVar(name string, defaultValue bool) bool { return defaultValue } -// Returns true if Antrea is used to enforce NetworkPolicies in an EKS cluster. +// IsCloudEKS returns true if Antrea is used to enforce NetworkPolicies in an EKS cluster. func IsCloudEKS() bool { return getBoolEnvVar(antreaCloudEKSEnvKey, false) } + +// GetAntreaNamespace tries to determine the Namespace in which Antrea is running by looking at the +// POD_NAMESPACE environment variable. If this environment variable is not set (e.g. because the +// Antrea component is not run as a Pod), "kube-system" is returned. +func GetAntreaNamespace() string { + namespace := GetPodNamespace() + if namespace == "" { + klog.Warningf("Failed to get Pod Namespace from environment. Using \"%s\" as the Antrea Service Namespace", defaultAntreaNamespace) + namespace = defaultAntreaNamespace + } + return namespace +} diff --git a/test/e2e/basic_test.go b/test/e2e/basic_test.go index 612dc4c140d..c6bac6fb439 100644 --- a/test/e2e/basic_test.go +++ b/test/e2e/basic_test.go @@ -24,11 +24,14 @@ import ( "testing" "time" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/util/wait" "github.com/vmware-tanzu/antrea/pkg/agent/apiserver/handlers/podinterface" "github.com/vmware-tanzu/antrea/pkg/agent/config" "github.com/vmware-tanzu/antrea/pkg/agent/openflow/cookie" + "github.com/vmware-tanzu/antrea/pkg/clusteridentity" ) // TestDeploy is a "no-op" test that simply performs setup and teardown. @@ -733,3 +736,35 @@ func TestGratuitousARP(t *testing.T) { } t.Logf("Got %d ARP packets after Pod was up", arpPackets) } + +// TestClusterIdentity verifies that the antrea-cluster-identity ConfigMap is +// populated correctly by the Antrea Controller. +func TestClusterIdentity(t *testing.T) { + data, err := setupTest(t) + if err != nil { + t.Fatalf("Error when setting up test: %v", err) + } + defer teardownTest(t, data) + + clusterIdentityProvider := clusteridentity.NewClusterIdentityProvider( + antreaNamespace, + clusteridentity.DefaultClusterIdentityConfigMapName, + data.clientset, + ) + + const retryInterval = time.Second + const timeout = 10 * time.Second + var clusterUUID uuid.UUID + err = wait.PollImmediate(retryInterval, timeout, func() (bool, error) { + var err error + if clusterUUID, err = clusterIdentityProvider.Get(); err != nil { + return false, nil + } else { + t.Logf("Cluster UUID: %v", clusterUUID) + return true, nil + } + }) + + assert.NoError(t, err, "Failed to retrieve cluster identity information within %v", timeout) + assert.NotEqual(t, uuid.Nil, clusterUUID) +}