Skip to content

Commit

Permalink
Support IPFIX flow records for flow exporter feature:
Browse files Browse the repository at this point in the history
Still WIP as we are waiting to get IPFIX library available under
vmware github org.

Added support to export IPFIX flow records that are built from
connection map using IPFIX library.

Did testing with ipfix collector in local k8s cluster running iperf service
consisting tcp client and server.
Could not add unit tests because generating mocks without IPFIX library
in github is difficult. Will do the unit tests (exporter_test.go) once
IPFIX library is avaialble.
Plan to do run with elastiflow.

Compiling using local IPFIX lib files (please see go.mod).

Issue# 712
  • Loading branch information
srikartati committed Jun 15, 2020
1 parent c17da5e commit ac163fd
Show file tree
Hide file tree
Showing 18 changed files with 630 additions and 26 deletions.
10 changes: 7 additions & 3 deletions build/yamls/antrea-eks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,10 @@ data:
# Enable metrics exposure via Prometheus. Initializes Prometheus metrics listener.
#enablePrometheusMetrics: false
# Provide flow collector address as string with format IP:port. This also enables flow exporter that sends IPFIX
# flow records of conntrack flows on OVS bridge.
#flowCollectorAddr: ""
antrea-cni.conflist: |
{
"cniVersion":"0.3.0",
Expand Down Expand Up @@ -412,7 +416,7 @@ metadata:
annotations: {}
labels:
app: antrea
name: antrea-config-hmd2mdhg89
name: antrea-config-k56c79bkt5
namespace: kube-system
---
apiVersion: v1
Expand Down Expand Up @@ -517,7 +521,7 @@ spec:
key: node-role.kubernetes.io/master
volumes:
- configMap:
name: antrea-config-hmd2mdhg89
name: antrea-config-k56c79bkt5
name: antrea-config
- name: antrea-controller-tls
secret:
Expand Down Expand Up @@ -729,7 +733,7 @@ spec:
operator: Exists
volumes:
- configMap:
name: antrea-config-hmd2mdhg89
name: antrea-config-k56c79bkt5
name: antrea-config
- hostPath:
path: /etc/cni/net.d
Expand Down
10 changes: 7 additions & 3 deletions build/yamls/antrea-gke.yml
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,10 @@ data:
# Enable metrics exposure via Prometheus. Initializes Prometheus metrics listener.
#enablePrometheusMetrics: false
# Provide flow collector address as string with format IP:port. This also enables flow exporter that sends IPFIX
# flow records of conntrack flows on OVS bridge.
#flowCollectorAddr: ""
antrea-cni.conflist: |
{
"cniVersion":"0.3.0",
Expand Down Expand Up @@ -412,7 +416,7 @@ metadata:
annotations: {}
labels:
app: antrea
name: antrea-config-ff5ff2btgc
name: antrea-config-g5mgkb6tbt
namespace: kube-system
---
apiVersion: v1
Expand Down Expand Up @@ -517,7 +521,7 @@ spec:
key: node-role.kubernetes.io/master
volumes:
- configMap:
name: antrea-config-ff5ff2btgc
name: antrea-config-g5mgkb6tbt
name: antrea-config
- name: antrea-controller-tls
secret:
Expand Down Expand Up @@ -727,7 +731,7 @@ spec:
operator: Exists
volumes:
- configMap:
name: antrea-config-ff5ff2btgc
name: antrea-config-g5mgkb6tbt
name: antrea-config
- hostPath:
path: /etc/cni/net.d
Expand Down
10 changes: 7 additions & 3 deletions build/yamls/antrea-ipsec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,10 @@ data:
# Enable metrics exposure via Prometheus. Initializes Prometheus metrics listener.
#enablePrometheusMetrics: false
# Provide flow collector address as string with format IP:port. This also enables flow exporter that sends IPFIX
# flow records of conntrack flows on OVS bridge.
#flowCollectorAddr: ""
antrea-cni.conflist: |
{
"cniVersion":"0.3.0",
Expand Down Expand Up @@ -412,7 +416,7 @@ metadata:
annotations: {}
labels:
app: antrea
name: antrea-config-fggkd66d2h
name: antrea-config-tkbbc4fddk
namespace: kube-system
---
apiVersion: v1
Expand Down Expand Up @@ -526,7 +530,7 @@ spec:
key: node-role.kubernetes.io/master
volumes:
- configMap:
name: antrea-config-fggkd66d2h
name: antrea-config-tkbbc4fddk
name: antrea-config
- name: antrea-controller-tls
secret:
Expand Down Expand Up @@ -771,7 +775,7 @@ spec:
operator: Exists
volumes:
- configMap:
name: antrea-config-fggkd66d2h
name: antrea-config-tkbbc4fddk
name: antrea-config
- hostPath:
path: /etc/cni/net.d
Expand Down
10 changes: 7 additions & 3 deletions build/yamls/antrea.yml
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,10 @@ data:
# Enable metrics exposure via Prometheus. Initializes Prometheus metrics listener.
#enablePrometheusMetrics: false
# Provide flow collector address as string with format IP:port. This also enables flow exporter that sends IPFIX
# flow records of conntrack flows on OVS bridge.
#flowCollectorAddr: ""
antrea-cni.conflist: |
{
"cniVersion":"0.3.0",
Expand Down Expand Up @@ -412,7 +416,7 @@ metadata:
annotations: {}
labels:
app: antrea
name: antrea-config-mf4t8c67c8
name: antrea-config-6g94k9db2b
namespace: kube-system
---
apiVersion: v1
Expand Down Expand Up @@ -517,7 +521,7 @@ spec:
key: node-role.kubernetes.io/master
volumes:
- configMap:
name: antrea-config-mf4t8c67c8
name: antrea-config-6g94k9db2b
name: antrea-config
- name: antrea-controller-tls
secret:
Expand Down Expand Up @@ -727,7 +731,7 @@ spec:
operator: Exists
volumes:
- configMap:
name: antrea-config-mf4t8c67c8
name: antrea-config-6g94k9db2b
name: antrea-config
- hostPath:
path: /etc/cni/net.d
Expand Down
4 changes: 4 additions & 0 deletions build/yamls/base/conf/antrea-agent.conf
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,7 @@

# Enable metrics exposure via Prometheus. Initializes Prometheus metrics listener.
#enablePrometheusMetrics: false

# Provide flow collector address as string with format IP:port. This also enables flow exporter that sends IPFIX
# flow records of conntrack flows on OVS bridge.
#flowCollectorAddr: ""
17 changes: 14 additions & 3 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"github.com/vmware-tanzu/antrea/pkg/agent/controller/networkpolicy"
"github.com/vmware-tanzu/antrea/pkg/agent/controller/noderoute"
"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/connections"
"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/exporter"
"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/flowrecords"
"github.com/vmware-tanzu/antrea/pkg/agent/interfacestore"
"github.com/vmware-tanzu/antrea/pkg/agent/metrics"
"github.com/vmware-tanzu/antrea/pkg/agent/openflow"
Expand Down Expand Up @@ -193,11 +195,20 @@ func run(o *Options) error {
}
go apiServer.Run(stopCh)

// Create connection store that polls conntrack flows with a given polling interval.
if o.config.EnableFlowExporter {
// Initialize flow exporter and start functions to poll conntrack flows and export IPFIX flow records
if o.flowCollector != nil {
connTrack := connections.NewConnTrackPoller(nodeConfig, connections.NewConnTrack())
connStore := connections.NewConnectionStore(connTrack, ifaceStore)
go connStore.Run(stopCh)
flowRecords := flowrecords.NewFlowRecords(connStore)
flowExporter, err := exporter.InitFlowExporter(o.flowCollector, flowRecords)
if err != nil {
// Antrea agent do not exit, if flow exporter cannot be initialized.
// Currently, only logging the error.
klog.Errorf("error when initializing flow exporter: %v", err)
} else {
go connStore.Run(stopCh)
go flowExporter.Run(stopCh)
}
}

<-stopCh
Expand Down
7 changes: 4 additions & 3 deletions cmd/antrea-agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ type AgentConfig struct {
// Enable metrics exposure via Prometheus. Initializes Prometheus metrics listener
// Defaults to false.
EnablePrometheusMetrics bool `yaml:"enablePrometheusMetrics,omitempty"`
// Enable flow exporter that exports IPFIX flow records of conntrack flows on OVS bridge
// Defaults to false.
EnableFlowExporter bool `yaml:"enableFlowExporter,omitempty"`
// Provide flow collector address as string with format IP:port. This also enables flow exporter that sends IPFIX
// flow records of conntrack flows on OVS bridge.
// Defaults to "".
FlowCollectorAddr string `yaml:"flowCollectorAddr,omitempty"`
}
18 changes: 16 additions & 2 deletions cmd/antrea-agent/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type Options struct {
configFile string
// The configuration object
config *AgentConfig
// IPFIX flow collector
flowCollector net.Addr
}

func newOptions() *Options {
Expand Down Expand Up @@ -101,8 +103,20 @@ func (o *Options) validate(args []string) error {
if encapMode.SupportsNoEncap() && o.config.EnableIPSecTunnel {
return fmt.Errorf("IPSec tunnel may only be enabled on %s mode", config.TrafficEncapModeEncap)
}
if o.config.OVSDatapathType == ovsconfig.OVSDatapathNetdev && o.config.EnableFlowExporter {
return fmt.Errorf("Flow exporter is not supported for OVS datapath type %s", o.config.OVSDatapathType)
if o.config.FlowCollectorAddr != "" {
if o.config.OVSDatapathType == ovsconfig.OVSDatapathNetdev {
return fmt.Errorf("exporting flows is not supported for OVS datapath type %s", o.config.OVSDatapathType)
} else {
// Convert the string input in net.Addr format
_, _, err := net.SplitHostPort(o.config.FlowCollectorAddr)
if err != nil {
return fmt.Errorf("IPFIX flow collector is given in invalid format. Error: %v", err)
}
o.flowCollector, err = net.ResolveTCPAddr("tcp", o.config.FlowCollectorAddr)
if err != nil {
return fmt.Errorf("IPFIX flow collector server over TCP proto is not resolved. Error: %v", err)
}
}
}
return nil
}
Expand Down
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/evanphx/json-patch v4.5.0+incompatible // indirect
github.com/go-openapi/spec v0.19.3
github.com/gogo/protobuf v1.3.1
github.com/golang/mock v1.4.1
github.com/golang/mock v1.4.3
github.com/golang/protobuf v1.3.2
github.com/google/uuid v1.1.1
github.com/googleapis/gnostic v0.3.1 // indirect
Expand All @@ -35,8 +35,9 @@ require (
github.com/spf13/afero v1.2.2
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.5
github.com/srikartati/go-ipfixlib v0.0.0-20200605194748-3ddbc57c2b6c
github.com/streamrail/concurrent-map v0.0.0-20160823150647-8bf1e9bacbf6 // indirect
github.com/stretchr/testify v1.4.0
github.com/stretchr/testify v1.5.1
github.com/ti-mo/conntrack v0.3.0
github.com/vishvananda/netlink v1.1.0
github.com/vmware-tanzu/octant v0.10.2
Expand All @@ -56,6 +57,7 @@ require (
k8s.io/kube-aggregator v0.17.6
k8s.io/kube-openapi v0.0.0-20200410145947-bcb3869e6f29
k8s.io/utils v0.0.0-20191114184206-e782cd3c129f
sigs.k8s.io/yaml v1.1.0
)

replace (
Expand All @@ -67,4 +69,5 @@ replace (
// available from 1.19.0 and later releases. Use this commit before Antrea bumps up its K8s
// dependency version.
k8s.io/client-go => github.com/tnqn/client-go v0.0.0-20200521074542-6c18cd58306a
// github.com/srikartati/go-ipfixlib v0.0.0-20200605194748-3ddbc57c2b6c => /antrea/go-ipfixlib
)
9 changes: 6 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4er
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/golang/mock v1.4.1 h1:ocYkMQY5RrXTYgXl7ICpV0IXwlEQGwKIsery4gyXa1U=
github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.3 h1:GV+pQPG/EUUbkh47niozDcADz6go/dUwhVzdUQHIVRw=
github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/protobuf v0.0.0-20161109072736-4bd1920723d7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down Expand Up @@ -440,6 +440,8 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/spf13/viper v1.4.0 h1:yXHLWeravcrgGyFSyCgdYpXQ9dR9c/WED3pg1RhxqEU=
github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE=
github.com/srikartati/go-ipfixlib v0.0.0-20200605194748-3ddbc57c2b6c h1:tAxxMmAh5xHoRCtcIzc99Qp2VOA8DSnpzzsridosdao=
github.com/srikartati/go-ipfixlib v0.0.0-20200605194748-3ddbc57c2b6c/go.mod h1:kMk7mBXI7S5sFxbQSx+FOBbNogjsF8GNqCkYvM7LHLY=
github.com/streamrail/concurrent-map v0.0.0-20160803124810-238fe79560e1/go.mod h1:yqDD2twFAqxvvH5gtpwwgLsj5L1kbNwtoPoDOwBzXcs=
github.com/streamrail/concurrent-map v0.0.0-20160823150647-8bf1e9bacbf6 h1:XklXvOrWxWCDX2n4vdEQWkjuIP820XD6C4kF0O0FzH4=
github.com/streamrail/concurrent-map v0.0.0-20160823150647-8bf1e9bacbf6/go.mod h1:yqDD2twFAqxvvH5gtpwwgLsj5L1kbNwtoPoDOwBzXcs=
Expand All @@ -449,8 +451,9 @@ github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoH
github.com/stretchr/testify v0.0.0-20151208002404-e3a8ff8ce365/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/ti-mo/conntrack v0.3.0 h1:572/72R9la2FVvO6CbsLiCmR48U3pgCvIlLKoUrExDU=
github.com/ti-mo/conntrack v0.3.0/go.mod h1:tPSYNx21TnjxGz99pLD/lAN4fuEViaJZz+pliMqnovk=
github.com/ti-mo/netfilter v0.3.1 h1:+ZTmeTx+64Jw2N/1gmqm42kruDWjQ90SMjWEB1e6VDs=
Expand Down
1 change: 1 addition & 0 deletions hack/update-codegen-dockerized.sh
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ MOCKGEN_TARGETS=(
"pkg/controller/querier ControllerQuerier"
"pkg/querier AgentNetworkPolicyInfoQuerier"
"pkg/agent/flowexporter/connections ConnTrackPoller,ConnTrack"
"pkg/agent/flowexporter/exporter IPFIXExportingProcess,IPFIXRecord"
)

# Command mockgen does not automatically replace variable YEAR with current year
Expand Down
33 changes: 33 additions & 0 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ var _ ConnectionStore = new(connectionStore)

type ConnectionStore interface {
Run(stopCh <-chan struct{})
IterateCxnMapWithCB(updateCallback flowexporter.FlowRecordUpdate) error
FlushConnectionStore()
}

type connectionStore struct {
Expand Down Expand Up @@ -97,7 +99,25 @@ func (cs *connectionStore) getConnByKey(flowTuple flowexporter.ConnectionKey) (*
return &conn, found
}

func (cs *connectionStore) IterateCxnMapWithCB(updateCallback flowexporter.FlowRecordUpdate) error {
cs.mutex.Lock()
defer cs.mutex.Unlock()

for k, v := range cs.connections {
cs.mutex.Unlock()
err := updateCallback(k, v)
if err != nil {
klog.Errorf("flow record update and send failed for flow with key: %v, cxn: %v", k, v)
return err
}
klog.V(2).Infof("Flow record added or updated")
cs.mutex.Lock()
}
return nil
}

// poll returns number of filtered connections after poll cycle
// TODO: Optimize polling cycle--Only poll invalid/close connection during every poll. Poll established right before export
func (cs *connectionStore) poll() (int, error) {
klog.V(2).Infof("Polling conntrack")

Expand All @@ -114,3 +134,16 @@ func (cs *connectionStore) poll() (int, error) {

return len(filteredConns), nil
}

// FlushConnectionStore after each IPFIX export of flow records.
// Timed out conntrack connections will not be sent as IPFIX flow records.
// TODO: Enhance/optimize this logic.
func (cs *connectionStore) FlushConnectionStore() {
klog.Infof("Flushing connection map")

cs.mutex.Lock()
defer cs.mutex.Unlock()
for conn := range cs.connections {
delete(cs.connections, conn)
}
}
Loading

0 comments on commit ac163fd

Please sign in to comment.