diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index 1b0d2d9a693..090edaaad50 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -3497,7 +3497,7 @@ data: # stream of packets, a flow record will be exported to the collector once the elapsed # time since the last export event is equal to the value of this timeout. # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". - #activeFlowExportTimeout: "60s" + #activeFlowExportTimeout: "30s" # Provide the idle flow export timeout, which is the timeout after which a flow # record is sent to the collector for idle flows. A flow is considered idle if no @@ -3607,7 +3607,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-gg4m728h98 + name: antrea-config-kt9gdmf62t namespace: kube-system --- apiVersion: v1 @@ -3727,7 +3727,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-gg4m728h98 + name: antrea-config-kt9gdmf62t name: antrea-config - name: antrea-controller-tls secret: @@ -4038,7 +4038,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-gg4m728h98 + name: antrea-config-kt9gdmf62t name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index 037da35bf59..da0d5845e2d 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -3497,7 +3497,7 @@ data: # stream of packets, a flow record will be exported to the collector once the elapsed # time since the last export event is equal to the value of this timeout. # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". - #activeFlowExportTimeout: "60s" + #activeFlowExportTimeout: "30s" # Provide the idle flow export timeout, which is the timeout after which a flow # record is sent to the collector for idle flows. A flow is considered idle if no @@ -3607,7 +3607,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-gg4m728h98 + name: antrea-config-kt9gdmf62t namespace: kube-system --- apiVersion: v1 @@ -3727,7 +3727,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-gg4m728h98 + name: antrea-config-kt9gdmf62t name: antrea-config - name: antrea-controller-tls secret: @@ -4040,7 +4040,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-gg4m728h98 + name: antrea-config-kt9gdmf62t name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index ebc0136bd51..bced1873dac 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -3497,7 +3497,7 @@ data: # stream of packets, a flow record will be exported to the collector once the elapsed # time since the last export event is equal to the value of this timeout. # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". - #activeFlowExportTimeout: "60s" + #activeFlowExportTimeout: "30s" # Provide the idle flow export timeout, which is the timeout after which a flow # record is sent to the collector for idle flows. A flow is considered idle if no @@ -3607,7 +3607,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-6bb22hc7fg + name: antrea-config-c8bf7gddbb namespace: kube-system --- apiVersion: v1 @@ -3727,7 +3727,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-6bb22hc7fg + name: antrea-config-c8bf7gddbb name: antrea-config - name: antrea-controller-tls secret: @@ -4041,7 +4041,7 @@ spec: path: /home/kubernetes/bin name: host-cni-bin - configMap: - name: antrea-config-6bb22hc7fg + name: antrea-config-c8bf7gddbb name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index eca87b8a1f1..c04924dfe5f 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -3502,7 +3502,7 @@ data: # stream of packets, a flow record will be exported to the collector once the elapsed # time since the last export event is equal to the value of this timeout. # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". - #activeFlowExportTimeout: "60s" + #activeFlowExportTimeout: "30s" # Provide the idle flow export timeout, which is the timeout after which a flow # record is sent to the collector for idle flows. A flow is considered idle if no @@ -3612,7 +3612,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-f57t688chc + name: antrea-config-dh7f7g969b namespace: kube-system --- apiVersion: v1 @@ -3741,7 +3741,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-f57t688chc + name: antrea-config-dh7f7g969b name: antrea-config - name: antrea-controller-tls secret: @@ -4087,7 +4087,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-f57t688chc + name: antrea-config-dh7f7g969b name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index fb9cfde15a3..a945dd16454 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -3502,7 +3502,7 @@ data: # stream of packets, a flow record will be exported to the collector once the elapsed # time since the last export event is equal to the value of this timeout. # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". - #activeFlowExportTimeout: "60s" + #activeFlowExportTimeout: "30s" # Provide the idle flow export timeout, which is the timeout after which a flow # record is sent to the collector for idle flows. A flow is considered idle if no @@ -3612,7 +3612,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-5ct9ktdt77 + name: antrea-config-42cft4gc5f namespace: kube-system --- apiVersion: v1 @@ -3732,7 +3732,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-5ct9ktdt77 + name: antrea-config-42cft4gc5f name: antrea-config - name: antrea-controller-tls secret: @@ -4043,7 +4043,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-5ct9ktdt77 + name: antrea-config-42cft4gc5f name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/base/conf/antrea-agent.conf b/build/yamls/base/conf/antrea-agent.conf index 201a82e58d5..3bf7a3c570c 100644 --- a/build/yamls/base/conf/antrea-agent.conf +++ b/build/yamls/base/conf/antrea-agent.conf @@ -121,7 +121,7 @@ featureGates: # stream of packets, a flow record will be exported to the collector once the elapsed # time since the last export event is equal to the value of this timeout. # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". -#activeFlowExportTimeout: "60s" +#activeFlowExportTimeout: "30s" # Provide the idle flow export timeout, which is the timeout after which a flow # record is sent to the collector for idle flows. A flow is considered idle if no diff --git a/build/yamls/flow-aggregator.yml b/build/yamls/flow-aggregator.yml index d00a5a500ae..13f0f5d974a 100644 --- a/build/yamls/flow-aggregator.yml +++ b/build/yamls/flow-aggregator.yml @@ -131,11 +131,20 @@ data: # If no L4 transport proto is given, we consider tcp as default. #externalFlowCollectorAddr: "" - # Provide flow export interval as a duration string. This determines how often the flow aggregator exports flow - # records to the flow collector. - # Flow export interval should be greater than or equal to 1s (one second). + # Provide the active flow record timeout as a duration string. This determines + # how often the flow aggregator exports the active flow records to the flow + # collector. Thus, for flows with a continuous stream of packets, a flow record + # will be exported to the collector once the elapsed time since the last export + # event in the flow aggregator is equal to the value of this timeout. # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". - #flowExportInterval: 60s + #activeFlowRecordTimeout: 60s + + # Provide the inactive flow record timeout as a duration string. This determines + # how often the flow aggregator exports the inactive flow records to the flow + # collector. A flow record is considered to be inactive if no matching record + # has been received by the flow aggregator in the specified interval. + # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". + #inactiveFlowRecordTimeout: 90s # Provide the transport protocol for the flow aggregator collecting process, which is tls, tcp or udp. #aggregatorTransportProtocol: "tls" @@ -155,7 +164,7 @@ metadata: annotations: {} labels: app: flow-aggregator - name: flow-aggregator-configmap-hf78268hm6 + name: flow-aggregator-configmap-k85k525hc5 namespace: flow-aggregator --- apiVersion: v1 @@ -223,7 +232,7 @@ spec: serviceAccountName: flow-aggregator volumes: - configMap: - name: flow-aggregator-configmap-hf78268hm6 + name: flow-aggregator-configmap-k85k525hc5 name: flow-aggregator-config - hostPath: path: /var/log/antrea/flow-aggregator diff --git a/build/yamls/flow-aggregator/base/conf/flow-aggregator.conf b/build/yamls/flow-aggregator/base/conf/flow-aggregator.conf index fbf4721e3a9..35c79c457ed 100644 --- a/build/yamls/flow-aggregator/base/conf/flow-aggregator.conf +++ b/build/yamls/flow-aggregator/base/conf/flow-aggregator.conf @@ -2,11 +2,20 @@ # If no L4 transport proto is given, we consider tcp as default. #externalFlowCollectorAddr: "" -# Provide flow export interval as a duration string. This determines how often the flow aggregator exports flow -# records to the flow collector. -# Flow export interval should be greater than or equal to 1s (one second). +# Provide the active flow record timeout as a duration string. This determines +# how often the flow aggregator exports the active flow records to the flow +# collector. Thus, for flows with a continuous stream of packets, a flow record +# will be exported to the collector once the elapsed time since the last export +# event in the flow aggregator is equal to the value of this timeout. # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". -#flowExportInterval: 60s +#activeFlowRecordTimeout: 60s + +# Provide the inactive flow record timeout as a duration string. This determines +# how often the flow aggregator exports the inactive flow records to the flow +# collector. A flow record is considered to be inactive if no matching record +# has been received by the flow aggregator in the specified interval. +# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". +#inactiveFlowRecordTimeout: 90s # Provide the transport protocol for the flow aggregator collecting process, which is tls, tcp or udp. #aggregatorTransportProtocol: "tls" diff --git a/cmd/antrea-agent/config.go b/cmd/antrea-agent/config.go index ec3a4195537..81395c8eb33 100644 --- a/cmd/antrea-agent/config.go +++ b/cmd/antrea-agent/config.go @@ -114,20 +114,23 @@ type AgentConfig struct { // Provide flow poll interval in format "0s". This determines how often flow // exporter dumps connections in conntrack module. Flow poll interval should // be greater than or equal to 1s(one second). - // Defaults to "5s". Follow the time units of duration type. + // Defaults to "5s". Valid time units are "ns", "us" (or "µs"), "ms", "s", + // "m", "h". FlowPollInterval string `yaml:"flowPollInterval,omitempty"` // Provide the active flow export timeout, which is the timeout after which // a flow record is sent to the collector for active flows. Thus, for flows // with a continuous stream of packets, a flow record will be exported to the // collector once the elapsed time since the last export event is equal to the // value of this timeout. - // Defaults to "60s". Follow the time units of duration type. + // Defaults to "30s". Valid time units are "ns", "us" (or "µs"), "ms", "s", + // "m", "h". ActiveFlowExportTimeout string `yaml:"activeFlowExportTimeout,omitempty"` // Provide the idle flow export timeout, which is the timeout after which a // flow record is sent to the collector for idle flows. A flow is considered // idle if no packet matching this flow has been observed since the last export // event. - // Defaults to "15s". Follow the time units of duration type. + // Defaults to "15s". Valid time units are "ns", "us" (or "µs"), "ms", "s", + // "m", "h". IdleFlowExportTimeout string `yaml:"idleFlowExportTimeout,omitempty"` // Enable TLS communication from flow exporter to flow aggregator. // Defaults to true. diff --git a/cmd/antrea-agent/options.go b/cmd/antrea-agent/options.go index 1f550e7fcb8..e219e906181 100644 --- a/cmd/antrea-agent/options.go +++ b/cmd/antrea-agent/options.go @@ -42,7 +42,7 @@ const ( defaultFlowCollectorTransport = "tcp" defaultFlowCollectorPort = "4739" defaultFlowPollInterval = 5 * time.Second - defaultActiveFlowExportTimeout = 60 * time.Second + defaultActiveFlowExportTimeout = 30 * time.Second defaultIdleFlowExportTimeout = 15 * time.Second defaultNPLPortRange = "40000-41000" ) diff --git a/cmd/flow-aggregator/config.go b/cmd/flow-aggregator/config.go index f145fc2d4a2..8dda33cf07a 100644 --- a/cmd/flow-aggregator/config.go +++ b/cmd/flow-aggregator/config.go @@ -21,12 +21,21 @@ type FlowAggregatorConfig struct { // If no L4 transport proto is given, we consider tcp as default. // Defaults to "". ExternalFlowCollectorAddr string `yaml:"externalFlowCollectorAddr,omitempty"` - // Provide flow export interval as a duration string. This determines how often the flow aggregator exports flow - // records to the flow collector. - // Flow export interval should be greater than or equal to 1s (one second). - // Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". - // Defaults to "60s". - FlowExportInterval string `yaml:"flowExportInterval,omitempty"` + // Provide the active flow record timeout as a duration string. This determines + // how often the flow aggregator exports the active flow records to the flow + // collector. Thus, for flows with a continuous stream of packets, a flow record + // will be exported to the collector once the elapsed time since the last export + // event in the flow aggregator is equal to the value of this timeout. + // Defaults to "60s". Valid time units are "ns", "us" (or "µs"), "ms", "s", + // "m", "h". + ActiveFlowRecordTimeout string `yaml:"activeFlowRecordTimeout,omitempty"` + // Provide the inactive flow record timeout as a duration string. This determines + // how often the flow aggregator exports the inactive flow records to the flow + // collector. A flow record is considered to be inactive if no matching record + // has been received by the flow aggregator in the specified interval. + // Defaults to "90s". Valid time units are "ns", "us" (or "µs"), "ms", "s", + // "m", "h". + InactiveFlowRecordTimeout string `yaml:"inactiveFlowRecordTimeout,omitempty"` // Transport protocol over which the aggregator collects IPFIX records from all Agents. // Defaults to "tls" AggregatorTransportProtocol flowaggregator.AggregatorTransportProtocol `yaml:"aggregatorTransportProtocol,omitempty"` diff --git a/cmd/flow-aggregator/flow-aggregator.go b/cmd/flow-aggregator/flow-aggregator.go index 7824a42e6b1..e6210a3d754 100644 --- a/cmd/flow-aggregator/flow-aggregator.go +++ b/cmd/flow-aggregator/flow-aggregator.go @@ -90,7 +90,8 @@ func run(o *Options) error { flowAggregator := aggregator.NewFlowAggregator( o.externalFlowCollectorAddr, o.externalFlowCollectorProto, - o.exportInterval, + o.activeFlowRecordTimeout, + o.inactiveFlowRecordTimeout, o.aggregatorTransportProtocol, o.flowAggregatorAddress, k8sClient, diff --git a/cmd/flow-aggregator/options.go b/cmd/flow-aggregator/options.go index 6e786b71983..0364aa4a443 100644 --- a/cmd/flow-aggregator/options.go +++ b/cmd/flow-aggregator/options.go @@ -31,7 +31,8 @@ import ( const ( defaultExternalFlowCollectorTransport = "tcp" defaultExternalFlowCollectorPort = "4739" - defaultFlowExportInterval = 60 * time.Second + defaultActiveFlowRecordTimeout = 60 * time.Second + defaultInactiveFlowRecordTimeout = 90 * time.Second defaultAggregatorTransportProtocol = flowaggregator.AggregatorTransportProtocolTLS defaultFlowAggregatorAddress = "flow-aggregator.flow-aggregator.svc" ) @@ -45,8 +46,10 @@ type Options struct { externalFlowCollectorAddr string // IPFIX flow collector transport protocol externalFlowCollectorProto string - // Flow export interval of the flow aggregator - exportInterval time.Duration + // Expiration timeout for active flow records in the flow aggregator + activeFlowRecordTimeout time.Duration + // Expiration timeout for inactive flow records in the flow aggregator + inactiveFlowRecordTimeout time.Duration // Transport protocol over which the aggregator collects IPFIX records from all Agents aggregatorTransportProtocol flowaggregator.AggregatorTransportProtocol // DNS name or IP address of flow aggregator for generating TLS certificate @@ -90,14 +93,21 @@ func (o *Options) validate(args []string) error { } o.externalFlowCollectorAddr = net.JoinHostPort(host, port) o.externalFlowCollectorProto = proto - if o.config.FlowExportInterval == "" { - o.exportInterval = defaultFlowExportInterval + if o.config.ActiveFlowRecordTimeout == "" { + o.activeFlowRecordTimeout = defaultActiveFlowRecordTimeout } else { - flowExportInterval, err := flowexport.ParseFlowIntervalString(o.config.FlowExportInterval) + o.activeFlowRecordTimeout, err = time.ParseDuration(o.config.ActiveFlowRecordTimeout) + if err != nil { + return err + } + } + if o.config.InactiveFlowRecordTimeout == "" { + o.inactiveFlowRecordTimeout = defaultInactiveFlowRecordTimeout + } else { + o.inactiveFlowRecordTimeout, err = time.ParseDuration(o.config.ActiveFlowRecordTimeout) if err != nil { return err } - o.exportInterval = flowExportInterval } if o.config.AggregatorTransportProtocol == "" { o.aggregatorTransportProtocol = defaultAggregatorTransportProtocol diff --git a/go.mod b/go.mod index 175f7eeef3d..7991ab6ea46 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( github.com/go-openapi/spec v0.19.5 github.com/gogo/protobuf v1.3.2 github.com/golang/mock v1.5.0 - github.com/golang/protobuf v1.4.3 + github.com/golang/protobuf v1.5.0 github.com/google/uuid v1.1.2 github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd github.com/pkg/errors v0.9.1 @@ -40,7 +40,7 @@ require ( github.com/stretchr/testify v1.6.1 github.com/ti-mo/conntrack v0.3.0 github.com/vishvananda/netlink v1.1.0 - github.com/vmware/go-ipfix v0.4.8 + github.com/vmware/go-ipfix v0.5.1 golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 golang.org/x/mod v0.4.0 @@ -49,7 +49,6 @@ require ( golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073 golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba google.golang.org/grpc v1.27.1 - gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.21.0 diff --git a/go.sum b/go.sum index 71e25f7ac24..11163d238d9 100644 --- a/go.sum +++ b/go.sum @@ -59,6 +59,8 @@ github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbt github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= +github.com/Shopify/sarama v1.27.2/go.mod h1:g5s5osgELxgM+Md9Qni9rzo7Rbt+vvFQI4bt/Mc93II= +github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/TomCodeLV/OVSDB-golang-lib v0.0.0-20200116135253-9bbdfadcd881 h1:6PUwmG2qZd1LNoe1WsdBmoJP2PseuC2P4QBGPTz6mQc= github.com/TomCodeLV/OVSDB-golang-lib v0.0.0-20200116135253-9bbdfadcd881/go.mod h1:J623KtHQCavhT3jhFh0wg5i6QQRdnsAxAlBrOY0TUMw= github.com/VividCortex/ewma v1.1.1 h1:MnEK4VOv6n0RSY4vtRe3h11qjxL3+t0B8yOL8iMXdcM= @@ -175,6 +177,9 @@ github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3 github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/elazarl/goproxy v0.0.0-20190911111923-ecfe977594f1 h1:yY9rWGoXv1U5pl4gxqlULARMQD7x0QG85lqEXTWysik= github.com/elazarl/goproxy v0.0.0-20190911111923-ecfe977594f1/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM= @@ -194,6 +199,8 @@ github.com/fatih/camelcase v1.0.0/go.mod h1:yN2Sb0lFhZJUdVvtELVWefmrXpuZESvPmqwo github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/frankban/quicktest v1.10.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= @@ -301,8 +308,10 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golangplus/testing v0.0.0-20180327235837-af21d9c3145e/go.mod h1:0AA//k/eakGydO4jKRoRL2j92ZKSzTgj9tclaCrvXHk= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= @@ -312,8 +321,9 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.1.0 h1:Hsa8mG0dQ46ij8Sl2AYJDUv1oA9/d6Vk+3LG99Oe02g= github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -362,6 +372,7 @@ github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerX github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU= @@ -379,6 +390,7 @@ github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJ github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/j-keck/arping v0.0.0-20160618110441-2cf9dc699c56/go.mod h1:ymszkNOg6tORTn+6F6j+Jc8TOr5osrynvN6ivFWZ2GA= +github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/jsimonetti/rtnetlink v0.0.0-20190606172950-9527aa82566a/go.mod h1:Oz+70psSo5OFh8DBl0Zv2ACw7Esh6pPUphlvZG9x7uw= @@ -399,12 +411,14 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.11.0/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -489,6 +503,7 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FI github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= +github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pion/dtls/v2 v2.0.3 h1:3qQ0s4+TXD00rsllL8g8KQcxAs+Y/Z6oz618RXX6p14= github.com/pion/dtls/v2 v2.0.3/go.mod h1:TUjyL8bf8LH95h81Xj7kATmzMRt29F/4lxpIPj2Xe4Y= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= @@ -532,6 +547,7 @@ github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/rakelkar/gonetsh v0.0.0-20210226024844-dfffed138500 h1:TJ99mSkY9CTjL/FXiJZ6i8xWy7gasANjR0J/xhj4PzE= github.com/rakelkar/gonetsh v0.0.0-20210226024844-dfffed138500/go.mod h1:MkEXf5yV9DRTy8TfpWdvMuCTkxajNE/0tn9pvZ6ikDw= +github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-charset v0.0.0-20180617210344-2471d30d28b4/go.mod h1:qgYeAmZ5ZIpBWTGllZSQnw97Dj+woV0toclVaRGI8pc= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -610,10 +626,12 @@ github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYp github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI= github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df h1:OviZH7qLw/7ZovXvuNyL3XQl8UFofeikI1NW1Gypu7k= github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= -github.com/vmware/go-ipfix v0.4.8 h1:kyQX+n1lFfzOHqftjlyfKc9bkc5KeTp6iZSePDv/AhQ= -github.com/vmware/go-ipfix v0.4.8/go.mod h1:JvYQ70hgDDdMVjz1kz+Kb0n5u0RD65TnXcl6MFkjpq0= +github.com/vmware/go-ipfix v0.5.1 h1:Z76msiFceg8FnqZGgE9xyNWJzfp1OHJiRyh+Awovz3s= +github.com/vmware/go-ipfix v0.5.1/go.mod h1:P0+G5MH48fwF66O+D/C28lxCX2qqpq6Enyb3ju2aqvY= github.com/wenyingd/ofnet v0.0.0-20210318032909-171b6795a2da h1:ragN21nQa4zKuCwR2UEbTXEAh3L2YN/Id5SCVkjjwdY= github.com/wenyingd/ofnet v0.0.0-20210318032909-171b6795a2da/go.mod h1:8mMMWAYBNUeTGXYKizOLETfN3WIbu3P5DgvS2jiXKdI= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg= @@ -730,6 +748,7 @@ golang.org/x/net v0.0.0-20200301022130-244492dfa37a/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210224082022-3d97a244fca7 h1:OgUuv8lsRpBibGNbSizVwKWlysjaNzmC9gYMhPVfqFM= @@ -923,8 +942,10 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4= -google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -941,6 +962,11 @@ gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKW gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo= +gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q= +gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4= +gopkg.in/jcmturner/gokrb5.v7 v7.5.0/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= +gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8= gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= @@ -956,8 +982,9 @@ gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ= +gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk= gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8= @@ -993,7 +1020,6 @@ k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8= k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= -k8s.io/klog/v2 v2.4.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= k8s.io/klog/v2 v2.8.0 h1:Q3gmuM9hKEjefWFFYF0Mat+YyFJvsUyYuwyNNJ5C9Ts= k8s.io/klog/v2 v2.8.0/go.mod h1:hy9LJ/NvuK+iVyP4Ehqva4HxZG/oXyIS3n3Jmire4Ec= k8s.io/kube-aggregator v0.21.0 h1:my2WYu8RJcj/ZzWAjPPnmxNRELk/iCdPjMaOmsZOeBU= diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go index 3fc9da6e261..3e51c6427d0 100644 --- a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -540,9 +540,9 @@ func (exp *flowExporter) findFlowType(record flowexporter.FlowRecord) uint8 { // TODO: support Pod-To-External flows in network policy only mode. if exp.isNetworkPolicyOnly { if record.Conn.SourcePodName == "" || record.Conn.DestinationPodName == "" { - return ipfixregistry.InterNode + return ipfixregistry.FlowTypeInterNode } - return ipfixregistry.IntraNode + return ipfixregistry.FlowTypeIntraNode } if exp.nodeRouteController == nil { @@ -552,11 +552,11 @@ func (exp *flowExporter) findFlowType(record flowexporter.FlowRecord) uint8 { if exp.nodeRouteController.IPInPodSubnets(record.Conn.TupleOrig.SourceAddress) { if record.Conn.Mark == openflow.ServiceCTMark || exp.nodeRouteController.IPInPodSubnets(record.Conn.TupleOrig.DestinationAddress) { if record.Conn.SourcePodName == "" || record.Conn.DestinationPodName == "" { - return ipfixregistry.InterNode + return ipfixregistry.FlowTypeInterNode } - return ipfixregistry.IntraNode + return ipfixregistry.FlowTypeIntraNode } else { - return ipfixregistry.ToExternal + return ipfixregistry.FlowTypeToExternal } } else { // We do not support External-To-Pod flows for now. diff --git a/pkg/flowaggregator/flowaggregator.go b/pkg/flowaggregator/flowaggregator.go index 83be156d917..e10e9f09320 100644 --- a/pkg/flowaggregator/flowaggregator.go +++ b/pkg/flowaggregator/flowaggregator.go @@ -156,7 +156,8 @@ type flowAggregator struct { aggregatorTransportProtocol AggregatorTransportProtocol collectingProcess ipfix.IPFIXCollectingProcess aggregationProcess ipfix.IPFIXAggregationProcess - exportInterval time.Duration + activeFlowRecordTimeout time.Duration + inactiveFlowRecordTimeout time.Duration exportingProcess ipfix.IPFIXExportingProcess templateIDv4 uint16 templateIDv6 uint16 @@ -170,7 +171,8 @@ type flowAggregator struct { func NewFlowAggregator( externalFlowCollectorAddr string, externalFlowCollectorProto string, - exportInterval time.Duration, + activeFlowRecTimeout time.Duration, + inactiveFlowRecTimeout time.Duration, aggregatorTransportProtocol AggregatorTransportProtocol, flowAggregatorAddress string, k8sClient kubernetes.Interface, @@ -179,26 +181,21 @@ func NewFlowAggregator( registry := ipfix.NewIPFIXRegistry() registry.LoadRegistry() fa := &flowAggregator{ - externalFlowCollectorAddr, - externalFlowCollectorProto, - aggregatorTransportProtocol, - nil, - nil, - exportInterval, - nil, - 0, - 0, - registry, - ipfixentities.NewSet(false), - flowAggregatorAddress, - k8sClient, - observationDomainID, + externalFlowCollectorAddr: externalFlowCollectorAddr, + externalFlowCollectorProto: externalFlowCollectorProto, + aggregatorTransportProtocol: aggregatorTransportProtocol, + activeFlowRecordTimeout: activeFlowRecTimeout, + inactiveFlowRecordTimeout: inactiveFlowRecTimeout, + registry: registry, + set: ipfixentities.NewSet(false), + flowAggregatorAddress: flowAggregatorAddress, + k8sClient: k8sClient, + observationDomainID: observationDomainID, } return fa } func (fa *flowAggregator) InitCollectingProcess() error { - var err error var cpInput collector.CollectorInput if fa.aggregatorTransportProtocol == AggregatorTransportProtocolTLS { parentCert, privateKey, caCert, err := generateCACertKey() @@ -245,6 +242,7 @@ func (fa *flowAggregator) InitCollectingProcess() error { IsEncrypted: false, } } + var err error fa.collectingProcess, err = ipfix.NewIPFIXCollectingProcess(cpInput) return err } @@ -252,10 +250,12 @@ func (fa *flowAggregator) InitCollectingProcess() error { func (fa *flowAggregator) InitAggregationProcess() error { var err error apInput := ipfixintermediate.AggregationInput{ - MessageChan: fa.collectingProcess.GetMsgChan(), - WorkerNum: aggregationWorkerNum, - CorrelateFields: correlateFields, - AggregateElements: aggregationElements, + MessageChan: fa.collectingProcess.GetMsgChan(), + WorkerNum: aggregationWorkerNum, + CorrelateFields: correlateFields, + ActiveExpiryTimeout: fa.activeFlowRecordTimeout, + InactiveExpiryTimeout: fa.inactiveFlowRecordTimeout, + AggregateElements: aggregationElements, } fa.aggregationProcess, err = ipfix.NewIPFIXAggregationProcess(apInput) return err @@ -325,46 +325,54 @@ func (fa *flowAggregator) initExportingProcess() error { } func (fa *flowAggregator) Run(stopCh <-chan struct{}) { - exportTicker := time.NewTicker(fa.exportInterval) - defer exportTicker.Stop() go fa.collectingProcess.Start() defer fa.collectingProcess.Stop() go fa.aggregationProcess.Start() defer fa.aggregationProcess.Stop() + go fa.flowRecordExpiryCheck(stopCh) + + <-stopCh +} + +func (fa *flowAggregator) flowRecordExpiryCheck(stopCh <-chan struct{}) { + expireTimer := time.NewTimer(fa.activeFlowRecordTimeout) + for { select { case <-stopCh: if fa.exportingProcess != nil { fa.exportingProcess.CloseConnToCollector() } + expireTimer.Stop() return - case <-exportTicker.C: + case <-expireTimer.C: if fa.exportingProcess == nil { err := fa.initExportingProcess() if err != nil { - klog.Errorf("Error when initializing exporting process: %v, will retry in %s", err, fa.exportInterval) - // Initializing exporting process fails, will retry in next exportInterval + klog.Errorf("Error when initializing exporting process: %v, will retry in %s", err, fa.activeFlowRecordTimeout) + // Initializing exporting process fails, will retry in next cycle. + expireTimer.Reset(fa.activeFlowRecordTimeout) continue } } - err := fa.aggregationProcess.ForAllRecordsDo(fa.sendFlowKeyRecord) - if err != nil { - klog.Errorf("Error when sending flow records: %v", err) + // Pop the flow record item from expire priority queue in the Aggregation + // Process and send the flow records. + if err := fa.aggregationProcess.ForAllExpiredFlowRecordsDo(fa.sendFlowKeyRecord); err != nil { + klog.Errorf("Error when sending expired flow records: %v", err) // If there is an error when sending flow records because of intermittent connectivity, we reset the connection // to IPFIX collector and retry in the next export cycle to reinitialize the connection and send flow records. fa.exportingProcess.CloseConnToCollector() fa.exportingProcess = nil + expireTimer.Reset(fa.activeFlowRecordTimeout) continue } + // Get the new expiry and reset the timer. + expireTimer.Reset(fa.aggregationProcess.GetExpiryFromExpirePriorityQueue()) } } } func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, record ipfixintermediate.AggregationFlowRecord) error { - if !record.ReadyToSend { - klog.V(4).Info("Skip sending record that is not correlated.") - return nil - } templateID := fa.templateIDv4 if net.ParseIP(key.SourceAddress).To4() == nil || net.ParseIP(key.DestinationAddress).To4() == nil { templateID = fa.templateIDv6 @@ -372,17 +380,21 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor // TODO: more records per data set will be supported when go-ipfix supports size check when adding records fa.set.ResetSet() if err := fa.set.PrepareSet(ipfixentities.Data, templateID); err != nil { - return fmt.Errorf("error when preparing set: %v", err) + return err } err := fa.set.AddRecord(record.Record.GetOrderedElementList(), templateID) if err != nil { - return fmt.Errorf("error when adding the record to the set: %v", err) + return err } - _, err = fa.sendDataSet(fa.set) + sentBytes, err := fa.exportingProcess.SendSet(fa.set) if err != nil { return err } - fa.aggregationProcess.DeleteFlowKeyFromMapWithoutLock(key) + if err = fa.aggregationProcess.ResetStatElementsInRecord(record.Record); err != nil { + return err + } + + klog.V(4).Infof("Data set sent successfully: %d Bytes sent", sentBytes) return nil } @@ -453,12 +465,3 @@ func (fa *flowAggregator) sendTemplateSet(templateSet ipfixentities.Set, isIPv6 bytesSent, err := fa.exportingProcess.SendSet(templateSet) return bytesSent, err } - -func (fa *flowAggregator) sendDataSet(dataSet ipfixentities.Set) (int, error) { - sentBytes, err := fa.exportingProcess.SendSet(dataSet) - if err != nil { - return 0, fmt.Errorf("error when sending data set: %v", err) - } - klog.V(4).Infof("Data set sent successfully. Bytes sent: %d", sentBytes) - return sentBytes, nil -} diff --git a/pkg/flowaggregator/flowaggregator_test.go b/pkg/flowaggregator/flowaggregator_test.go index ed7d3e38da0..a822fe876c5 100644 --- a/pkg/flowaggregator/flowaggregator_test.go +++ b/pkg/flowaggregator/flowaggregator_test.go @@ -20,7 +20,6 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" - ipfixentities "github.com/vmware/go-ipfix/pkg/entities" ipfixentitiestesting "github.com/vmware/go-ipfix/pkg/entities/testing" ipfixintermediate "github.com/vmware/go-ipfix/pkg/intermediate" @@ -32,7 +31,8 @@ import ( const ( testTemplateIDv4 = uint16(256) testTemplateIDv6 = uint16(257) - testExportInterval = 60 * time.Second + testActiveTimeout = 60 * time.Second + testInactiveTimeout = 180 * time.Second testObservationDomainID = 0xabcd ) @@ -47,22 +47,20 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) { mockAggregationProcess := ipfixtest.NewMockIPFIXAggregationProcess(ctrl) fa := &flowAggregator{ - "", - "", - "tcp", - nil, - mockAggregationProcess, - testExportInterval, - mockIPFIXExpProc, - testTemplateIDv4, - testTemplateIDv6, - mockIPFIXRegistry, - mockDataSet, - "", - nil, - testObservationDomainID, + externalFlowCollectorAddr: "", + externalFlowCollectorProto: "", + aggregatorTransportProtocol: "tcp", + aggregationProcess: mockAggregationProcess, + activeFlowRecordTimeout: testActiveTimeout, + inactiveFlowRecordTimeout: testInactiveTimeout, + exportingProcess: mockIPFIXExpProc, + templateIDv4: testTemplateIDv4, + templateIDv6: testTemplateIDv6, + registry: mockIPFIXRegistry, + set: mockDataSet, + flowAggregatorAddress: "", + observationDomainID: testObservationDomainID, } - ipv4Key := ipfixintermediate.FlowKey{ SourceAddress: "10.0.0.1", DestinationAddress: "10.0.0.2", @@ -82,13 +80,6 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) { readyRecord := ipfixintermediate.AggregationFlowRecord{ Record: mockRecord, ReadyToSend: true, - IsActive: true, - } - - notReadyRecord := ipfixintermediate.AggregationFlowRecord{ - Record: mockRecord, - ReadyToSend: false, - IsActive: true, } testcases := []struct { @@ -109,40 +100,23 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) { ipv6Key, readyRecord, }, - { - "IPv4_not_ready_to_send", - false, - ipv4Key, - notReadyRecord, - }, - { - "IPv6_not_ready_to_send", - true, - ipv6Key, - notReadyRecord, - }, } for _, tc := range testcases { - if tc.flowRecord.ReadyToSend { - templateID := fa.templateIDv4 - if tc.isIPv6 { - templateID = fa.templateIDv6 - } - mockDataSet.EXPECT().ResetSet() - mockDataSet.EXPECT().PrepareSet(ipfixentities.Data, templateID).Return(nil) - elementList := make([]*ipfixentities.InfoElementWithValue, 0) - mockRecord.EXPECT().GetOrderedElementList().Return(elementList) - mockDataSet.EXPECT().AddRecord(elementList, templateID).Return(nil) - mockIPFIXExpProc.EXPECT().SendSet(mockDataSet).Return(0, nil) - mockAggregationProcess.EXPECT().DeleteFlowKeyFromMapWithoutLock(tc.flowKey) - - err := fa.sendFlowKeyRecord(tc.flowKey, tc.flowRecord) - assert.NoError(t, err, "Error in sending flow key record: %v, key: %v, record: %v", err, tc.flowKey, tc.flowRecord) - } else { - err := fa.sendFlowKeyRecord(tc.flowKey, tc.flowRecord) - assert.NoError(t, err, "Error in sending flow key record that is not ready: %v, key: %v, record: %v", err, tc.flowKey, tc.flowRecord) + templateID := fa.templateIDv4 + if tc.isIPv6 { + templateID = fa.templateIDv6 } + mockDataSet.EXPECT().ResetSet() + mockDataSet.EXPECT().PrepareSet(ipfixentities.Data, templateID).Return(nil) + elementList := make([]*ipfixentities.InfoElementWithValue, 0) + mockRecord.EXPECT().GetOrderedElementList().Return(elementList) + mockDataSet.EXPECT().AddRecord(elementList, templateID).Return(nil) + mockIPFIXExpProc.EXPECT().SendSet(mockDataSet).Return(0, nil) + mockAggregationProcess.EXPECT().ResetStatElementsInRecord(mockRecord).Return(nil) + + err := fa.sendFlowKeyRecord(tc.flowKey, tc.flowRecord) + assert.NoError(t, err, "Error in sending flow key record: %v, key: %v, record: %v", err, tc.flowKey, tc.flowRecord) } } @@ -155,20 +129,20 @@ func TestFlowAggregator_sendTemplateSet(t *testing.T) { mockTempSet := ipfixentitiestesting.NewMockSet(ctrl) fa := &flowAggregator{ - "", - "", - "tcp", - nil, - nil, - testExportInterval, - mockIPFIXExpProc, - testTemplateIDv4, - testTemplateIDv6, - mockIPFIXRegistry, - ipfixentitiestesting.NewMockSet(ctrl), - "", - nil, - testObservationDomainID, + externalFlowCollectorAddr: "", + externalFlowCollectorProto: "", + aggregatorTransportProtocol: "tcp", + collectingProcess: nil, + aggregationProcess: nil, + activeFlowRecordTimeout: testActiveTimeout, + exportingProcess: mockIPFIXExpProc, + templateIDv4: testTemplateIDv4, + templateIDv6: testTemplateIDv6, + registry: mockIPFIXRegistry, + set: mockTempSet, + flowAggregatorAddress: "", + k8sClient: nil, + observationDomainID: testObservationDomainID, } for _, isIPv6 := range []bool{false, true} { diff --git a/pkg/ipfix/ipfix_intermediate.go b/pkg/ipfix/ipfix_intermediate.go index 82b294e0732..2fb59b2be4d 100644 --- a/pkg/ipfix/ipfix_intermediate.go +++ b/pkg/ipfix/ipfix_intermediate.go @@ -16,7 +16,9 @@ package ipfix import ( "fmt" + "time" + ipfixentities "github.com/vmware/go-ipfix/pkg/entities" ipfixintermediate "github.com/vmware/go-ipfix/pkg/intermediate" ) @@ -26,8 +28,9 @@ var _ IPFIXAggregationProcess = new(ipfixAggregationProcess) type IPFIXAggregationProcess interface { Start() Stop() - ForAllRecordsDo(callback ipfixintermediate.FlowKeyRecordMapCallBack) error - DeleteFlowKeyFromMapWithoutLock(flowKey ipfixintermediate.FlowKey) + ForAllExpiredFlowRecordsDo(callback ipfixintermediate.FlowKeyRecordMapCallBack) error + GetExpiryFromExpirePriorityQueue() time.Duration + ResetStatElementsInRecord(record ipfixentities.Record) error } type ipfixAggregationProcess struct { @@ -53,10 +56,15 @@ func (ap *ipfixAggregationProcess) Stop() { ap.AggregationProcess.Stop() } -func (ap *ipfixAggregationProcess) ForAllRecordsDo(callback ipfixintermediate.FlowKeyRecordMapCallBack) error { - return ap.AggregationProcess.ForAllRecordsDo(callback) +func (ap *ipfixAggregationProcess) ForAllExpiredFlowRecordsDo(callback ipfixintermediate.FlowKeyRecordMapCallBack) error { + err := ap.AggregationProcess.ForAllExpiredFlowRecordsDo(callback) + return err } -func (ap *ipfixAggregationProcess) DeleteFlowKeyFromMapWithoutLock(flowKey ipfixintermediate.FlowKey) { - ap.AggregationProcess.DeleteFlowKeyFromMapWithoutLock(flowKey) +func (ap *ipfixAggregationProcess) GetExpiryFromExpirePriorityQueue() time.Duration { + return ap.AggregationProcess.GetExpiryFromExpirePriorityQueue() +} + +func (ap *ipfixAggregationProcess) ResetStatElementsInRecord(record ipfixentities.Record) error { + return ap.AggregationProcess.ResetStatElementsInRecord(record) } diff --git a/pkg/ipfix/testing/mock_ipfix.go b/pkg/ipfix/testing/mock_ipfix.go index 98047108a36..77d6b5ac723 100644 --- a/pkg/ipfix/testing/mock_ipfix.go +++ b/pkg/ipfix/testing/mock_ipfix.go @@ -24,6 +24,7 @@ import ( entities "github.com/vmware/go-ipfix/pkg/entities" intermediate "github.com/vmware/go-ipfix/pkg/intermediate" reflect "reflect" + time "time" ) // MockIPFIXExportingProcess is a mock of IPFIXExportingProcess interface @@ -224,30 +225,46 @@ func (m *MockIPFIXAggregationProcess) EXPECT() *MockIPFIXAggregationProcessMockR return m.recorder } -// DeleteFlowKeyFromMapWithoutLock mocks base method -func (m *MockIPFIXAggregationProcess) DeleteFlowKeyFromMapWithoutLock(arg0 intermediate.FlowKey) { +// ForAllExpiredFlowRecordsDo mocks base method +func (m *MockIPFIXAggregationProcess) ForAllExpiredFlowRecordsDo(arg0 intermediate.FlowKeyRecordMapCallBack) error { m.ctrl.T.Helper() - m.ctrl.Call(m, "DeleteFlowKeyFromMapWithoutLock", arg0) + ret := m.ctrl.Call(m, "ForAllExpiredFlowRecordsDo", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// ForAllExpiredFlowRecordsDo indicates an expected call of ForAllExpiredFlowRecordsDo +func (mr *MockIPFIXAggregationProcessMockRecorder) ForAllExpiredFlowRecordsDo(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ForAllExpiredFlowRecordsDo", reflect.TypeOf((*MockIPFIXAggregationProcess)(nil).ForAllExpiredFlowRecordsDo), arg0) +} + +// GetExpiryFromExpirePriorityQueue mocks base method +func (m *MockIPFIXAggregationProcess) GetExpiryFromExpirePriorityQueue() time.Duration { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetExpiryFromExpirePriorityQueue") + ret0, _ := ret[0].(time.Duration) + return ret0 } -// DeleteFlowKeyFromMapWithoutLock indicates an expected call of DeleteFlowKeyFromMapWithoutLock -func (mr *MockIPFIXAggregationProcessMockRecorder) DeleteFlowKeyFromMapWithoutLock(arg0 interface{}) *gomock.Call { +// GetExpiryFromExpirePriorityQueue indicates an expected call of GetExpiryFromExpirePriorityQueue +func (mr *MockIPFIXAggregationProcessMockRecorder) GetExpiryFromExpirePriorityQueue() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteFlowKeyFromMapWithoutLock", reflect.TypeOf((*MockIPFIXAggregationProcess)(nil).DeleteFlowKeyFromMapWithoutLock), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetExpiryFromExpirePriorityQueue", reflect.TypeOf((*MockIPFIXAggregationProcess)(nil).GetExpiryFromExpirePriorityQueue)) } -// ForAllRecordsDo mocks base method -func (m *MockIPFIXAggregationProcess) ForAllRecordsDo(arg0 intermediate.FlowKeyRecordMapCallBack) error { +// ResetStatElementsInRecord mocks base method +func (m *MockIPFIXAggregationProcess) ResetStatElementsInRecord(arg0 entities.Record) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ForAllRecordsDo", arg0) + ret := m.ctrl.Call(m, "ResetStatElementsInRecord", arg0) ret0, _ := ret[0].(error) return ret0 } -// ForAllRecordsDo indicates an expected call of ForAllRecordsDo -func (mr *MockIPFIXAggregationProcessMockRecorder) ForAllRecordsDo(arg0 interface{}) *gomock.Call { +// ResetStatElementsInRecord indicates an expected call of ResetStatElementsInRecord +func (mr *MockIPFIXAggregationProcessMockRecorder) ResetStatElementsInRecord(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ForAllRecordsDo", reflect.TypeOf((*MockIPFIXAggregationProcess)(nil).ForAllRecordsDo), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetStatElementsInRecord", reflect.TypeOf((*MockIPFIXAggregationProcess)(nil).ResetStatElementsInRecord), arg0) } // Start mocks base method diff --git a/plugins/octant/go.sum b/plugins/octant/go.sum index fbd49b42666..d0bf418828b 100644 --- a/plugins/octant/go.sum +++ b/plugins/octant/go.sum @@ -74,6 +74,8 @@ github.com/PuerkitoBio/purell v1.1.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbt github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= +github.com/Shopify/sarama v1.27.2/go.mod h1:g5s5osgELxgM+Md9Qni9rzo7Rbt+vvFQI4bt/Mc93II= +github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/TomCodeLV/OVSDB-golang-lib v0.0.0-20200116135253-9bbdfadcd881/go.mod h1:J623KtHQCavhT3jhFh0wg5i6QQRdnsAxAlBrOY0TUMw= github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM= @@ -180,6 +182,9 @@ github.com/dop251/goja_nodejs v0.0.0-20200706082813-b2775b86b9e0 h1:pv6LV8EpCUid github.com/dop251/goja_nodejs v0.0.0-20200706082813-b2775b86b9e0/go.mod h1:hn7BA7c8pLvoGndExHudxTDKZ84Pyvv+90pbBjbTz0Y= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/elazarl/goproxy v0.0.0-20190703090003-6125c262ffb0/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/elazarl/goproxy v0.0.0-20190911111923-ecfe977594f1 h1:yY9rWGoXv1U5pl4gxqlULARMQD7x0QG85lqEXTWysik= @@ -201,6 +206,8 @@ github.com/fatih/camelcase v1.0.0/go.mod h1:yN2Sb0lFhZJUdVvtELVWefmrXpuZESvPmqwo github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/frankban/quicktest v1.10.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= @@ -310,8 +317,10 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golangplus/testing v0.0.0-20180327235837-af21d9c3145e/go.mod h1:0AA//k/eakGydO4jKRoRL2j92ZKSzTgj9tclaCrvXHk= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= @@ -323,8 +332,9 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= @@ -385,6 +395,7 @@ github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerX github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -408,6 +419,7 @@ github.com/imdario/mergo v0.3.11 h1:3tnifQM4i+fbajXKBHXWEH+KvNHqojZ778UH75j3bGA= github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/j-keck/arping v0.0.0-20160618110441-2cf9dc699c56/go.mod h1:ymszkNOg6tORTn+6F6j+Jc8TOr5osrynvN6ivFWZ2GA= +github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/jsimonetti/rtnetlink v0.0.0-20190606172950-9527aa82566a/go.mod h1:Oz+70psSo5OFh8DBl0Zv2ACw7Esh6pPUphlvZG9x7uw= @@ -426,12 +438,14 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.11.0/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -522,6 +536,7 @@ github.com/pelletier/go-toml v1.7.0 h1:7utD74fnzVc/cpcyy8sjrlFr5vYpypUixARcHIMIG github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= +github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pion/dtls/v2 v2.0.3/go.mod h1:TUjyL8bf8LH95h81Xj7kATmzMRt29F/4lxpIPj2Xe4Y= github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= github.com/pion/transport v0.10.0/go.mod h1:BnHnUipd0rZQyTVB2SBGojFHT9CBt5C5TcsJSQGkvSE= @@ -556,6 +571,7 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/rakelkar/gonetsh v0.0.0-20210226024844-dfffed138500/go.mod h1:MkEXf5yV9DRTy8TfpWdvMuCTkxajNE/0tn9pvZ6ikDw= +github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-charset v0.0.0-20180617210344-2471d30d28b4/go.mod h1:qgYeAmZ5ZIpBWTGllZSQnw97Dj+woV0toclVaRGI8pc= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -637,8 +653,10 @@ github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmF github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= github.com/vmware-tanzu/octant v0.17.0 h1:2H2AiQU5C1RiHxxYrrOosDHHI9eV51nP+e9PjRP+c48= github.com/vmware-tanzu/octant v0.17.0/go.mod h1:lA32xKa6icUclg+DjAX/E/Id1cTqwCXZUem3RGEp/2A= -github.com/vmware/go-ipfix v0.4.8/go.mod h1:JvYQ70hgDDdMVjz1kz+Kb0n5u0RD65TnXcl6MFkjpq0= +github.com/vmware/go-ipfix v0.5.1/go.mod h1:P0+G5MH48fwF66O+D/C28lxCX2qqpq6Enyb3ju2aqvY= github.com/wenyingd/ofnet v0.0.0-20201109024835-6fd225d8c8d1/go.mod h1:8mMMWAYBNUeTGXYKizOLETfN3WIbu3P5DgvS2jiXKdI= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= @@ -767,6 +785,7 @@ golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201024042810-be3efd7ff127/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= @@ -1011,8 +1030,10 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4= -google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -1030,6 +1051,11 @@ gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno= gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo= +gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q= +gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4= +gopkg.in/jcmturner/gokrb5.v7 v7.5.0/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= +gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/square/go-jose.v2 v2.2.2/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= @@ -1086,7 +1112,6 @@ k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= k8s.io/klog/v2 v2.3.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= -k8s.io/klog/v2 v2.4.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= k8s.io/klog/v2 v2.8.0 h1:Q3gmuM9hKEjefWFFYF0Mat+YyFJvsUyYuwyNNJ5C9Ts= k8s.io/klog/v2 v2.8.0/go.mod h1:hy9LJ/NvuK+iVyP4Ehqva4HxZG/oXyIS3n3Jmire4Ec= k8s.io/kube-aggregator v0.19.3/go.mod h1:5KTkDBxx4YiAYUuqTGmwjH7v54hRdrykqzcPiF8hPJc= diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index fd1eb05cdb1..5b6f2d8fcc6 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -247,10 +247,10 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri // Check if record has both Pod name of source and destination pod. if isIntraNode { checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-b", controlPlaneNodeName()) - checkFlowType(t, record, ipfixregistry.IntraNode) + checkFlowType(t, record, ipfixregistry.FlowTypeIntraNode) } else { checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-c", workerNodeName(1)) - checkFlowType(t, record, ipfixregistry.InterNode) + checkFlowType(t, record, ipfixregistry.FlowTypeInterNode) } if checkService { diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 220b6f94d1c..37e82fa62db 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -609,7 +609,11 @@ func (data *TestData) mutateFlowAggregatorConfigMap(ipfixCollector string) error } flowAggregatorConf, _ := configMap.Data[flowAggregatorConfName] flowAggregatorConf = strings.Replace(flowAggregatorConf, "#externalFlowCollectorAddr: \"\"", fmt.Sprintf("externalFlowCollectorAddr: \"%s\"", ipfixCollector), 1) - flowAggregatorConf = strings.Replace(flowAggregatorConf, "#flowExportInterval: 60s", "flowExportInterval: 5s", 1) + // We expect at least two flow records at the external flow collector, so picked + // 4s and 6s for active and inactive flow timeouts, respectively, considering + // the fact that test flows run for 10s. + flowAggregatorConf = strings.Replace(flowAggregatorConf, "#activeFlowRecordTimeout: 60s", "activeFlowRecordTimeout: 4s", 1) + flowAggregatorConf = strings.Replace(flowAggregatorConf, "#inactiveFlowRecordTimeout: 90s", "inactiveFlowRecordTimeout: 6s", 1) if testOptions.providerName == "kind" { // In Kind cluster, there are issues with DNS name resolution on worker nodes. // We will skip TLS testing for Kind cluster because the server certificate is generated with Flow aggregator's DNS name