Skip to content

Commit

Permalink
Change flow export mechanism for Flow Aggregator
Browse files Browse the repository at this point in the history
Introduce active and inactive flow timeout configuration
knobs.
With active flow record timeout, every flow record is sent to the
collector based on its own active expiry timeout value.
With inactive flow record timeout, a flow record is sent to the
collector once the timeout elapses without the flow aggregator seeing any
new record for the flow since it was last updated in the flow aggregator map.

Used a priority queue approach to keep track of the individual expiry
values of the records.

In addition, we changed the default active flow timeout at the flow
exporter to 30s.
  • Loading branch information
srikartati committed May 18, 2021
1 parent 3134f7e commit c08f7af
Show file tree
Hide file tree
Showing 23 changed files with 301 additions and 204 deletions.
8 changes: 4 additions & 4 deletions build/yamls/antrea-aks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3497,7 +3497,7 @@ data:
# stream of packets, a flow record will be exported to the collector once the elapsed
# time since the last export event is equal to the value of this timeout.
# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
#activeFlowExportTimeout: "60s"
#activeFlowExportTimeout: "30s"
# Provide the idle flow export timeout, which is the timeout after which a flow
# record is sent to the collector for idle flows. A flow is considered idle if no
Expand Down Expand Up @@ -3607,7 +3607,7 @@ metadata:
annotations: {}
labels:
app: antrea
name: antrea-config-gg4m728h98
name: antrea-config-kt9gdmf62t
namespace: kube-system
---
apiVersion: v1
Expand Down Expand Up @@ -3727,7 +3727,7 @@ spec:
key: node-role.kubernetes.io/master
volumes:
- configMap:
name: antrea-config-gg4m728h98
name: antrea-config-kt9gdmf62t
name: antrea-config
- name: antrea-controller-tls
secret:
Expand Down Expand Up @@ -4038,7 +4038,7 @@ spec:
operator: Exists
volumes:
- configMap:
name: antrea-config-gg4m728h98
name: antrea-config-kt9gdmf62t
name: antrea-config
- hostPath:
path: /etc/cni/net.d
Expand Down
8 changes: 4 additions & 4 deletions build/yamls/antrea-eks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3497,7 +3497,7 @@ data:
# stream of packets, a flow record will be exported to the collector once the elapsed
# time since the last export event is equal to the value of this timeout.
# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
#activeFlowExportTimeout: "60s"
#activeFlowExportTimeout: "30s"
# Provide the idle flow export timeout, which is the timeout after which a flow
# record is sent to the collector for idle flows. A flow is considered idle if no
Expand Down Expand Up @@ -3607,7 +3607,7 @@ metadata:
annotations: {}
labels:
app: antrea
name: antrea-config-gg4m728h98
name: antrea-config-kt9gdmf62t
namespace: kube-system
---
apiVersion: v1
Expand Down Expand Up @@ -3727,7 +3727,7 @@ spec:
key: node-role.kubernetes.io/master
volumes:
- configMap:
name: antrea-config-gg4m728h98
name: antrea-config-kt9gdmf62t
name: antrea-config
- name: antrea-controller-tls
secret:
Expand Down Expand Up @@ -4040,7 +4040,7 @@ spec:
operator: Exists
volumes:
- configMap:
name: antrea-config-gg4m728h98
name: antrea-config-kt9gdmf62t
name: antrea-config
- hostPath:
path: /etc/cni/net.d
Expand Down
8 changes: 4 additions & 4 deletions build/yamls/antrea-gke.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3497,7 +3497,7 @@ data:
# stream of packets, a flow record will be exported to the collector once the elapsed
# time since the last export event is equal to the value of this timeout.
# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
#activeFlowExportTimeout: "60s"
#activeFlowExportTimeout: "30s"
# Provide the idle flow export timeout, which is the timeout after which a flow
# record is sent to the collector for idle flows. A flow is considered idle if no
Expand Down Expand Up @@ -3607,7 +3607,7 @@ metadata:
annotations: {}
labels:
app: antrea
name: antrea-config-6bb22hc7fg
name: antrea-config-c8bf7gddbb
namespace: kube-system
---
apiVersion: v1
Expand Down Expand Up @@ -3727,7 +3727,7 @@ spec:
key: node-role.kubernetes.io/master
volumes:
- configMap:
name: antrea-config-6bb22hc7fg
name: antrea-config-c8bf7gddbb
name: antrea-config
- name: antrea-controller-tls
secret:
Expand Down Expand Up @@ -4041,7 +4041,7 @@ spec:
path: /home/kubernetes/bin
name: host-cni-bin
- configMap:
name: antrea-config-6bb22hc7fg
name: antrea-config-c8bf7gddbb
name: antrea-config
- hostPath:
path: /etc/cni/net.d
Expand Down
8 changes: 4 additions & 4 deletions build/yamls/antrea-ipsec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3502,7 +3502,7 @@ data:
# stream of packets, a flow record will be exported to the collector once the elapsed
# time since the last export event is equal to the value of this timeout.
# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
#activeFlowExportTimeout: "60s"
#activeFlowExportTimeout: "30s"
# Provide the idle flow export timeout, which is the timeout after which a flow
# record is sent to the collector for idle flows. A flow is considered idle if no
Expand Down Expand Up @@ -3612,7 +3612,7 @@ metadata:
annotations: {}
labels:
app: antrea
name: antrea-config-f57t688chc
name: antrea-config-dh7f7g969b
namespace: kube-system
---
apiVersion: v1
Expand Down Expand Up @@ -3741,7 +3741,7 @@ spec:
key: node-role.kubernetes.io/master
volumes:
- configMap:
name: antrea-config-f57t688chc
name: antrea-config-dh7f7g969b
name: antrea-config
- name: antrea-controller-tls
secret:
Expand Down Expand Up @@ -4087,7 +4087,7 @@ spec:
operator: Exists
volumes:
- configMap:
name: antrea-config-f57t688chc
name: antrea-config-dh7f7g969b
name: antrea-config
- hostPath:
path: /etc/cni/net.d
Expand Down
8 changes: 4 additions & 4 deletions build/yamls/antrea.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3502,7 +3502,7 @@ data:
# stream of packets, a flow record will be exported to the collector once the elapsed
# time since the last export event is equal to the value of this timeout.
# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
#activeFlowExportTimeout: "60s"
#activeFlowExportTimeout: "30s"
# Provide the idle flow export timeout, which is the timeout after which a flow
# record is sent to the collector for idle flows. A flow is considered idle if no
Expand Down Expand Up @@ -3612,7 +3612,7 @@ metadata:
annotations: {}
labels:
app: antrea
name: antrea-config-5ct9ktdt77
name: antrea-config-42cft4gc5f
namespace: kube-system
---
apiVersion: v1
Expand Down Expand Up @@ -3732,7 +3732,7 @@ spec:
key: node-role.kubernetes.io/master
volumes:
- configMap:
name: antrea-config-5ct9ktdt77
name: antrea-config-42cft4gc5f
name: antrea-config
- name: antrea-controller-tls
secret:
Expand Down Expand Up @@ -4043,7 +4043,7 @@ spec:
operator: Exists
volumes:
- configMap:
name: antrea-config-5ct9ktdt77
name: antrea-config-42cft4gc5f
name: antrea-config
- hostPath:
path: /etc/cni/net.d
Expand Down
2 changes: 1 addition & 1 deletion build/yamls/base/conf/antrea-agent.conf
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ featureGates:
# stream of packets, a flow record will be exported to the collector once the elapsed
# time since the last export event is equal to the value of this timeout.
# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
#activeFlowExportTimeout: "60s"
#activeFlowExportTimeout: "30s"

# Provide the idle flow export timeout, which is the timeout after which a flow
# record is sent to the collector for idle flows. A flow is considered idle if no
Expand Down
21 changes: 15 additions & 6 deletions build/yamls/flow-aggregator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,20 @@ data:
# If no L4 transport proto is given, we consider tcp as default.
#externalFlowCollectorAddr: ""
# Provide flow export interval as a duration string. This determines how often the flow aggregator exports flow
# records to the flow collector.
# Flow export interval should be greater than or equal to 1s (one second).
# Provide the active flow record timeout as a duration string. This determines
# how often the flow aggregator exports the active flow records to the flow
# collector. Thus, for flows with a continuous stream of packets, a flow record
# will be exported to the collector once the elapsed time since the last export
# event in the flow aggregator is equal to the value of this timeout.
# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
#flowExportInterval: 60s
#activeFlowRecordTimeout: 60s
# Provide the inactive flow record timeout as a duration string. This determines
# how often the flow aggregator exports the inactive flow records to the flow
# collector. A flow record is considered to be inactive if no matching record
# has been received by the flow aggregator in the specified interval.
# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
#inactiveFlowRecordTimeout: 90s
# Provide the transport protocol for the flow aggregator collecting process, which is tls, tcp or udp.
#aggregatorTransportProtocol: "tls"
Expand All @@ -155,7 +164,7 @@ metadata:
annotations: {}
labels:
app: flow-aggregator
name: flow-aggregator-configmap-hf78268hm6
name: flow-aggregator-configmap-k85k525hc5
namespace: flow-aggregator
---
apiVersion: v1
Expand Down Expand Up @@ -223,7 +232,7 @@ spec:
serviceAccountName: flow-aggregator
volumes:
- configMap:
name: flow-aggregator-configmap-hf78268hm6
name: flow-aggregator-configmap-k85k525hc5
name: flow-aggregator-config
- hostPath:
path: /var/log/antrea/flow-aggregator
Expand Down
17 changes: 13 additions & 4 deletions build/yamls/flow-aggregator/base/conf/flow-aggregator.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,20 @@
# If no L4 transport proto is given, we consider tcp as default.
#externalFlowCollectorAddr: ""

# Provide flow export interval as a duration string. This determines how often the flow aggregator exports flow
# records to the flow collector.
# Flow export interval should be greater than or equal to 1s (one second).
# Provide the active flow record timeout as a duration string. This determines
# how often the flow aggregator exports the active flow records to the flow
# collector. Thus, for flows with a continuous stream of packets, a flow record
# will be exported to the collector once the elapsed time since the last export
# event in the flow aggregator is equal to the value of this timeout.
# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
#flowExportInterval: 60s
#activeFlowRecordTimeout: 60s

# Provide the inactive flow record timeout as a duration string. This determines
# how often the flow aggregator exports the inactive flow records to the flow
# collector. A flow record is considered to be inactive if no matching record
# has been received by the flow aggregator in the specified interval.
# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
#inactiveFlowRecordTimeout: 90s

# Provide the transport protocol for the flow aggregator collecting process, which is tls, tcp or udp.
#aggregatorTransportProtocol: "tls"
Expand Down
9 changes: 6 additions & 3 deletions cmd/antrea-agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,20 +114,23 @@ type AgentConfig struct {
// Provide flow poll interval in format "0s". This determines how often flow
// exporter dumps connections in conntrack module. Flow poll interval should
// be greater than or equal to 1s (one second).
// Defaults to "5s". Follow the time units of duration type.
// Defaults to "5s". Valid time units are "ns", "us" (or "µs"), "ms", "s",
// "m", "h".
FlowPollInterval string `yaml:"flowPollInterval,omitempty"`
// Provide the active flow export timeout, which is the timeout after which
// a flow record is sent to the collector for active flows. Thus, for flows
// with a continuous stream of packets, a flow record will be exported to the
// collector once the elapsed time since the last export event is equal to the
// value of this timeout.
// Defaults to "60s". Follow the time units of duration type.
// Defaults to "30s". Valid time units are "ns", "us" (or "µs"), "ms", "s",
// "m", "h".
ActiveFlowExportTimeout string `yaml:"activeFlowExportTimeout,omitempty"`
// Provide the idle flow export timeout, which is the timeout after which a
// flow record is sent to the collector for idle flows. A flow is considered
// idle if no packet matching this flow has been observed since the last export
// event.
// Defaults to "15s". Follow the time units of duration type.
// Defaults to "15s". Valid time units are "ns", "us" (or "µs"), "ms", "s",
// "m", "h".
IdleFlowExportTimeout string `yaml:"idleFlowExportTimeout,omitempty"`
// Enable TLS communication from flow exporter to flow aggregator.
// Defaults to true.
Expand Down
2 changes: 1 addition & 1 deletion cmd/antrea-agent/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const (
defaultFlowCollectorTransport = "tcp"
defaultFlowCollectorPort = "4739"
defaultFlowPollInterval = 5 * time.Second
defaultActiveFlowExportTimeout = 60 * time.Second
defaultActiveFlowExportTimeout = 30 * time.Second
defaultIdleFlowExportTimeout = 15 * time.Second
defaultNPLPortRange = "40000-41000"
)
Expand Down
21 changes: 15 additions & 6 deletions cmd/flow-aggregator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,21 @@ type FlowAggregatorConfig struct {
// If no L4 transport proto is given, we consider tcp as default.
// Defaults to "".
ExternalFlowCollectorAddr string `yaml:"externalFlowCollectorAddr,omitempty"`
// Provide flow export interval as a duration string. This determines how often the flow aggregator exports flow
// records to the flow collector.
// Flow export interval should be greater than or equal to 1s (one second).
// Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
// Defaults to "60s".
FlowExportInterval string `yaml:"flowExportInterval,omitempty"`
// Provide the active flow record timeout as a duration string. This determines
// how often the flow aggregator exports the active flow records to the flow
// collector. Thus, for flows with a continuous stream of packets, a flow record
// will be exported to the collector once the elapsed time since the last export
// event in the flow aggregator is equal to the value of this timeout.
// Defaults to "60s". Valid time units are "ns", "us" (or "µs"), "ms", "s",
// "m", "h".
ActiveFlowRecordTimeout string `yaml:"activeFlowRecordTimeout,omitempty"`
// Provide the inactive flow record timeout as a duration string. This determines
// how often the flow aggregator exports the inactive flow records to the flow
// collector. A flow record is considered to be inactive if no matching record
// has been received by the flow aggregator in the specified interval.
// Defaults to "90s". Valid time units are "ns", "us" (or "µs"), "ms", "s",
// "m", "h".
InactiveFlowRecordTimeout string `yaml:"inactiveFlowRecordTimeout,omitempty"`
// Transport protocol over which the aggregator collects IPFIX records from all Agents.
// Defaults to "tls"
AggregatorTransportProtocol flowaggregator.AggregatorTransportProtocol `yaml:"aggregatorTransportProtocol,omitempty"`
Expand Down
3 changes: 2 additions & 1 deletion cmd/flow-aggregator/flow-aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ func run(o *Options) error {
flowAggregator := aggregator.NewFlowAggregator(
o.externalFlowCollectorAddr,
o.externalFlowCollectorProto,
o.exportInterval,
o.activeFlowRecordTimeout,
o.inactiveFlowRecordTimeout,
o.aggregatorTransportProtocol,
o.flowAggregatorAddress,
k8sClient,
Expand Down
24 changes: 17 additions & 7 deletions cmd/flow-aggregator/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import (
const (
defaultExternalFlowCollectorTransport = "tcp"
defaultExternalFlowCollectorPort = "4739"
defaultFlowExportInterval = 60 * time.Second
defaultActiveFlowRecordTimeout = 60 * time.Second
defaultInactiveFlowRecordTimeout = 90 * time.Second
defaultAggregatorTransportProtocol = flowaggregator.AggregatorTransportProtocolTLS
defaultFlowAggregatorAddress = "flow-aggregator.flow-aggregator.svc"
)
Expand All @@ -45,8 +46,10 @@ type Options struct {
externalFlowCollectorAddr string
// IPFIX flow collector transport protocol
externalFlowCollectorProto string
// Flow export interval of the flow aggregator
exportInterval time.Duration
// Expiration timeout for active flow records in the flow aggregator
activeFlowRecordTimeout time.Duration
// Expiration timeout for inactive flow records in the flow aggregator
inactiveFlowRecordTimeout time.Duration
// Transport protocol over which the aggregator collects IPFIX records from all Agents
aggregatorTransportProtocol flowaggregator.AggregatorTransportProtocol
// DNS name or IP address of flow aggregator for generating TLS certificate
Expand Down Expand Up @@ -90,14 +93,21 @@ func (o *Options) validate(args []string) error {
}
o.externalFlowCollectorAddr = net.JoinHostPort(host, port)
o.externalFlowCollectorProto = proto
if o.config.FlowExportInterval == "" {
o.exportInterval = defaultFlowExportInterval
if o.config.ActiveFlowRecordTimeout == "" {
o.activeFlowRecordTimeout = defaultActiveFlowRecordTimeout
} else {
flowExportInterval, err := flowexport.ParseFlowIntervalString(o.config.FlowExportInterval)
o.activeFlowRecordTimeout, err = time.ParseDuration(o.config.ActiveFlowRecordTimeout)
if err != nil {
return err
}
}
if o.config.InactiveFlowRecordTimeout == "" {
o.inactiveFlowRecordTimeout = defaultInactiveFlowRecordTimeout
} else {
o.inactiveFlowRecordTimeout, err = time.ParseDuration(o.config.InactiveFlowRecordTimeout)
if err != nil {
return err
}
o.exportInterval = flowExportInterval
}
if o.config.AggregatorTransportProtocol == "" {
o.aggregatorTransportProtocol = defaultAggregatorTransportProtocol
Expand Down
Loading

0 comments on commit c08f7af

Please sign in to comment.