Skip to content

Commit

Permalink
Fix flakiness in Kind e2e flow aggregator tests
Browse files Browse the repository at this point in the history
The expected number of records is not correct in Kind clusters,
as packet counters are not supported in conntrack entries for the
OVS userspace datapath. The Flow Exporter cannot send records after
the active timeout expires, as we cannot consider flows to be active
without the statistics. This was missed when the bandwidth tests were
modified for Kind clusters in PR #1802.

Therefore, we expect only 2 records for any given flow in Kind
clusters. We expect this limitation to be resolved soon, once the OVS
userspace datapath supports statistics in conntrack entries.
In the e2e test, we expect 2 flow records for Kind clusters and 3 flow
records in other clusters.

In addition, we now match on the source port of the iperf data flow so
that similar flow records from the iperf control flow are ignored. Those
control-flow records were being counted, which is why the tests were
passing earlier in Kind clusters.

Signed-off-by: Srikar Tati <stati@vmware.com>
  • Loading branch information
srikartati committed Jun 24, 2021
1 parent ef14ad1 commit 7bcb10f
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 15 deletions.
2 changes: 0 additions & 2 deletions pkg/agent/flowexporter/connections/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,9 @@ func filterAntreaConns(conns []*flowexporter.Connection, nodeConfig *config.Node

// Consider Pod-to-Pod, Pod-To-Service and Pod-To-External flows.
if srcIP.Equal(nodeConfig.GatewayConfig.IPv4) || dstIP.Equal(nodeConfig.GatewayConfig.IPv4) {
klog.V(4).Infof("Detected flow for which one of the endpoint is host gateway %s :%+v", nodeConfig.GatewayConfig.IPv4.String(), conn)
continue
}
if srcIP.Equal(nodeConfig.GatewayConfig.IPv6) || dstIP.Equal(nodeConfig.GatewayConfig.IPv6) {
klog.V(4).Infof("Detected flow for which one of the endpoint is host gateway %s :%+v", nodeConfig.GatewayConfig.IPv6.String(), conn)
continue
}

Expand Down
1 change: 1 addition & 0 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ func (exp *flowExporter) sendFlowRecords() error {
} else {
exp.flowRecords.ValidateAndUpdateStats(key, record)
}
klog.V(4).InfoS("Record sent successfully", "flowKey", key, "record", record)
}
return nil
}
Expand Down
65 changes: 52 additions & 13 deletions test/e2e/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ const (
testIngressRuleName = "test-ingress-rule-name"
testEgressRuleName = "test-egress-rule-name"
iperfTimeSec = 12
)

var (
// A single iperf run results in two connections with separate ports (a control connection and the actual data connection).
// As 2s is the export active timeout of the flow exporter and iperf traffic runs for 12s, we expect a total of 12 records
// to be exported to the flow aggregator, at times 2s, 4s, 6s, 8s, 10s, and 12s after the iperf traffic begins.
Expand All @@ -142,6 +145,17 @@ func TestFlowAggregator(t *testing.T) {
defer teardownTest(t, data)
defer teardownFlowAggregator(t, data)

if testOptions.providerName == "kind" {
// Currently, in Kind clusters, OVS userspace datapath does not support
// packet statistics in the conntrack entries. Because of that Flow Exporter
// at Antrea agent cannot consider flows to be active and keep sending active
// records. Flow exporter sends two records for a flow: 1. New connection
// added to the exporter 2. When the connection dies and TCP state becomes
// TIME_WAIT. We will remove this workaround once OVS userspace datapath
// supports packet statistics in conntrack entries.
expectedNumDataRecords = 2
}

k8sUtils, err = NewKubernetesUtils(data)
if err != nil {
t.Fatalf("Error when creating Kubernetes utils client: %v", err)
Expand Down Expand Up @@ -485,16 +499,15 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri
timeStartSec := timeStart.Unix()
var cmdStr string
if !isIPv6 {
cmdStr = fmt.Sprintf("iperf3 -c %s -t %d|grep sender|awk '{print $7,$8}'", dstIP, iperfTimeSec)
cmdStr = fmt.Sprintf("iperf3 -c %s -t %d", dstIP, iperfTimeSec)
} else {
cmdStr = fmt.Sprintf("iperf3 -6 -c %s -t %d|grep sender|awk '{print $7,$8}'", dstIP, iperfTimeSec)
cmdStr = fmt.Sprintf("iperf3 -6 -c %s -t %d", dstIP, iperfTimeSec)
}
stdout, _, err := data.runCommandFromPod(testNamespace, "perftest-a", "perftool", []string{"bash", "-c", cmdStr})
if err != nil {
t.Errorf("Error when running iperf3 client: %v", err)
}
bandwidth := strings.TrimSpace(stdout)
bwSlice := strings.Split(bandwidth, " ")
bwSlice, srcPort := getBandwidthAndSourcePort(stdout)
// bandwidth from iperf output
bandwidthInFloat, err := strconv.ParseFloat(bwSlice[0], 64)
require.NoErrorf(t, err, "Error when converting iperf bandwidth %s to float64 type", bwSlice[0])
Expand All @@ -507,13 +520,16 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri
t.Fatalf("Unit of the traffic bandwidth reported by iperf should either be Mbits or Gbits, failing the test.")
}

collectorOutput := getCollectorOutput(t, srcIP, dstIP, timeStart, true)
collectorOutput := getCollectorOutput(t, srcIP, dstIP, srcPort, timeStart, true)
// Iterate over recordSlices and build some results to test with expected results
recordSlices := getRecordsFromOutput(collectorOutput)
dataRecordsCount := 0
var octetTotalCount uint64
for _, record := range recordSlices {
if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) {
// Check the source port along with source and destination IPs as there
// are flow records for control flows during iperf traffics with same IPs
// and destination port.
if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) && strings.Contains(record, srcPort) {
dataRecordsCount = dataRecordsCount + 1
// Check if record has both Pod name of source and destination Pod.
if isIntraNode {
Expand Down Expand Up @@ -579,7 +595,7 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri
}
// Checking only data records as data records cannot be decoded without template
// record.
assert.GreaterOrEqualf(t, dataRecordsCount, expectedNumDataRecords, "IPFIX collector should receive expected number of flow records. Considered records: ", len(recordSlices))
assert.GreaterOrEqualf(t, dataRecordsCount, expectedNumDataRecords, "IPFIX collector should receive expected number of flow records. Considered records: %v", recordSlices)
}

func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName string, srcPodName string, srcIP string, dstIP string, dstPort int32, isIPv6 bool) {
Expand All @@ -593,7 +609,7 @@ func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName st
stdout, stderr, err := data.runCommandFromPod(testNamespace, srcPodName, busyboxContainerName, strings.Fields(cmd))
require.NoErrorf(t, err, "Error when running wget command, stdout: %s, stderr: %s", stdout, stderr)

collectorOutput := getCollectorOutput(t, srcIP, dstIP, timeStart, false)
collectorOutput := getCollectorOutput(t, srcIP, dstIP, "", timeStart, false)
recordSlices := getRecordsFromOutput(collectorOutput)
for _, record := range recordSlices {
if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) {
Expand Down Expand Up @@ -623,7 +639,7 @@ func checkRecordsForDenyFlows(t *testing.T, data *TestData, testFlow1, testFlow2
_, _, err = data.runCommandFromPod(testNamespace, testFlow2.srcPodName, "", []string{"timeout", "2", "bash", "-c", cmdStr2})
assert.Error(t, err)

collectorOutput := getCollectorOutput(t, testFlow1.srcIP, testFlow2.srcIP, timeStart, false)
collectorOutput := getCollectorOutput(t, testFlow1.srcIP, testFlow2.srcIP, "", timeStart, false)
// Iterate over recordSlices and build some results to test with expected results
recordSlices := getRecordsFromOutput(collectorOutput)
for _, record := range recordSlices {
Expand Down Expand Up @@ -723,7 +739,11 @@ func getUnit64FieldFromRecord(t *testing.T, record string, field string) uint64
return 0
}

func getCollectorOutput(t *testing.T, srcIP string, dstIP string, timeStart time.Time, checkAllRecords bool) string {
// getCollectorOutput polls the output of go-ipfix collector and checks if we have
// received all the expected records for a given flow with source IP, destination IP
// and source port. We send source port to ignore the control flows during the
// iperf test.
func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, timeStart time.Time, checkAllRecords bool) string {
var collectorOutput string
err := wait.PollImmediate(500*time.Millisecond, aggregatorInactiveFlowRecordTimeout, func() (bool, error) {
var rc int
Expand All @@ -738,18 +758,18 @@ func getCollectorOutput(t *testing.T, srcIP string, dstIP string, timeStart time
recordSlices := getRecordsFromOutput(collectorOutput)
for _, record := range recordSlices {
exportTime := int64(getUnit64FieldFromRecord(t, record, "flowEndSeconds"))
if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) {
if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) && strings.Contains(record, srcPort) {
if exportTime >= timeStart.Unix()+iperfTimeSec {
return true, nil
}
}
}
return false, nil
} else {
return strings.Contains(collectorOutput, srcIP) && strings.Contains(collectorOutput, dstIP), nil
return strings.Contains(collectorOutput, srcIP) && strings.Contains(collectorOutput, dstIP) && strings.Contains(collectorOutput, srcPort), nil
}
})
require.NoErrorf(t, err, "IPFIX collector did not receive the expected records and timed out with error")
require.NoErrorf(t, err, "IPFIX collector did not receive the expected records in collector output: %v", collectorOutput)
return collectorOutput
}

Expand Down Expand Up @@ -994,3 +1014,22 @@ func deletePerftestServices(t *testing.T, data *TestData) {
}
}
}

// getBandwidthAndSourcePort parses the stdout of an iperf3 client run and
// returns the sender bandwidth and the client's source port.
//
// The bandwidth is read from the last line containing "sender" as the 7th and
// 8th whitespace-separated fields (value and unit), which is expected to look
// like:
//
//	[  5]   0.00-12.00  sec  1.10 GBytes   787 Mbits/sec    0   sender
//
// The source port is read from the last line containing "connected" as the 6th
// whitespace-separated field, which is expected to look like:
//
//	[  5] local 10.0.0.1 port 45678 connected to 10.0.0.2 port 5201
//
// Lines that match a keyword but have too few fields (for example an error
// message or truncated output) are skipped instead of causing an
// index-out-of-range panic. If no usable line is found, the corresponding
// return value is left at its zero value (nil slice / empty string), so
// callers should check the results before indexing into them.
func getBandwidthAndSourcePort(iperfStdout string) ([]string, string) {
	const (
		// Field positions within the iperf3 "sender" summary line.
		bandwidthStart = 6
		bandwidthEnd   = 8
		// Field position of the local port within the "connected" line.
		srcPortIdx = 5
	)

	var bandwidth []string
	var srcPort string
	for _, line := range strings.Split(iperfStdout, "\n") {
		if strings.Contains(line, "sender") {
			// Guard the slice bounds: a short line must not panic the test.
			if fields := strings.Fields(line); len(fields) >= bandwidthEnd {
				bandwidth = fields[bandwidthStart:bandwidthEnd]
			}
		}
		if strings.Contains(line, "connected") {
			if fields := strings.Fields(line); len(fields) > srcPortIdx {
				srcPort = fields[srcPortIdx]
			}
		}
	}
	return bandwidth, srcPort
}

0 comments on commit 7bcb10f

Please sign in to comment.