diff --git a/pkg/flowaggregator/flowaggregator.go b/pkg/flowaggregator/flowaggregator.go index e10e9f09320..c4ec4e3472f 100644 --- a/pkg/flowaggregator/flowaggregator.go +++ b/pkg/flowaggregator/flowaggregator.go @@ -458,6 +458,10 @@ func (fa *flowAggregator) sendTemplateSet(templateSet ipfixentities.Set, isIPv6 ie := ipfixentities.NewInfoElementWithValue(element, nil) elements = append(elements, ie) } + fa.set.ResetSet() + if err := fa.set.PrepareSet(ipfixentities.Template, templateID); err != nil { + return 0, err + } err := templateSet.AddRecord(elements, templateID) if err != nil { return 0, fmt.Errorf("error when adding record to set, error: %v", err) diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index 5b6f2d8fcc6..1327a9cd335 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -97,7 +97,6 @@ AntreaProxy enabled (Inter-Node): Flow record from destination Node is ignored, const ( ingressNetworkPolicyName = "test-flow-aggregator-networkpolicy-ingress" egressNetworkPolicyName = "test-flow-aggregator-networkpolicy-egress" - collectorCheckTimeout = 10 * time.Second // Single iperf run results in two connections with separate ports (control connection and actual data connection). // As 5s is export interval and iperf traffic runs for 10s, we expect about 4 records exporting to the flow aggregator. // Since flow aggregator will aggregate records based on 5-tuple connection key, we expect 2 records. @@ -221,8 +220,8 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri // Polling to make sure all the data records corresponding to the iperf flow // are received. - err = wait.Poll(250*time.Millisecond, collectorCheckTimeout, func() (bool, error) { - rc, collectorOutput, _, err := provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl logs --since=%v ipfix-collector -n antrea-test", time.Since(timeStart).String())) + err = wait.PollImmediate(500*time.Millisecond, flowAggregatorExportInterval, func() (bool, error) { + rc, collectorOutput, _, err := provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl logs --since=%v --pod-running-timeout=%v ipfix-collector -n antrea-test", time.Since(timeStart).String(), flowAggregatorExportInterval.String())) if err != nil || rc != 0 { return false, err } diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 37e82fa62db..cda5239e85c 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -56,8 +56,9 @@ import ( ) const ( - defaultTimeout = 90 * time.Second - defaultInterval = 1 * time.Second + defaultTimeout = 90 * time.Second + defaultInterval = 1 * time.Second + flowAggregatorExportInterval = 5 * time.Second // antreaNamespace is the K8s Namespace in which all Antrea resources are running. antreaNamespace string = "kube-system"