Add new exporter to FlowAggregator to write flows to local file
Flows are written to a local "log" file, in CSV format, with support for
log rotation. Not all the fields from the flow records are
included. Configuration options include the ability to filter flows
based on ingress / egress network policy rule actions (in the future,
additional filtering capabilities could be introduced).

For antrea-io#3794

Signed-off-by: Antonin Bas <abas@vmware.com>
antoninbas committed Apr 13, 2023
1 parent b3e85fe commit b629b1d
Showing 16 changed files with 961 additions and 24 deletions.
35 changes: 35 additions & 0 deletions build/charts/flow-aggregator/conf/flow-aggregator.conf
@@ -122,3 +122,38 @@ s3Uploader:

# UploadInterval is the duration between each file upload to S3.
uploadInterval: {{ .Values.s3Uploader.uploadInterval | quote }}

# FlowLogger contains configuration options for writing flow records to a local log file.
flowLogger:
# Enable is the switch to enable writing flow records to a local log file.
enable: {{ .Values.flowLogger.enable }}

# Path is the path to the local log file.
path: {{ .Values.flowLogger.path | quote }}

# MaxSize is the maximum size in MB of a log file before it gets rotated.
maxSize: {{ .Values.flowLogger.maxSize }}

# MaxBackups is the maximum number of old log files to retain. If set to 0, all log files will be
# retained (unless MaxAge causes them to be deleted).
maxBackups: {{ .Values.flowLogger.maxBackups }}

# MaxAge is the maximum number of days to retain old log files based on the timestamp encoded in
# their filename. The default (0) is not to remove old log files based on age.
maxAge: {{ .Values.flowLogger.maxAge }}

# Compress enables gzip compression on rotated files.
compress: {{ .Values.flowLogger.compress }}

# RecordFormat defines the format of the flow records written to the log file. Only "CSV" is
# supported at the moment.
recordFormat: {{ .Values.flowLogger.recordFormat | quote }}

# Filters can be used to select which flow records to log to file. The provided filters are OR-ed
# to determine whether a specific flow should be logged.
filters:
{{- toYaml .Values.flowLogger.filters | trim | nindent 6 }}

# PrettyPrint enables conversion of some numeric fields to a more meaningful string
# representation.
prettyPrint: {{ .Values.flowLogger.prettyPrint }}
25 changes: 25 additions & 0 deletions build/charts/flow-aggregator/values.yaml
@@ -84,6 +84,31 @@ s3Uploader:
aws_access_key_id: "changeme"
aws_secret_access_key: "changeme"
aws_session_token: ""
# flowLogger contains configuration options for writing flow records to a local log file.
flowLogger:
# -- Determine whether to enable exporting flow records to a local log file.
enable: false
# -- Path is the path to the local log file.
path: "/tmp/antrea-flows.log"
# -- MaxSize is the maximum size in MB of a log file before it gets rotated.
maxSize: 100
# -- MaxBackups is the maximum number of old log files to retain. If set to 0, all log files will
# be retained (unless MaxAge causes them to be deleted).
maxBackups: 3
# -- MaxAge is the maximum number of days to retain old log files based on the timestamp encoded
# in their filename. The default (0) is not to remove old log files based on age.
maxAge: 0
# -- Compress enables gzip compression on rotated files.
compress: true
# -- RecordFormat defines the format of the flow records written to the log file. Only "CSV" is supported at the moment.
recordFormat: "CSV"
# -- Filters can be used to select which flow records to log to file. The provided filters are
# OR-ed to determine whether a specific flow should be logged. By default, all flows are logged.
filters: []
# With the following filters, only flows that are denied because of a network policy will be logged.
# filters: [{ingressNetworkPolicyRuleActions: ["Drop", "Reject"]}, {egressNetworkPolicyRuleActions: ["Drop", "Reject"]}]
# -- PrettyPrint enables conversion of some numeric fields to a more meaningful string representation.
prettyPrint: true
testing:
## -- Enable code coverage measurement (used when testing Flow Aggregator only).
coverage: false
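
The commented-out filters example above maps directly onto the FlowFilter and FlowLoggerConfig Go types added to pkg/config/flowaggregator/config.go later in this commit. The following is a minimal sketch of that mapping, assuming gopkg.in/yaml.v2 for unmarshalling (the new structs carry yaml tags; the exact YAML library used by the Flow Aggregator is not shown in this excerpt):

package main

import (
	"fmt"

	flowaggregatorconfig "antrea.io/antrea/pkg/config/flowaggregator"
	"gopkg.in/yaml.v2"
)

// A flowLogger section equivalent to the commented-out filters example above.
const flowLoggerYAML = `
enable: true
path: "/tmp/antrea-flows.log"
filters:
  - ingressNetworkPolicyRuleActions: ["Drop", "Reject"]
  - egressNetworkPolicyRuleActions: ["Drop", "Reject"]
`

func main() {
	var cfg flowaggregatorconfig.FlowLoggerConfig
	if err := yaml.Unmarshal([]byte(flowLoggerYAML), &cfg); err != nil {
		panic(err)
	}
	// Two filters, OR-ed together: a flow is logged when either its ingress or
	// its egress network policy rule action is Drop or Reject.
	fmt.Printf("%d filters, first ingress actions: %v\n",
		len(cfg.Filters), cfg.Filters[0].IngressNetworkPolicyRuleActions)
}
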
35 changes: 35 additions & 0 deletions build/yamls/flow-aggregator.yml
@@ -274,6 +274,41 @@ data:
# UploadInterval is the duration between each file upload to S3.
uploadInterval: "60s"
# FlowLogger contains configuration options for writing flow records to a local log file.
flowLogger:
# Enable is the switch to enable writing flow records to a local log file.
enable: false
# Path is the path to the local log file.
path: "/tmp/antrea-flows.log"
# MaxSize is the maximum size in MB of a log file before it gets rotated.
maxSize: 100
# MaxBackups is the maximum number of old log files to retain. If set to 0, all log files will be
# retained (unless MaxAge causes them to be deleted).
maxBackups: 3
# MaxAge is the maximum number of days to retain old log files based on the timestamp encoded in
# their filename. The default (0) is not to remove old log files based on age.
maxAge: 0
# Compress enables gzip compression on rotated files.
compress: true
# RecordFormat defines the format of the flow records written to the log file. Only "CSV" is
# supported at the moment.
recordFormat: "CSV"
# Filters can be used to select which flow records to log to file. The provided filters are OR-ed
# to determine whether a specific flow should be logged.
filters:
[]
# PrettyPrint enables conversion of some numeric fields to a more meaningful string
# representation.
prettyPrint: true
kind: ConfigMap
metadata:
labels:
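
The prettyPrint option above replaces some numeric fields with human-readable strings. Exactly which fields the new exporter converts is not visible in this excerpt, but IANA protocol numbers are a typical candidate; the hypothetical protocolName helper below only illustrates the kind of conversion involved:

package main

import "fmt"

// protocolName illustrates the kind of conversion prettyPrint enables: IANA
// protocol numbers rendered as their conventional names. The exact set of
// fields converted by the new exporter is not shown in this diff excerpt.
func protocolName(proto uint8) string {
	switch proto {
	case 1:
		return "ICMP"
	case 6:
		return "TCP"
	case 17:
		return "UDP"
	case 132:
		return "SCTP"
	default:
		return fmt.Sprint(proto)
	}
}

func main() {
	fmt.Println(protocolName(6))  // "TCP" rather than "6"
	fmt.Println(protocolName(17)) // "UDP" rather than "17"
}
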
2 changes: 1 addition & 1 deletion go.mod
@@ -56,7 +56,7 @@ require (
github.com/vishvananda/netlink v1.1.1-0.20211101163509-b10eb8fe5cf6
github.com/vmware/go-ipfix v0.6.0
golang.org/x/crypto v0.8.0
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
golang.org/x/mod v0.10.0
golang.org/x/net v0.9.0
golang.org/x/sync v0.1.0
3 changes: 2 additions & 1 deletion go.sum
@@ -1262,8 +1262,9 @@ golang.org/x/exp v0.0.0-20191129062945-2f5052295587/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 h1:QE6XYQK6naiK1EPAe1g/ILLxN5RBoH5xkJk3CqlMI/Y=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug=
golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
59 changes: 54 additions & 5 deletions pkg/config/flowaggregator/config.go
@@ -43,17 +43,19 @@ type FlowAggregatorConfig struct {
AggregatorTransportProtocol AggregatorTransportProtocol `yaml:"aggregatorTransportProtocol,omitempty"`
// Provide an extra DNS name or IP address of flow aggregator for generating TLS certificate.
FlowAggregatorAddress string `yaml:"flowAggregatorAddress,omitempty"`
// recordContents enables configuring some fields in the flow records. Fields can be
// RecordContents enables configuring some fields in the flow records. Fields can be
// excluded to reduce record size.
RecordContents RecordContentsConfig `yaml:"recordContents,omitempty"`
// apiServer contains APIServer related configuration options.
// APIServer contains APIServer related configuration options.
APIServer APIServerConfig `yaml:"apiServer,omitempty"`
// flowCollector contains external IPFIX or JSON collector related configuration options.
// FlowCollector contains external IPFIX or JSON collector related configuration options.
FlowCollector FlowCollectorConfig `yaml:"flowCollector,omitempty"`
// clickHouse contains ClickHouse related configuration options.
// ClickHouse contains ClickHouse related configuration options.
ClickHouse ClickHouseConfig `yaml:"clickHouse,omitempty"`
// s3Uploader contains configuration options for uploading flow records to AWS S3.
// S3Uploader contains configuration options for uploading flow records to AWS S3.
S3Uploader S3UploaderConfig `yaml:"s3Uploader,omitempty"`
// FlowLogger contains configuration options for writing flow records to a local log file.
FlowLogger FlowLoggerConfig `yaml:"flowLogger,omitempty"`
}

type RecordContentsConfig struct {
@@ -135,3 +137,50 @@ type S3UploaderConfig struct {
// UploadInterval is the duration between each file upload to S3.
UploadInterval string `yaml:"uploadInterval,omitempty"`
}

type FlowLoggerConfig struct {
// Enable is the switch to enable writing flow records to a local log file.
Enable bool `yaml:"enable,omitempty"`
// Path is the path to the local log file. Defaults to the antrea-flows.log file in the
// operating system's default directory for temporary files (provided by os.TempDir).
Path string `yaml:"path,omitempty"`
// MaxSize is the maximum size in MB of a log file before it gets rotated. Defaults to 100MB.
MaxSize int32 `yaml:"maxSize,omitempty"`
// MaxBackups is the maximum number of old log files to retain. If set to 0, all log files
// will be retained (unless MaxAge causes them to be deleted). Defaults to 3.
MaxBackups int32 `yaml:"maxBackups,omitempty"`
// MaxAge is the maximum number of days to retain old log files based on the timestamp
// encoded in their filename. The default (0) is not to remove old log files based on age.
MaxAge int32 `yaml:"maxAge,omitempty"`
// Compress enables gzip compression on rotated files. Defaults to true.
Compress *bool `yaml:"compress,omitempty"`
// RecordFormat defines the format of the flow records written to the log file. Only "CSV" is
// supported at the moment.
RecordFormat string `yaml:"recordFormat,omitempty"`
// Filters can be used to select which flow records to log to file. The provided filters are
// OR-ed to determine whether a specific flow should be logged. By default, all flows are
// logged.
Filters []FlowFilter `yaml:"filters,omitempty"`
// PrettyPrint enables conversion of some numeric fields to a more meaningful string
// representation.
PrettyPrint *bool `yaml:"prettyPrint,omitempty"`
}

type NetworkPolicyRuleAction string

const (
NetworkPolicyRuleActionNone NetworkPolicyRuleAction = "None"
NetworkPolicyRuleActionAllow NetworkPolicyRuleAction = "Allow"
NetworkPolicyRuleActionDrop NetworkPolicyRuleAction = "Drop"
NetworkPolicyRuleActionReject NetworkPolicyRuleAction = "Reject"
)

// FlowFilter will match a flow if all individual conditions are fulfilled.
type FlowFilter struct {
// IngressNetworkPolicyRuleActions supports filtering based on the action name for the
// ingress policy rule applied to the flow. By default, all actions are considered.
IngressNetworkPolicyRuleActions []NetworkPolicyRuleAction `yaml:"ingressNetworkPolicyRuleActions,omitempty"`
// EgressNetworkPolicyRuleActions supports filtering based on the action name for the egress
// policy rule applied to the flow. By default, all actions are considered.
EgressNetworkPolicyRuleActions []NetworkPolicyRuleAction `yaml:"egressNetworkPolicyRuleActions,omitempty"`
}
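
Putting the two comments together — filters are OR-ed with each other, while a single FlowFilter matches only when all of its conditions hold — the matching logic can be sketched as below. This is only an illustration of the documented semantics using the new types, not the Flow Aggregator's actual implementation (which lives in files outside this excerpt); the helper names are hypothetical, and an empty action list is assumed to mean "no constraint":

package main

import (
	"fmt"

	flowaggregatorconfig "antrea.io/antrea/pkg/config/flowaggregator"
)

// actionMatches treats an empty list as "any action"; otherwise the action
// must be one of the listed values.
func actionMatches(action flowaggregatorconfig.NetworkPolicyRuleAction, allowed []flowaggregatorconfig.NetworkPolicyRuleAction) bool {
	if len(allowed) == 0 {
		return true
	}
	for _, a := range allowed {
		if a == action {
			return true
		}
	}
	return false
}

// flowMatchesFilters ORs the filters together; within one filter, both the
// ingress and the egress condition must hold. No filters means every flow is logged.
func flowMatchesFilters(ingress, egress flowaggregatorconfig.NetworkPolicyRuleAction, filters []flowaggregatorconfig.FlowFilter) bool {
	if len(filters) == 0 {
		return true
	}
	for _, f := range filters {
		if actionMatches(ingress, f.IngressNetworkPolicyRuleActions) &&
			actionMatches(egress, f.EgressNetworkPolicyRuleActions) {
			return true
		}
	}
	return false
}

func main() {
	filters := []flowaggregatorconfig.FlowFilter{
		{IngressNetworkPolicyRuleActions: []flowaggregatorconfig.NetworkPolicyRuleAction{"Drop", "Reject"}},
		{EgressNetworkPolicyRuleActions: []flowaggregatorconfig.NetworkPolicyRuleAction{"Drop", "Reject"}},
	}
	fmt.Println(flowMatchesFilters("Reject", "None", filters)) // true: the ingress rule rejected the flow
	fmt.Println(flowMatchesFilters("Allow", "None", filters))  // false: neither filter matches
}
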
46 changes: 37 additions & 9 deletions pkg/config/flowaggregator/default.go
@@ -15,6 +15,8 @@
package flowaggregator

import (
"os"
"path/filepath"
"time"

"antrea.io/antrea/pkg/apis"
@@ -27,15 +29,21 @@ const (
DefaultInactiveFlowRecordTimeout = "90s"
DefaultAggregatorTransportProtocol = "TLS"
DefaultRecordFormat = "IPFIX"
DefaultClickHouseDatabase = "default"
DefaultClickHouseCommitInterval = "8s"
MinClickHouseCommitInterval = 1 * time.Second
DefaultClickHouseDatabaseUrl = "tcp://clickhouse-clickhouse.flow-visibility.svc:9000"
DefaultS3Region = "us-west-2"
DefaultS3RecordFormat = "CSV"
DefaultS3MaxRecordsPerFile = 1000000
DefaultS3UploadInterval = "60s"
MinS3CommitInterval = 1 * time.Second

DefaultClickHouseDatabase = "default"
DefaultClickHouseCommitInterval = "8s"
MinClickHouseCommitInterval = 1 * time.Second
DefaultClickHouseDatabaseUrl = "tcp://clickhouse-clickhouse.flow-visibility.svc:9000"

DefaultS3Region = "us-west-2"
DefaultS3RecordFormat = "CSV"
DefaultS3MaxRecordsPerFile = 1000000
DefaultS3UploadInterval = "60s"
MinS3CommitInterval = 1 * time.Second

DefaultLoggerMaxSize = 100
DefaultLoggerMaxBackups = 3
DefaultLoggerRecordFormat = "CSV"
)

func SetConfigDefaults(flowAggregatorConf *FlowAggregatorConfig) {
@@ -80,4 +88,24 @@ func SetConfigDefaults(flowAggregatorConf *FlowAggregatorConfig) {
if flowAggregatorConf.S3Uploader.UploadInterval == "" {
flowAggregatorConf.S3Uploader.UploadInterval = DefaultS3UploadInterval
}
if flowAggregatorConf.FlowLogger.Path == "" {
flowAggregatorConf.FlowLogger.Path = filepath.Join(os.TempDir(), "antrea-flows.log")
}
if flowAggregatorConf.FlowLogger.MaxSize == 0 {
flowAggregatorConf.FlowLogger.MaxSize = DefaultLoggerMaxSize
}
if flowAggregatorConf.FlowLogger.MaxBackups == 0 {
flowAggregatorConf.FlowLogger.MaxBackups = DefaultLoggerMaxBackups
}
if flowAggregatorConf.FlowLogger.Compress == nil {
flowAggregatorConf.FlowLogger.Compress = new(bool)
*flowAggregatorConf.FlowLogger.Compress = true
}
if flowAggregatorConf.FlowLogger.RecordFormat == "" {
flowAggregatorConf.FlowLogger.RecordFormat = DefaultLoggerRecordFormat
}
if flowAggregatorConf.FlowLogger.PrettyPrint == nil {
flowAggregatorConf.FlowLogger.PrettyPrint = new(bool)
*flowAggregatorConf.FlowLogger.PrettyPrint = true
}
}
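
The MaxSize / MaxBackups / MaxAge / Compress knobs mirror those of the widely used lumberjack log-rotation package. The file writer itself is added in files not shown in this excerpt, so the following is only a sketch of how the resolved defaults might be wired into a rotating writer, assuming gopkg.in/natefinch/lumberjack.v2 (or something equivalent) as the backend; newFlowLogWriter is a hypothetical helper:

package main

import (
	"fmt"

	flowaggregatorconfig "antrea.io/antrea/pkg/config/flowaggregator"
	"gopkg.in/natefinch/lumberjack.v2"
)

// newFlowLogWriter is a hypothetical helper mapping the FlowLoggerConfig
// rotation options onto a lumberjack.Logger, an io.WriteCloser that rotates
// the underlying file by size and prunes old backups by count and age.
func newFlowLogWriter(cfg flowaggregatorconfig.FlowLoggerConfig) *lumberjack.Logger {
	return &lumberjack.Logger{
		Filename:   cfg.Path,
		MaxSize:    int(cfg.MaxSize),    // MB before rotation
		MaxBackups: int(cfg.MaxBackups), // old files to keep (0 = keep all)
		MaxAge:     int(cfg.MaxAge),     // days to keep (0 = no age limit)
		Compress:   cfg.Compress != nil && *cfg.Compress,
	}
}

func main() {
	cfg := flowaggregatorconfig.FlowAggregatorConfig{}
	flowaggregatorconfig.SetConfigDefaults(&cfg) // fills Path, MaxSize, MaxBackups, Compress, ...
	w := newFlowLogWriter(cfg.FlowLogger)
	defer w.Close()
	// Hypothetical CSV line; the actual columns are defined elsewhere in this commit.
	fmt.Fprintln(w, "2023-04-13T00:00:00Z,10.0.0.1,10.0.0.2,TCP,Drop")
}
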
4 changes: 4 additions & 0 deletions pkg/flowaggregator/exporter/ipfix_test.go
@@ -351,8 +351,12 @@ func TestInitExportingProcess(t *testing.T) {
opt := &options.Options{}
opt.Config = &flowaggregatorconfig.FlowAggregatorConfig{}
flowaggregatorconfig.SetConfigDefaults(opt.Config)
// dialing this address is guaranteed to fail (we use 0 as the port number)
opt.ExternalFlowCollectorAddr = "127.0.0.1:0"
opt.ExternalFlowCollectorProto = "tcp"
// the observation domain should be set, or the test will take 10s to run
obsDomainID := uint32(1)
opt.Config.FlowCollector.ObservationDomainID = &obsDomainID
exp := NewIPFIXExporter(k8sClientset, opt, mockIPFIXRegistry)
err := exp.initExportingProcess()
assert.ErrorContains(t, err, "got error when initializing IPFIX exporting process: dial tcp 127.0.0.1:0:")