Skip to content

Commit

Permalink
Always require the cluster UUID in the FlowAggregator (#6714)
Browse files Browse the repository at this point in the history
Before this change, the cluster UUID was strictly required by the
S3Exporter and by the ClickHouseExporter, but somewhat optional for the
IPFIXExporter (if not available after a certain timeout, a random UUID
was generated and used to compute the IPFIX observation domain
ID). Furthermore, every exporter was responsible for calling
getClusterUUID independently, and that was the only reason for their
implementations to require access to the K8s client.

I think it makes more sense to make things more uniform, and require the
cluster UUID to be available regardless of which exporters are
enabled. We can retrieve the cluster UUID once during FlowAggregator
initialization, then pass it along to all exporters which need it.

Signed-off-by: Antonin Bas <antonin.bas@broadcom.com>
  • Loading branch information
antoninbas authored Oct 8, 2024
1 parent 55384dc commit ccf2dd0
Show file tree
Hide file tree
Showing 9 changed files with 199 additions and 93 deletions.
8 changes: 8 additions & 0 deletions cmd/flow-aggregator/flow-aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,16 @@ func run(configFile string) error {
podInformer := informerFactory.Core().V1().Pods()
podStore := podstore.NewPodStore(podInformer.Informer())

klog.InfoS("Retrieving Antrea cluster UUID")
clusterUUID, err := aggregator.GetClusterUUID(ctx, k8sClient)
if err != nil {
return err
}
klog.InfoS("Retrieved Antrea cluster UUID", "clusterUUID", clusterUUID)

flowAggregator, err := aggregator.NewFlowAggregator(
k8sClient,
clusterUUID,
podStore,
configFile,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 Antrea Authors
// Copyright 2024 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package exporter
package flowaggregator

import (
"context"
Expand All @@ -26,31 +26,34 @@ import (
"antrea.io/antrea/pkg/clusteridentity"
)

// getClusterUUID retrieves the cluster UUID (if available, with a timeout of 10s).
// GetClusterUUID retrieves the cluster UUID (if available, with a timeout of 10s).
// Otherwise, it returns an empty cluster UUID and error. The cluster UUID should
// be available if Antrea is deployed to the cluster ahead of the flow aggregator,
// which is the expectation since when deploying flow aggregator as a Pod,
// networking needs to be configured by the CNI plugin.
func getClusterUUID(k8sClient kubernetes.Interface) (uuid.UUID, error) {
func GetClusterUUID(ctx context.Context, k8sClient kubernetes.Interface) (uuid.UUID, error) {
const retryInterval = time.Second
const timeout = 10 * time.Second
const defaultAntreaNamespace = "kube-system"

ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

clusterIdentityProvider := clusteridentity.NewClusterIdentityProvider(
defaultAntreaNamespace,
clusteridentity.DefaultClusterIdentityConfigMapName,
k8sClient,
)
var clusterUUID uuid.UUID
if err := wait.PollUntilContextTimeout(context.TODO(), retryInterval, timeout, true, func(ctx context.Context) (bool, error) {
if err := wait.PollUntilContextCancel(ctx, retryInterval, true, func(ctx context.Context) (bool, error) {
clusterIdentity, _, err := clusterIdentityProvider.Get()
if err != nil {
return false, nil
}
clusterUUID = clusterIdentity.UUID
return true, nil
}); err != nil {
return clusterUUID, fmt.Errorf("unable to retrieve cluster UUID from ConfigMap '%s/%s': timeout after %v", defaultAntreaNamespace, clusteridentity.DefaultClusterIdentityConfigMapName, timeout)
return clusterUUID, fmt.Errorf("unable to retrieve cluster UUID from ConfigMap '%s/%s': %w", defaultAntreaNamespace, clusteridentity.DefaultClusterIdentityConfigMapName, err)
}
return clusterUUID, nil
}
44 changes: 44 additions & 0 deletions pkg/flowaggregator/cluster_uuid_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2024 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package flowaggregator

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"k8s.io/client-go/kubernetes/fake"

"antrea.io/antrea/pkg/clusteridentity"
)

// TestGetClusterUUID checks that GetClusterUUID returns without error when a
// cluster identity allocator is running against the same fake clientset.
func TestGetClusterUUID(t *testing.T) {
	k8sClient := fake.NewSimpleClientset()

	// Run the allocator in the background for the duration of the test; it is
	// presumably what publishes the cluster identity that GetClusterUUID reads
	// (confirm against the clusteridentity package).
	allocator := clusteridentity.NewClusterIdentityAllocator(
		"kube-system",
		clusteridentity.DefaultClusterIdentityConfigMapName,
		k8sClient,
	)
	stopCh := make(chan struct{})
	defer close(stopCh)
	go allocator.Run(stopCh)

	// Bound the lookup with a 2s deadline on the context passed to GetClusterUUID.
	lookupCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	_, err := GetClusterUUID(lookupCtx, k8sClient)
	require.NoError(t, err, "cluster UUID not available")
}
8 changes: 2 additions & 6 deletions pkg/flowaggregator/exporter/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import (
"reflect"
"time"

"github.com/google/uuid"
ipfixentities "github.com/vmware/go-ipfix/pkg/entities"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/flowaggregator/clickhouseclient"
Expand Down Expand Up @@ -57,7 +57,7 @@ func buildClickHouseConfig(opt *options.Options) clickhouseclient.ClickHouseConf
}
}

func NewClickHouseExporter(k8sClient kubernetes.Interface, opt *options.Options) (*ClickHouseExporter, error) {
func NewClickHouseExporter(clusterUUID uuid.UUID, opt *options.Options) (*ClickHouseExporter, error) {
chConfig := buildClickHouseConfig(opt)
klog.InfoS("ClickHouse configuration", "database", chConfig.Database, "databaseURL", chConfig.DatabaseURL, "debug", chConfig.Debug,
"compress", *chConfig.Compress, "commitInterval", chConfig.CommitInterval, "insecureSkipVerify", chConfig.InsecureSkipVerify, "caCert", chConfig.CACert)
Expand All @@ -77,10 +77,6 @@ func NewClickHouseExporter(k8sClient kubernetes.Interface, opt *options.Options)
return nil, fmt.Errorf("error when reading custom CA certificate: %v", errMessage)
}
}
clusterUUID, err := getClusterUUID(k8sClient)
if err != nil {
return nil, err
}
chExportProcess, err := clickhouseclient.NewClickHouseClient(chConfig, clusterUUID.String())
if err != nil {
return nil, err
Expand Down
22 changes: 7 additions & 15 deletions pkg/flowaggregator/exporter/ipfix.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
ipfixentities "github.com/vmware/go-ipfix/pkg/entities"
"github.com/vmware/go-ipfix/pkg/exporter"
ipfixregistry "github.com/vmware/go-ipfix/pkg/registry"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

flowaggregatorconfig "antrea.io/antrea/pkg/config/flowaggregator"
Expand All @@ -50,27 +49,20 @@ type IPFIXExporter struct {
templateIDv6 uint16
registry ipfix.IPFIXRegistry
set ipfixentities.Set
k8sClient kubernetes.Interface
clusterUUID uuid.UUID
}

// genObservationDomainID generates an IPFIX Observation Domain ID when one is not provided by the
// user through the flow aggregator configuration. It will first try to generate one
// deterministically based on the cluster UUID (if available, with a timeout of 10s). Otherwise, it
// will generate a random one.
func genObservationDomainID(k8sClient kubernetes.Interface) uint32 {
clusterUUID, err := getClusterUUID(k8sClient)
if err != nil {
klog.ErrorS(err, "Error when retrieving cluster UUID; will generate a random observation domain ID")
clusterUUID = uuid.New()
}
// user through the flow aggregator configuration. It is generated as a hash of the cluster UUID.
func genObservationDomainID(clusterUUID uuid.UUID) uint32 {
h := fnv.New32()
h.Write(clusterUUID[:])
observationDomainID := h.Sum32()
return observationDomainID
}

func NewIPFIXExporter(
k8sClient kubernetes.Interface,
clusterUUID uuid.UUID,
opt *options.Options,
registry ipfix.IPFIXRegistry,
) *IPFIXExporter {
Expand All @@ -85,7 +77,7 @@ func NewIPFIXExporter(
if opt.Config.FlowCollector.ObservationDomainID != nil {
observationDomainID = *opt.Config.FlowCollector.ObservationDomainID
} else {
observationDomainID = genObservationDomainID(k8sClient)
observationDomainID = genObservationDomainID(clusterUUID)
}
klog.InfoS("Flow aggregator Observation Domain ID", "domainID", observationDomainID)

Expand All @@ -97,7 +89,7 @@ func NewIPFIXExporter(
observationDomainID: observationDomainID,
registry: registry,
set: ipfixentities.NewSet(false),
k8sClient: k8sClient,
clusterUUID: clusterUUID,
}

return exporter
Expand Down Expand Up @@ -144,7 +136,7 @@ func (e *IPFIXExporter) UpdateOptions(opt *options.Options) {
if opt.Config.FlowCollector.ObservationDomainID != nil {
e.observationDomainID = *opt.Config.FlowCollector.ObservationDomainID
} else {
e.observationDomainID = genObservationDomainID(e.k8sClient)
e.observationDomainID = genObservationDomainID(e.clusterUUID)
}
klog.InfoS("New IPFIXExporter configuration", "collectorAddress", e.externalFlowCollectorAddr, "collectorProtocol", e.externalFlowCollectorProto, "sendJSON", e.sendJSONRecord, "domainID", e.observationDomainID)

Expand Down
46 changes: 32 additions & 14 deletions pkg/flowaggregator/exporter/ipfix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ import (
"net"
"testing"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
ipfixentities "github.com/vmware/go-ipfix/pkg/entities"
ipfixentitiestesting "github.com/vmware/go-ipfix/pkg/entities/testing"
ipfixregistry "github.com/vmware/go-ipfix/pkg/registry"
"go.uber.org/mock/gomock"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/utils/ptr"

flowaggregatorconfig "antrea.io/antrea/pkg/config/flowaggregator"
Expand Down Expand Up @@ -295,8 +295,9 @@ func createElementList(isIPv6 bool, mockIPFIXRegistry *ipfixtesting.MockIPFIXReg
}

func TestInitExportingProcess(t *testing.T) {
clusterUUID := uuid.New()

t.Run("tcp success", func(t *testing.T) {
k8sClientset := fake.NewSimpleClientset()
ctrl := gomock.NewController(t)
mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl)
opt := &options.Options{}
Expand All @@ -308,16 +309,13 @@ func TestInitExportingProcess(t *testing.T) {
opt.ExternalFlowCollectorAddr = listener.Addr().String()
opt.ExternalFlowCollectorProto = listener.Addr().Network()
opt.Config.FlowCollector.RecordFormat = "JSON"
obsDomainID := uint32(1)
opt.Config.FlowCollector.ObservationDomainID = &obsDomainID
createElementList(false, mockIPFIXRegistry)
createElementList(true, mockIPFIXRegistry)
exp := NewIPFIXExporter(k8sClientset, opt, mockIPFIXRegistry)
exp := NewIPFIXExporter(clusterUUID, opt, mockIPFIXRegistry)
err = exp.initExportingProcess()
assert.NoError(t, err)
})
t.Run("udp success", func(t *testing.T) {
k8sClientset := fake.NewSimpleClientset()
ctrl := gomock.NewController(t)
mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl)
opt := &options.Options{}
Expand All @@ -331,16 +329,13 @@ func TestInitExportingProcess(t *testing.T) {
opt.ExternalFlowCollectorAddr = listener.LocalAddr().String()
opt.ExternalFlowCollectorProto = listener.LocalAddr().Network()
opt.Config.FlowCollector.RecordFormat = "JSON"
obsDomainID := uint32(1)
opt.Config.FlowCollector.ObservationDomainID = &obsDomainID
createElementList(false, mockIPFIXRegistry)
createElementList(true, mockIPFIXRegistry)
exp := NewIPFIXExporter(k8sClientset, opt, mockIPFIXRegistry)
exp := NewIPFIXExporter(clusterUUID, opt, mockIPFIXRegistry)
err = exp.initExportingProcess()
assert.NoError(t, err)
})
t.Run("tcp failure", func(t *testing.T) {
k8sClientset := fake.NewSimpleClientset()
ctrl := gomock.NewController(t)
mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl)
opt := &options.Options{}
Expand All @@ -349,11 +344,34 @@ func TestInitExportingProcess(t *testing.T) {
// dialing this address is guaranteed to fail (we use 0 as the port number)
opt.ExternalFlowCollectorAddr = "127.0.0.1:0"
opt.ExternalFlowCollectorProto = "tcp"
// the observation domain should be set, or the test will take 10s to run
obsDomainID := uint32(1)
opt.Config.FlowCollector.ObservationDomainID = &obsDomainID
exp := NewIPFIXExporter(k8sClientset, opt, mockIPFIXRegistry)
exp := NewIPFIXExporter(clusterUUID, opt, mockIPFIXRegistry)
err := exp.initExportingProcess()
assert.ErrorContains(t, err, "got error when initializing IPFIX exporting process: dial tcp 127.0.0.1:0:")
})
}

// TestNewIPFIXExporterObservationDomainID verifies which observation domain ID
// NewIPFIXExporter ends up with: the value from the configuration when one is
// set, and otherwise the value returned by genObservationDomainID for the
// cluster UUID. It also checks that the exporter stores the cluster UUID.
func TestNewIPFIXExporterObservationDomainID(t *testing.T) {
	clusterUUID := uuid.New()

	type testCase struct {
		name       string
		configured *uint32
		want       uint32
	}
	cases := []testCase{
		{
			name:       "user-provided",
			configured: ptr.To[uint32](testObservationDomainID),
			want:       testObservationDomainID,
		},
		{
			name:       "generated from clusterUUID",
			configured: nil,
			want:       genObservationDomainID(clusterUUID),
		},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			mockRegistry := ipfixtesting.NewMockIPFIXRegistry(gomock.NewController(t))

			// Start from the default configuration and override only the
			// observation domain ID for this case.
			config := &flowaggregatorconfig.FlowAggregatorConfig{}
			flowaggregatorconfig.SetConfigDefaults(config)
			config.FlowCollector.ObservationDomainID = c.configured
			opt := &options.Options{Config: config}

			ipfixExporter := NewIPFIXExporter(clusterUUID, opt, mockRegistry)
			assert.Equal(t, clusterUUID, ipfixExporter.clusterUUID)
			assert.Equal(t, c.want, ipfixExporter.observationDomainID)
		})
	}
}
8 changes: 2 additions & 6 deletions pkg/flowaggregator/exporter/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
package exporter

import (
"github.com/google/uuid"
ipfixentities "github.com/vmware/go-ipfix/pkg/entities"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/flowaggregator/options"
Expand All @@ -35,13 +35,9 @@ func buildS3Input(opt *options.Options) s3uploader.S3Input {
}
}

func NewS3Exporter(k8sClient kubernetes.Interface, opt *options.Options) (*S3Exporter, error) {
func NewS3Exporter(clusterUUID uuid.UUID, opt *options.Options) (*S3Exporter, error) {
s3Input := buildS3Input(opt)
klog.InfoS("S3Uploader configuration", "bucketName", s3Input.Config.BucketName, "bucketPrefix", s3Input.Config.BucketPrefix, "region", s3Input.Config.Region, "recordFormat", s3Input.Config.RecordFormat, "compress", *s3Input.Config.Compress, "maxRecordsPerFile", s3Input.Config.MaxRecordsPerFile, "uploadInterval", s3Input.UploadInterval)
clusterUUID, err := getClusterUUID(k8sClient)
if err != nil {
return nil, err
}
s3UploadProcess, err := s3uploader.NewS3UploadProcess(s3Input, clusterUUID.String())
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit ccf2dd0

Please sign in to comment.