Skip to content

Commit

Permalink
Addressed comments from 7/30
Browse files Browse the repository at this point in the history
  • Loading branch information
srikartati committed Aug 11, 2020
1 parent b44fd78 commit 354295f
Show file tree
Hide file tree
Showing 23 changed files with 542 additions and 547 deletions.
17 changes: 9 additions & 8 deletions build/yamls/base/conf/antrea-agent.conf
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,14 @@ featureGates:

# Provide flow collector address as string with format <IP>:<port>[:<proto>], where proto is tcp or udp. This also enables flow exporter that sends IPFIX
# flow records of conntrack flows on OVS bridge. If no L4 transport proto is given, we consider tcp as default.
# Defaults to "".
#flowCollectorAddr: ""

# Provide flow exporter poll and export intervals in format "0s:0s". This determines how often flow exporter polls connections
# in conntrack module and exports IPFIX flow records that are built from connection store.
# Flow export interval should be a multiple of flow poll interval.
# Flow poll interval value should be in range [1s, ExportInterval(s)).
# Flow export interval value should be in range (PollInterval(s), 600s].
# Defaults to "5s:60s". Follow the time units of duration.
#flowPollAndFlowExportIntervals: ""
# Provide flow poll interval in format "0s". This determines how often flow exporter dumps connections in conntrack module.
# Flow poll interval should be greater than or equal to 1s(one second).
# Follow the time units of time.Duration type.
#flowPollInterval: "5s"

# Provide flow export frequency, which is the number of poll cycles elapsed before flow exporter exports flow records to
# the flow collector.
# Flow export frequency should be greater than or equal to 1.
#flowExportFrequency: 12
40 changes: 20 additions & 20 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,28 +239,28 @@ func run(o *Options) error {
go ofClient.StartPacketInHandler(stopCh)
}

// Initialize flow exporter; start go routines to poll conntrack flows and export IPFIX flow records
// Initialize flow exporter to start go routines to poll conntrack flows and export IPFIX flow records
if features.DefaultFeatureGate.Enabled(features.FlowExporter) {
if o.flowCollector != nil {
var connTrackDumper connections.ConnTrackDumper
if o.config.OVSDatapathType == ovsconfig.OVSDatapathSystem {
connTrackDumper = connections.NewConnTrackDumper(connections.NewConnTrackSystem(), nodeConfig, serviceCIDRNet, o.config.OVSDatapathType, agentQuerier.GetOVSCtlClient())
} else if o.config.OVSDatapathType == ovsconfig.OVSDatapathNetdev {
connTrackDumper = connections.NewConnTrackDumper(connections.NewConnTrackNetdev(), nodeConfig, serviceCIDRNet, o.config.OVSDatapathType, agentQuerier.GetOVSCtlClient())
}
connStore := connections.NewConnectionStore(connTrackDumper, ifaceStore, o.pollingInterval)
flowRecords := flowrecords.NewFlowRecords(connStore)
flowExporter, err := exporter.InitFlowExporter(o.flowCollector, flowRecords, o.exportInterval, o.pollingInterval)
if err != nil {
// If flow exporter cannot be initialized, then Antrea agent does not exit; only error is logged.
klog.Errorf("error when initializing flow exporter: %v", err)
} else {
// pollDone helps in synchronizing connStore.Run and flowExporter.Run go routines.
pollDone := make(chan bool)
go connStore.Run(stopCh, pollDone)
go flowExporter.Run(stopCh, pollDone)
}
connStore := connections.NewConnectionStore(
o.config.OVSDatapathType,
nodeConfig,
serviceCIDRNet,
agentQuerier.GetOVSCtlClient(),
ifaceStore,
o.pollInterval)
// pollDone helps in synchronizing connStore.Run and flowExporter.Run go routines.
pollDone := make(chan struct{})
go connStore.Run(stopCh, pollDone)

flowExporter, err := exporter.InitFlowExporter(
o.flowCollector,
flowrecords.NewFlowRecords(connStore),
o.config.FlowExportFrequency,
o.pollInterval)
if err != nil {
return fmt.Errorf("error when initializing flow exporter: %v", err)
}
go flowExporter.Run(stopCh, pollDone)
}

<-stopCh
Expand Down
21 changes: 12 additions & 9 deletions cmd/antrea-agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,18 @@ type AgentConfig struct {
// Enable metrics exposure via Prometheus. Initializes Prometheus metrics listener
// Defaults to false.
EnablePrometheusMetrics bool `yaml:"enablePrometheusMetrics,omitempty"`
// Provide flow collector address as string with format <IP>:<port>[:<proto>], where proto is tcp or udp. This also enables flow exporter that sends IPFIX
// flow records of conntrack flows on OVS bridge. If no L4 transport proto is given, we consider tcp as default.
// Provide the flow collector address as string with format <IP>:<port>[:<proto>], where proto is tcp or udp. This also
// enables the flow exporter that sends IPFIX flow records of conntrack flows on OVS bridge. If no L4 transport proto
// is given, we consider tcp as default.
// Defaults to "".
FlowCollectorAddr string `yaml:"flowCollectorAddr,omitempty"`
// Provide flow exporter poll and export intervals in format "0s:0s". This determines how often flow exporter polls connections
// in conntrack module and exports IPFIX flow records that are built from connection store.
// Flow export interval should be a multiple of flow poll interval.
// Flow poll interval value should be in range [1s, ExportInterval(s)).
// Flow export interval value should be in range (PollInterval(s), 600s].
// Defaults to "5s:60s". Follow the time units of duration.
FlowPollAndFlowExportIntervals string `yaml:"flowPollAndFlowExportIntervals,omitempty"`
// Provide flow poll interval in format "0s". This determines how often flow exporter dumps connections in conntrack module.
// Flow poll interval should be greater than or equal to 1s(one second).
// Defaults to "5s". Follow the time units of duration.
FlowPollInterval string `yaml:"flowPollInterval,omitempty"`
// Provide flow export frequency, which is the number of poll cycles elapsed before flow exporter exports flow records to
// the flow collector.
// Flow export frequency should be greater than or equal to 1.
// Defaults to "12".
FlowExportFrequency uint `yaml:"flowExportFrequency,omitempty"`
}
2 changes: 1 addition & 1 deletion cmd/antrea-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func newAgentCommand() *cobra.Command {
if err := opts.validate(args); err != nil {
klog.Fatalf("Failed to validate: %v", err)
}
// Not passing args again as it is already validated and not used in flow exporter config
// Not passing args again as they are already validated and are not used in flow exporter config
if err := opts.validateFlowExporterConfig(); err != nil {
klog.Fatalf("Failed to validate flow exporter config: %v", err)
}
Expand Down
58 changes: 25 additions & 33 deletions cmd/antrea-agent/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,8 @@ type Options struct {
config *AgentConfig
// IPFIX flow collector
flowCollector net.Addr
// Flow exporter polling interval
pollingInterval time.Duration
// Flow exporter export interval
exportInterval time.Duration
// Flow exporter poll interval
pollInterval time.Duration
}

func newOptions() *Options {
Expand Down Expand Up @@ -152,15 +150,22 @@ func (o *Options) setDefaults() {
o.config.APIPort = apis.AntreaAgentAPIPort
}

if o.config.FlowCollectorAddr != "" && o.config.FlowPollAndFlowExportIntervals == "" {
o.pollingInterval = 5 * time.Second
o.exportInterval = 60 * time.Second
if o.config.FeatureGates[string(features.FlowExporter)] {
if o.config.FlowPollInterval == "" {
o.pollInterval = 5 * time.Second
}
if o.config.FlowExportFrequency == 0 {
// This frequency value makes flow export interval as 60s
o.config.FlowExportFrequency = 12
}
}
}

func (o *Options) validateFlowExporterConfig() error {
if features.DefaultFeatureGate.Enabled(features.FlowExporter) {
if o.config.FlowCollectorAddr != "" {
if o.config.FlowCollectorAddr == "" {
return fmt.Errorf("IPFIX flow collector address should be provided")
} else {
// Check if it is TCP or UDP
strSlice := strings.Split(o.config.FlowCollectorAddr, ":")
var proto string
Expand All @@ -175,6 +180,7 @@ func (o *Options) validateFlowExporterConfig() error {
} else {
return fmt.Errorf("IPFIX flow collector is given in invalid format")
}

// Convert the string input in net.Addr format
hostPortAddr := strSlice[0] + ":" + strSlice[1]
_, _, err := net.SplitHostPort(hostPortAddr)
Expand All @@ -184,37 +190,23 @@ func (o *Options) validateFlowExporterConfig() error {
if proto == "udp" {
o.flowCollector, err = net.ResolveUDPAddr("udp", hostPortAddr)
if err != nil {
return fmt.Errorf("IPFIX flow collector over UDP proto is not resolved: %v", err)
return fmt.Errorf("IPFIX flow collector over UDP proto cannot be resolved: %v", err)
}
} else {
o.flowCollector, err = net.ResolveTCPAddr("tcp", hostPortAddr)
if err != nil {
return fmt.Errorf("IPFIX flow collector over TCP proto is not resolved: %v", err)
return fmt.Errorf("IPFIX flow collector over TCP proto cannot be resolved: %v", err)
}
}

if o.config.FlowPollAndFlowExportIntervals != "" {
intervalSlice := strings.Split(o.config.FlowPollAndFlowExportIntervals, ":")
if len(intervalSlice) != 2 {
return fmt.Errorf("flow exporter intervals %s is not in acceptable format \"OOs:OOs\"", o.config.FlowPollAndFlowExportIntervals)
}
o.pollingInterval, err = time.ParseDuration(intervalSlice[0])
if err != nil {
return fmt.Errorf("poll interval is not provided in right format: %v", err)
}
o.exportInterval, err = time.ParseDuration(intervalSlice[1])
if err != nil {
return fmt.Errorf("export interval is not provided in right format: %v", err)
}
if o.pollingInterval < time.Second {
return fmt.Errorf("poll interval should be minimum of one second")
}
if o.pollingInterval > o.exportInterval {
return fmt.Errorf("poll interval should be less than or equal to export interval")
}
if o.exportInterval%o.pollingInterval != 0 {
return fmt.Errorf("export interval should be a multiple of poll interval")
}
}
if o.config.FlowPollInterval != "" {
var err error
o.pollInterval, err = time.ParseDuration(o.config.FlowPollInterval)
if err != nil {
return fmt.Errorf("FlowPollInterval is not provided in right format: %v", err)
}
if o.pollInterval < time.Second {
return fmt.Errorf("FlowPollInterval should be greater than or equal to one second")
}
}
}
Expand Down
53 changes: 53 additions & 0 deletions cmd/antrea-agent/options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package main

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"

"github.com/vmware-tanzu/antrea/pkg/features"
)

func TestOptions_validateFlowExporterConfig(t *testing.T) {
// Enable flow exporter
enableFlowExporter := map[string]bool{
"FlowExporter": true,
}
features.DefaultMutableFeatureGate.SetFromMap(enableFlowExporter)
testcases := []struct {
// input
collector string
pollInterval string
// expectations
expCollectorNet string
expCollectorStr string
expPollIntervalStr string
expError error
}{
{collector: "192.168.1.100:2002:tcp", pollInterval: "5s", expCollectorNet: "tcp", expCollectorStr: "192.168.1.100:2002", expPollIntervalStr: "5s", expError: nil},
{collector: "192.168.1.100:2002:udp", pollInterval: "5s", expCollectorNet: "udp", expCollectorStr: "192.168.1.100:2002", expPollIntervalStr: "5s", expError: nil},
{collector: "192.168.1.100:2002", pollInterval: "5s", expCollectorNet: "tcp", expCollectorStr: "192.168.1.100:2002", expPollIntervalStr: "5s", expError: nil},
{collector: "192.168.1.100:2002:sctp", pollInterval: "5s", expCollectorNet: "", expCollectorStr: "", expPollIntervalStr: "", expError: fmt.Errorf("IPFIX flow collector over %s proto is not supported", "sctp")},
{collector: "192.168.1.100:2002", pollInterval: "5ss", expCollectorNet: "tcp", expCollectorStr: "192.168.1.100:2002", expPollIntervalStr: "", expError: fmt.Errorf("FlowPollInterval is not provided in right format: ")},
{collector: "192.168.1.100:2002", pollInterval: "1ms", expCollectorNet: "tcp", expCollectorStr: "192.168.1.100:2002", expPollIntervalStr: "", expError: fmt.Errorf("FlowPollInterval should be greater than or equal to one second")},
}
assert.Equal(t, features.DefaultFeatureGate.Enabled(features.FlowExporter), true)
for _, tc := range testcases {
testOptions := &Options{
config: new(AgentConfig),
}
testOptions.config.FlowCollectorAddr = tc.collector
testOptions.config.FlowPollInterval = tc.pollInterval
err := testOptions.validateFlowExporterConfig()

if tc.expError != nil {
assert.NotNil(t, err)
} else {
assert.Equal(t, tc.expCollectorNet, testOptions.flowCollector.Network())
assert.Equal(t, tc.expCollectorStr, testOptions.flowCollector.String())
assert.Equal(t, tc.expPollIntervalStr, testOptions.pollInterval.String())
}
}

}
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ require (
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
google.golang.org/grpc v1.26.0
gopkg.in/yaml.v2 v2.2.8
gotest.tools v2.2.0+incompatible
k8s.io/api v0.18.4
k8s.io/apimachinery v0.18.4
k8s.io/apiserver v0.18.4
Expand Down
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,6 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
2 changes: 1 addition & 1 deletion hack/update-codegen-dockerized.sh
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ MOCKGEN_TARGETS=(
"pkg/agent/querier AgentQuerier"
"pkg/controller/querier ControllerQuerier"
"pkg/querier AgentNetworkPolicyInfoQuerier"
"pkg/agent/flowexporter/connections ConnTrackDumper,ConnTrackInterfacer"
"pkg/agent/flowexporter/connections ConnTrackDumper,NetFilterConnTrack"
"pkg/agent/flowexporter/ipfix IPFIXExportingProcess,IPFIXRecord"
)

Expand Down
Loading

0 comments on commit 354295f

Please sign in to comment.