Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flakiness in Kind e2e flow aggregator tests #2308

Merged
merged 1 commit into from
Jul 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions pkg/agent/flowexporter/connections/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,9 @@ func filterAntreaConns(conns []*flowexporter.Connection, nodeConfig *config.Node

// Consider Pod-to-Pod, Pod-To-Service and Pod-To-External flows.
if srcIP.Equal(nodeConfig.GatewayConfig.IPv4) || dstIP.Equal(nodeConfig.GatewayConfig.IPv4) {
klog.V(4).Infof("Detected flow for which one of the endpoint is host gateway %s :%+v", nodeConfig.GatewayConfig.IPv4.String(), conn)
continue
}
if srcIP.Equal(nodeConfig.GatewayConfig.IPv6) || dstIP.Equal(nodeConfig.GatewayConfig.IPv6) {
klog.V(4).Infof("Detected flow for which one of the endpoint is host gateway %s :%+v", nodeConfig.GatewayConfig.IPv6.String(), conn)
continue
antoninbas marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/agent/flowexporter/connections/conntrack_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,9 @@ func (cs *ConntrackConnectionStore) AddOrUpdateConn(conn *flowexporter.Connectio
}
}
cs.addNetworkPolicyMetadata(conn)

if conn.StartTime.IsZero() {
conn.StartTime = time.Now()
}
Comment on lines +229 to +231
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feel that we can include fix of this #1417 in this PR too. It is about start time too but the default value is different (flowStartSeconds: 2288912640 or StartTime: Jul 14, 2042 03:04:00.000000000 CEST)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

metrics.TotalAntreaConnectionsInConnTrackTable.Inc()
klog.V(4).Infof("New Antrea flow added: %v", conn)
// Add new antrea connection to connection store
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
// To test service name mapping.
tuple4 := flowexporter.Tuple{SourceAddress: net.IP{10, 10, 10, 10}, DestinationAddress: net.IP{20, 20, 20, 20}, Protocol: 6, SourcePort: 5000, DestinationPort: 80}
testFlow4 := flowexporter.Connection{
StartTime: refTime.Add(-(time.Second * 50)),
StopTime: refTime,
FlowKey: tuple4,
Mark: openflow.ServiceCTMark,
IsPresent: true,
Expand Down
3 changes: 3 additions & 0 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ func (exp *flowExporter) sendFlowRecords() error {
} else {
exp.flowRecords.ValidateAndUpdateStats(key, record)
}
klog.V(4).InfoS("Record sent successfully", "flowKey", key, "record", record)
}
return nil
}
Expand All @@ -330,6 +331,7 @@ func (exp *flowExporter) sendFlowRecords() error {
return err
}
exp.numDataSetsSent = exp.numDataSetsSent + 1
klog.V(4).InfoS("Record for deny connection sent successfully", "flowKey", connKey, "connection", conn)
exp.denyConnStore.ResetConnStatsWithoutLock(connKey)
}
if time.Since(conn.LastExportTime) >= exp.idleFlowTimeout {
Expand All @@ -340,6 +342,7 @@ func (exp *flowExporter) sendFlowRecords() error {
return err
}
exp.numDataSetsSent = exp.numDataSetsSent + 1
klog.V(4).InfoS("Record for deny connection sent successfully", "flowKey", connKey, "connection", conn)
exp.denyConnStore.DeleteConnWithoutLock(connKey)
}
return nil
Expand Down
84 changes: 61 additions & 23 deletions test/e2e/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ const (
testIngressRuleName = "test-ingress-rule-name"
testEgressRuleName = "test-egress-rule-name"
iperfTimeSec = 12
)

var (
// Single iperf run results in two connections with separate ports (control connection and actual data connection).
// As 2s is the export active timeout of flow exporter and iperf traffic runs for 12s, we expect totally 12 records
// exporting to the flow aggregator at time 2s, 4s, 6s, 8s, 10s, and 12s after iperf traffic begins.
Expand All @@ -144,6 +147,21 @@ func TestFlowAggregator(t *testing.T) {
defer teardownTest(t, data)
defer teardownFlowAggregator(t, data)

if testOptions.providerName == "kind" {
// Currently, in Kind clusters, OVS userspace datapath does not support
// packet statistics in the conntrack entries. Because of that Flow Exporter
// at Antrea agent cannot consider flows to be active and keep sending active
antoninbas marked this conversation as resolved.
Show resolved Hide resolved
// records. Currently, Flow Exporter sends two records for an iperf flow
// in a Kind cluster with a duration of 12s: 1. A new iperf connection gets
// idled out after the exporter idle timeout, which is 1s in the test;
// in this case, the Flow Aggregator sends the record after 4.5s. 2. The
// connection dies and the TCP state becomes TIME_WAIT, which happens
// at 12s in the test; here, the Flow Aggregator sends the record at 15.5s.
// We will remove this workaround once OVS userspace datapath supports packet
// statistics in conntrack entries.
expectedNumDataRecords = 2
}

k8sUtils, err = NewKubernetesUtils(data)
if err != nil {
t.Fatalf("Error when creating Kubernetes utils client: %v", err)
Expand Down Expand Up @@ -487,16 +505,15 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri
timeStartSec := timeStart.Unix()
var cmdStr string
if !isIPv6 {
cmdStr = fmt.Sprintf("iperf3 -c %s -t %d|grep sender|awk '{print $7,$8}'", dstIP, iperfTimeSec)
cmdStr = fmt.Sprintf("iperf3 -c %s -t %d", dstIP, iperfTimeSec)
} else {
cmdStr = fmt.Sprintf("iperf3 -6 -c %s -t %d|grep sender|awk '{print $7,$8}'", dstIP, iperfTimeSec)
cmdStr = fmt.Sprintf("iperf3 -6 -c %s -t %d", dstIP, iperfTimeSec)
}
stdout, _, err := data.runCommandFromPod(testNamespace, "perftest-a", "perftool", []string{"bash", "-c", cmdStr})
if err != nil {
t.Errorf("Error when running iperf3 client: %v", err)
}
bandwidth := strings.TrimSpace(stdout)
bwSlice := strings.Split(bandwidth, " ")
bwSlice, srcPort := getBandwidthAndSourcePort(stdout)
// bandwidth from iperf output
bandwidthInFloat, err := strconv.ParseFloat(bwSlice[0], 64)
require.NoErrorf(t, err, "Error when converting iperf bandwidth %s to float64 type", bwSlice[0])
Expand All @@ -509,13 +526,15 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri
t.Fatalf("Unit of the traffic bandwidth reported by iperf should either be Mbits or Gbits, failing the test.")
}

collectorOutput := getCollectorOutput(t, srcIP, dstIP, timeStart, true)
collectorOutput, recordSlices := getCollectorOutput(t, srcIP, dstIP, srcPort, timeStart, true)
// Iterate over recordSlices and build some results to test with expected results
recordSlices := getRecordsFromOutput(collectorOutput)
dataRecordsCount := 0
var octetTotalCount uint64
for _, record := range recordSlices {
if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) {
// Check the source port along with source and destination IPs as there
// are flow records for control flows during the iperf with same IPs
// and destination port.
if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) && strings.Contains(record, srcPort) {
dataRecordsCount = dataRecordsCount + 1
// Check if record has both Pod name of source and destination Pod.
if isIntraNode {
Expand Down Expand Up @@ -581,7 +600,7 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri
}
// Checking only data records as data records cannot be decoded without template
// record.
assert.GreaterOrEqualf(t, dataRecordsCount, expectedNumDataRecords, "IPFIX collector should receive expected number of flow records. Considered records: ", len(recordSlices))
assert.GreaterOrEqualf(t, dataRecordsCount, expectedNumDataRecords, "IPFIX collector should receive expected number of flow records. Considered records: %s \n Collector output: %s", recordSlices, collectorOutput)
}

func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName string, srcPodName string, srcIP string, dstIP string, dstPort int32, isIPv6 bool) {
Expand All @@ -595,8 +614,7 @@ func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName st
stdout, stderr, err := data.runCommandFromPod(testNamespace, srcPodName, busyboxContainerName, strings.Fields(cmd))
require.NoErrorf(t, err, "Error when running wget command, stdout: %s, stderr: %s", stdout, stderr)

collectorOutput := getCollectorOutput(t, srcIP, dstIP, timeStart, false)
recordSlices := getRecordsFromOutput(collectorOutput)
_, recordSlices := getCollectorOutput(t, srcIP, dstIP, "", timeStart, false)
for _, record := range recordSlices {
if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) {
checkPodAndNodeData(t, record, srcPodName, srcNodeName, "", "")
Expand Down Expand Up @@ -625,9 +643,8 @@ func checkRecordsForDenyFlows(t *testing.T, data *TestData, testFlow1, testFlow2
_, _, err = data.runCommandFromPod(testNamespace, testFlow2.srcPodName, "", []string{"timeout", "2", "bash", "-c", cmdStr2})
assert.Error(t, err)

collectorOutput := getCollectorOutput(t, testFlow1.srcIP, testFlow2.srcIP, timeStart, false)
_, recordSlices := getCollectorOutput(t, testFlow1.srcIP, testFlow2.srcIP, "", timeStart, false)
// Iterate over recordSlices and build some results to test with expected results
recordSlices := getRecordsFromOutput(collectorOutput)
for _, record := range recordSlices {
var srcPodName, dstPodName string
if strings.Contains(record, testFlow1.srcIP) && strings.Contains(record, testFlow1.dstIP) {
Expand Down Expand Up @@ -730,45 +747,46 @@ func getUnit64FieldFromRecord(t *testing.T, record string, field string) uint64
return 0
}

func getCollectorOutput(t *testing.T, srcIP string, dstIP string, timeStart time.Time, checkAllRecords bool) string {
// getCollectorOutput polls the output of go-ipfix collector and checks if we have
// received all the expected records for a given flow with source IP, destination IP
// and source port. We send source port to ignore the control flows during the
// iperf test.
func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, timeStart time.Time, checkAllRecords bool) (string, []string) {
var collectorOutput string
var recordSlices []string
err := wait.PollImmediate(500*time.Millisecond, aggregatorInactiveFlowRecordTimeout, func() (bool, error) {
var rc int
var err error
// `pod-running-timeout` option is added to cover scenarios where ipfix flow-collector has crashed after being deployed
rc, collectorOutput, _, err = provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl logs --since=%v --pod-running-timeout=%v ipfix-collector -n antrea-test", time.Since(timeStart).String(), aggregatorInactiveFlowRecordTimeout.String()))
rc, collectorOutput, _, err = provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl logs --pod-running-timeout=%v ipfix-collector -n antrea-test", aggregatorInactiveFlowRecordTimeout.String()))
if err != nil || rc != 0 {
return false, err
}
// Checking that all the data records which correspond to the iperf flow are received
recordSlices = getRecordsFromOutput(collectorOutput)
if checkAllRecords {
recordSlices := getRecordsFromOutput(collectorOutput)
for _, record := range recordSlices {
exportTime := int64(getUnit64FieldFromRecord(t, record, "flowEndSeconds"))
if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) {
if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) && strings.Contains(record, srcPort) {
if exportTime >= timeStart.Unix()+iperfTimeSec {
return true, nil
}
}
}
return false, nil
} else {
return strings.Contains(collectorOutput, srcIP) && strings.Contains(collectorOutput, dstIP), nil
return strings.Contains(collectorOutput, srcIP) && strings.Contains(collectorOutput, dstIP) && strings.Contains(collectorOutput, srcPort), nil
}
})
require.NoErrorf(t, err, "IPFIX collector did not receive the expected records and timed out with error")
return collectorOutput
require.NoErrorf(t, err, "IPFIX collector did not receive the expected records in collector output: %v time start: %s iperf source port: %s", collectorOutput, timeStart.String(), srcPort)
return collectorOutput, recordSlices
}

// getRecordsFromOutput splits the IPFIX collector log output into individual
// record strings. Lines containing '#' (collector banner/comment lines) are
// stripped first, then the output is split on the "IPFIX-HDR:" marker that
// precedes each record. The leading element of the split (any text before the
// first header) is discarded; the records are returned in the order in which
// they appear in the output.
func getRecordsFromOutput(output string) []string {
	re := regexp.MustCompile("(?m)^.*" + "#" + ".*$[\r\n]+")
	output = re.ReplaceAllString(output, "")
	output = strings.TrimSpace(output)
	recordSlices := strings.Split(output, "IPFIX-HDR:")
	// Drop the first element (text preceding the first header). Using a
	// simple re-slice keeps the remaining records in their original order,
	// unlike the swap-with-last idiom, which would move the final record to
	// the front.
	return recordSlices[1:]
}

Expand Down Expand Up @@ -1001,3 +1019,23 @@ func deletePerftestServices(t *testing.T, data *TestData) {
}
}
}

// getBandwidthAndSourcePort parses the iperf3 client output and returns the
// measured bandwidth together with the ephemeral source port of the data
// connection. The bandwidth is returned as a slice of two strings (value and
// unit) taken from the "sender" summary line, e.g.:
//   [  5]   0.00-12.00  sec  1.20 GBytes   859 Mbits/sec    0   sender
// The source port is taken from the "connected" line, e.g.:
//   [  5] local 10.10.1.2 port 54321 connected to 10.10.2.3 port 5201
// If the output is malformed (e.g. iperf failed to connect and the expected
// lines are missing or truncated), the returned bandwidth slice is nil and/or
// the source port is the empty string; the length guards below prevent a
// panic on short lines.
func getBandwidthAndSourcePort(iperfStdout string) ([]string, string) {
	var bandwidth []string
	var srcPort string
	for _, line := range strings.Split(iperfStdout, "\n") {
		fields := strings.Fields(line)
		// "sender" summary line: bandwidth value is field 6, unit is field 7.
		if strings.Contains(line, "sender") && len(fields) >= 8 {
			bandwidth = fields[6:8]
		}
		// "connected" line: the local (source) port is field 5.
		if strings.Contains(line, "connected") && len(fields) >= 6 {
			srcPort = fields[5]
		}
	}
	return bandwidth, srcPort
}
4 changes: 2 additions & 2 deletions test/e2e/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ const (
nginxLBService = "nginx-loadbalancer"

exporterActiveFlowExportTimeout = 2 * time.Second
exporterInactiveFlowExportTimeout = 1 * time.Second
exporterIdleFlowExportTimeout = 1 * time.Second
aggregatorActiveFlowRecordTimeout = 3500 * time.Millisecond
aggregatorInactiveFlowRecordTimeout = 6 * time.Second
)
Expand Down Expand Up @@ -571,7 +571,7 @@ func (data *TestData) deployAntreaFlowExporter(ipfixCollector string) error {
{"FlowExporter", "true", true},
{"flowPollInterval", "\"1s\"", false},
{"activeFlowExportTimeout", fmt.Sprintf("\"%v\"", exporterActiveFlowExportTimeout), false},
{"inactiveFlowExportTimeout", fmt.Sprintf("\"%v\"", exporterInactiveFlowExportTimeout), false},
{"idleFlowExportTimeout", fmt.Sprintf("\"%v\"", exporterIdleFlowExportTimeout), false},
}
if ipfixCollector != "" {
ac = append(ac, configChange{"flowCollectorAddr", fmt.Sprintf("\"%s\"", ipfixCollector), false})
Expand Down