Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always require the cluster UUID in the FlowAggregator #6714

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cmd/flow-aggregator/flow-aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,16 @@ func run(configFile string) error {
podInformer := informerFactory.Core().V1().Pods()
podStore := podstore.NewPodStore(podInformer.Informer())

klog.InfoS("Retrieving Antrea cluster UUID")
clusterUUID, err := aggregator.GetClusterUUID(ctx, k8sClient)
if err != nil {
return err
}
klog.InfoS("Retrieved Antrea cluster UUID", "clusterUUID", clusterUUID)

flowAggregator, err := aggregator.NewFlowAggregator(
k8sClient,
clusterUUID,
podStore,
configFile,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 Antrea Authors
// Copyright 2024 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package exporter
package flowaggregator

import (
"context"
Expand All @@ -26,31 +26,34 @@ import (
"antrea.io/antrea/pkg/clusteridentity"
)

// getClusterUUID retrieves the cluster UUID (if available, with a timeout of 10s).
// GetClusterUUID retrieves the cluster UUID if it becomes available within 10s (or before ctx is done, whichever comes first).
// Otherwise, it returns an empty cluster UUID and error. The cluster UUID should
// be available if Antrea is deployed to the cluster ahead of the flow aggregator,
// which is the expectation since when deploying flow aggregator as a Pod,
// networking needs to be configured by the CNI plugin.
func getClusterUUID(k8sClient kubernetes.Interface) (uuid.UUID, error) {
func GetClusterUUID(ctx context.Context, k8sClient kubernetes.Interface) (uuid.UUID, error) {
const retryInterval = time.Second
const timeout = 10 * time.Second
const defaultAntreaNamespace = "kube-system"

ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

clusterIdentityProvider := clusteridentity.NewClusterIdentityProvider(
defaultAntreaNamespace,
clusteridentity.DefaultClusterIdentityConfigMapName,
k8sClient,
)
var clusterUUID uuid.UUID
if err := wait.PollUntilContextTimeout(context.TODO(), retryInterval, timeout, true, func(ctx context.Context) (bool, error) {
if err := wait.PollUntilContextCancel(ctx, retryInterval, true, func(ctx context.Context) (bool, error) {
clusterIdentity, _, err := clusterIdentityProvider.Get()
if err != nil {
return false, nil
}
clusterUUID = clusterIdentity.UUID
return true, nil
}); err != nil {
return clusterUUID, fmt.Errorf("unable to retrieve cluster UUID from ConfigMap '%s/%s': timeout after %v", defaultAntreaNamespace, clusteridentity.DefaultClusterIdentityConfigMapName, timeout)
return clusterUUID, fmt.Errorf("unable to retrieve cluster UUID from ConfigMap '%s/%s': %w", defaultAntreaNamespace, clusteridentity.DefaultClusterIdentityConfigMapName, err)
}
return clusterUUID, nil
}
44 changes: 44 additions & 0 deletions pkg/flowaggregator/cluster_uuid_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2024 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package flowaggregator

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"k8s.io/client-go/kubernetes/fake"

"antrea.io/antrea/pkg/clusteridentity"
)

// TestGetClusterUUID verifies that GetClusterUUID returns without error when
// called against a fake clientset while a cluster identity allocator for the
// "kube-system" Namespace is running in a background goroutine.
func TestGetClusterUUID(t *testing.T) {
	k8sClient := fake.NewSimpleClientset()
	allocator := clusteridentity.NewClusterIdentityAllocator(
		"kube-system",
		clusteridentity.DefaultClusterIdentityConfigMapName,
		k8sClient,
	)

	// The channel handed to Run is closed when the test returns.
	done := make(chan struct{})
	defer close(done)
	go allocator.Run(done)

	// Give the lookup at most 2s so that a failure surfaces quickly.
	lookupCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	_, err := GetClusterUUID(lookupCtx, k8sClient)
	require.NoError(t, err, "cluster UUID not available")
}
8 changes: 2 additions & 6 deletions pkg/flowaggregator/exporter/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import (
"reflect"
"time"

"github.com/google/uuid"
ipfixentities "github.com/vmware/go-ipfix/pkg/entities"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/flowaggregator/clickhouseclient"
Expand Down Expand Up @@ -57,7 +57,7 @@ func buildClickHouseConfig(opt *options.Options) clickhouseclient.ClickHouseConf
}
}

func NewClickHouseExporter(k8sClient kubernetes.Interface, opt *options.Options) (*ClickHouseExporter, error) {
func NewClickHouseExporter(clusterUUID uuid.UUID, opt *options.Options) (*ClickHouseExporter, error) {
chConfig := buildClickHouseConfig(opt)
klog.InfoS("ClickHouse configuration", "database", chConfig.Database, "databaseURL", chConfig.DatabaseURL, "debug", chConfig.Debug,
"compress", *chConfig.Compress, "commitInterval", chConfig.CommitInterval, "insecureSkipVerify", chConfig.InsecureSkipVerify, "caCert", chConfig.CACert)
Expand All @@ -77,10 +77,6 @@ func NewClickHouseExporter(k8sClient kubernetes.Interface, opt *options.Options)
return nil, fmt.Errorf("error when reading custom CA certificate: %v", errMessage)
}
}
clusterUUID, err := getClusterUUID(k8sClient)
if err != nil {
return nil, err
}
chExportProcess, err := clickhouseclient.NewClickHouseClient(chConfig, clusterUUID.String())
if err != nil {
return nil, err
Expand Down
22 changes: 7 additions & 15 deletions pkg/flowaggregator/exporter/ipfix.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
ipfixentities "github.com/vmware/go-ipfix/pkg/entities"
"github.com/vmware/go-ipfix/pkg/exporter"
ipfixregistry "github.com/vmware/go-ipfix/pkg/registry"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

flowaggregatorconfig "antrea.io/antrea/pkg/config/flowaggregator"
Expand All @@ -50,27 +49,20 @@ type IPFIXExporter struct {
templateIDv6 uint16
registry ipfix.IPFIXRegistry
set ipfixentities.Set
k8sClient kubernetes.Interface
clusterUUID uuid.UUID
}

// genObservationDomainID generates an IPFIX Observation Domain ID when one is not provided by the
// user through the flow aggregator configuration. It will first try to generate one
// deterministically based on the cluster UUID (if available, with a timeout of 10s). Otherwise, it
// will generate a random one.
func genObservationDomainID(k8sClient kubernetes.Interface) uint32 {
clusterUUID, err := getClusterUUID(k8sClient)
if err != nil {
klog.ErrorS(err, "Error when retrieving cluster UUID; will generate a random observation domain ID")
clusterUUID = uuid.New()
}
// user through the flow aggregator configuration. It is generated as a hash of the cluster UUID.
func genObservationDomainID(clusterUUID uuid.UUID) uint32 {
	// Hash the raw bytes of the UUID with 32-bit FNV-1, so that the same
	// cluster UUID always maps to the same Observation Domain ID.
	hasher := fnv.New32()
	// The result of Write is deliberately ignored: hash.Hash documents that
	// Write never returns an error.
	hasher.Write(clusterUUID[:])
	return hasher.Sum32()
}

func NewIPFIXExporter(
k8sClient kubernetes.Interface,
clusterUUID uuid.UUID,
opt *options.Options,
registry ipfix.IPFIXRegistry,
) *IPFIXExporter {
Expand All @@ -85,7 +77,7 @@ func NewIPFIXExporter(
if opt.Config.FlowCollector.ObservationDomainID != nil {
observationDomainID = *opt.Config.FlowCollector.ObservationDomainID
} else {
observationDomainID = genObservationDomainID(k8sClient)
observationDomainID = genObservationDomainID(clusterUUID)
}
klog.InfoS("Flow aggregator Observation Domain ID", "domainID", observationDomainID)

Expand All @@ -97,7 +89,7 @@ func NewIPFIXExporter(
observationDomainID: observationDomainID,
registry: registry,
set: ipfixentities.NewSet(false),
k8sClient: k8sClient,
clusterUUID: clusterUUID,
}

return exporter
Expand Down Expand Up @@ -144,7 +136,7 @@ func (e *IPFIXExporter) UpdateOptions(opt *options.Options) {
if opt.Config.FlowCollector.ObservationDomainID != nil {
e.observationDomainID = *opt.Config.FlowCollector.ObservationDomainID
} else {
e.observationDomainID = genObservationDomainID(e.k8sClient)
e.observationDomainID = genObservationDomainID(e.clusterUUID)
}
klog.InfoS("New IPFIXExporter configuration", "collectorAddress", e.externalFlowCollectorAddr, "collectorProtocol", e.externalFlowCollectorProto, "sendJSON", e.sendJSONRecord, "domainID", e.observationDomainID)

Expand Down
46 changes: 32 additions & 14 deletions pkg/flowaggregator/exporter/ipfix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ import (
"net"
"testing"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
ipfixentities "github.com/vmware/go-ipfix/pkg/entities"
ipfixentitiestesting "github.com/vmware/go-ipfix/pkg/entities/testing"
ipfixregistry "github.com/vmware/go-ipfix/pkg/registry"
"go.uber.org/mock/gomock"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/utils/ptr"

flowaggregatorconfig "antrea.io/antrea/pkg/config/flowaggregator"
Expand Down Expand Up @@ -295,8 +295,9 @@ func createElementList(isIPv6 bool, mockIPFIXRegistry *ipfixtesting.MockIPFIXReg
}

func TestInitExportingProcess(t *testing.T) {
clusterUUID := uuid.New()

t.Run("tcp success", func(t *testing.T) {
k8sClientset := fake.NewSimpleClientset()
ctrl := gomock.NewController(t)
mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl)
opt := &options.Options{}
Expand All @@ -308,16 +309,13 @@ func TestInitExportingProcess(t *testing.T) {
opt.ExternalFlowCollectorAddr = listener.Addr().String()
opt.ExternalFlowCollectorProto = listener.Addr().Network()
opt.Config.FlowCollector.RecordFormat = "JSON"
obsDomainID := uint32(1)
opt.Config.FlowCollector.ObservationDomainID = &obsDomainID
createElementList(false, mockIPFIXRegistry)
createElementList(true, mockIPFIXRegistry)
exp := NewIPFIXExporter(k8sClientset, opt, mockIPFIXRegistry)
exp := NewIPFIXExporter(clusterUUID, opt, mockIPFIXRegistry)
err = exp.initExportingProcess()
assert.NoError(t, err)
})
t.Run("udp success", func(t *testing.T) {
k8sClientset := fake.NewSimpleClientset()
ctrl := gomock.NewController(t)
mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl)
opt := &options.Options{}
Expand All @@ -331,16 +329,13 @@ func TestInitExportingProcess(t *testing.T) {
opt.ExternalFlowCollectorAddr = listener.LocalAddr().String()
opt.ExternalFlowCollectorProto = listener.LocalAddr().Network()
opt.Config.FlowCollector.RecordFormat = "JSON"
obsDomainID := uint32(1)
opt.Config.FlowCollector.ObservationDomainID = &obsDomainID
createElementList(false, mockIPFIXRegistry)
createElementList(true, mockIPFIXRegistry)
exp := NewIPFIXExporter(k8sClientset, opt, mockIPFIXRegistry)
exp := NewIPFIXExporter(clusterUUID, opt, mockIPFIXRegistry)
err = exp.initExportingProcess()
assert.NoError(t, err)
})
t.Run("tcp failure", func(t *testing.T) {
k8sClientset := fake.NewSimpleClientset()
ctrl := gomock.NewController(t)
mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl)
opt := &options.Options{}
Expand All @@ -349,11 +344,34 @@ func TestInitExportingProcess(t *testing.T) {
// dialing this address is guaranteed to fail (we use 0 as the port number)
opt.ExternalFlowCollectorAddr = "127.0.0.1:0"
opt.ExternalFlowCollectorProto = "tcp"
// the observation domain should be set, or the test will take 10s to run
obsDomainID := uint32(1)
opt.Config.FlowCollector.ObservationDomainID = &obsDomainID
exp := NewIPFIXExporter(k8sClientset, opt, mockIPFIXRegistry)
exp := NewIPFIXExporter(clusterUUID, opt, mockIPFIXRegistry)
err := exp.initExportingProcess()
assert.ErrorContains(t, err, "got error when initializing IPFIX exporting process: dial tcp 127.0.0.1:0:")
})
}

// TestNewIPFIXExporterObservationDomainID checks how NewIPFIXExporter picks the
// Observation Domain ID: the user-provided value is used when one is set in
// the configuration, otherwise the value is derived from the cluster UUID via
// genObservationDomainID. It also checks that the exporter stores the cluster
// UUID it was given.
func TestNewIPFIXExporterObservationDomainID(t *testing.T) {
	clusterUUID := uuid.New()

	type testCase struct {
		name                        string
		userObservationDomainID     *uint32
		expectedObservationDomainID uint32
	}
	cases := []testCase{
		{
			name:                        "user-provided",
			userObservationDomainID:     ptr.To[uint32](testObservationDomainID),
			expectedObservationDomainID: testObservationDomainID,
		},
		{
			name:                        "generated from clusterUUID",
			userObservationDomainID:     nil,
			expectedObservationDomainID: genObservationDomainID(clusterUUID),
		},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			// Start from the default configuration and override only the
			// Observation Domain ID for this case.
			cfg := &flowaggregatorconfig.FlowAggregatorConfig{}
			flowaggregatorconfig.SetConfigDefaults(cfg)
			cfg.FlowCollector.ObservationDomainID = c.userObservationDomainID
			opt := &options.Options{Config: cfg}

			mockCtrl := gomock.NewController(t)
			registry := ipfixtesting.NewMockIPFIXRegistry(mockCtrl)

			ipfixExporter := NewIPFIXExporter(clusterUUID, opt, registry)
			assert.Equal(t, clusterUUID, ipfixExporter.clusterUUID)
			assert.Equal(t, c.expectedObservationDomainID, ipfixExporter.observationDomainID)
		})
	}
}
8 changes: 2 additions & 6 deletions pkg/flowaggregator/exporter/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
package exporter

import (
"github.com/google/uuid"
ipfixentities "github.com/vmware/go-ipfix/pkg/entities"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/flowaggregator/options"
Expand All @@ -35,13 +35,9 @@ func buildS3Input(opt *options.Options) s3uploader.S3Input {
}
}

func NewS3Exporter(k8sClient kubernetes.Interface, opt *options.Options) (*S3Exporter, error) {
func NewS3Exporter(clusterUUID uuid.UUID, opt *options.Options) (*S3Exporter, error) {
s3Input := buildS3Input(opt)
klog.InfoS("S3Uploader configuration", "bucketName", s3Input.Config.BucketName, "bucketPrefix", s3Input.Config.BucketPrefix, "region", s3Input.Config.Region, "recordFormat", s3Input.Config.RecordFormat, "compress", *s3Input.Config.Compress, "maxRecordsPerFile", s3Input.Config.MaxRecordsPerFile, "uploadInterval", s3Input.UploadInterval)
clusterUUID, err := getClusterUUID(k8sClient)
if err != nil {
return nil, err
}
s3UploadProcess, err := s3uploader.NewS3UploadProcess(s3Input, clusterUUID.String())
if err != nil {
return nil, err
Expand Down
Loading