From bf784275421afc1f61af838fefc05b8fa2ccc00d Mon Sep 17 00:00:00 2001 From: Srikar Tati Date: Mon, 15 Mar 2021 21:17:15 -0700 Subject: [PATCH] Fix e2e flow aggregator test error CI e2e tests are failing during or after TestFlowAggregator Errors are happening when flow aggregator service DNS resolution is not successful. I was able to get the backtrace that leads to crashing the agent. Added fix when checking the interface for nil to resolve it. --- pkg/agent/flowexporter/exporter/exporter.go | 62 +++++++++++---------- test/e2e/fixtures.go | 29 ++++++---- test/e2e/flowaggregator_test.go | 2 +- test/e2e/framework.go | 3 +- 4 files changed, 54 insertions(+), 42 deletions(-) diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go index 231a2dae063..0bdeb1d453b 100644 --- a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -18,6 +18,7 @@ import ( "fmt" "hash/fnv" "net" + "reflect" "time" ipfixentities "github.com/vmware/go-ipfix/pkg/entities" @@ -95,6 +96,8 @@ type flowExporter struct { exporterInput exporter.ExporterInput activeFlowTimeout time.Duration idleFlowTimeout time.Duration + enableTLSToFlowAggregator bool + k8sClient kubernetes.Interface } func genObservationID() (uint32, error) { @@ -115,33 +118,6 @@ func prepareExporterInputArgs(collectorAddr, collectorProto string, enableTLSToF if err != nil { return expInput, err } - - if enableTLSToFlowAggregator { - // if CA certificate, client certificate and key do not exist during initialization, - // it will retry to obtain the credentials in next export cycle - expInput.CACert, err = getCACert(k8sClient) - if err != nil { - return expInput, fmt.Errorf("cannot retrieve CA cert: %v", err) - } - expInput.ClientCert, expInput.ClientKey, err = getClientCertKey(k8sClient) - if err != nil { - return expInput, fmt.Errorf("cannot retrieve client cert and key: %v", err) - } - // TLS transport does not need any tempRefTimeout as it is applicable only - // for TCP transport, so sending 0. - expInput.TempRefTimeout = 0 - expInput.IsEncrypted = true - } else if collectorProto == "tcp" { - // TCP transport does not need any tempRefTimeout, so sending 0. - // tempRefTimeout is the template refresh timeout, which specifies how often - // the exporting process should send the template again. - expInput.TempRefTimeout = 0 - expInput.IsEncrypted = false - } else { - // For UDP transport, hardcoding tempRefTimeout value as 1800s. - expInput.TempRefTimeout = 1800 - expInput.IsEncrypted = false - } expInput.CollectorAddress = collectorAddr expInput.CollectorProtocol = collectorProto expInput.PathMTU = 0 @@ -171,6 +147,8 @@ func NewFlowExporter(connStore connections.ConnectionStore, records *flowrecords activeFlowTimeout: activeFlowTimeout, idleFlowTimeout: idleFlowTimeout, ipfixSet: ipfix.NewSet(false), + enableTLSToFlowAggregator: enableTLSToFlowAggregator, + k8sClient: k8sClient, }, nil } @@ -191,7 +169,10 @@ func (exp *flowExporter) Export() { // There could be other errors while initializing flow exporter other than connecting to IPFIX collector, // therefore closing the connection and resetting the process. if exp.process != nil { - exp.process.CloseConnToCollector() + if !(reflect.ValueOf(exp.process).Kind() == reflect.Ptr && reflect.ValueOf(exp.process).IsNil()) { + klog.Errorf("process is %v", exp.process) + } + // exp.process.CloseConnToCollector() exp.process = nil } return @@ -212,6 +193,31 @@ func (exp *flowExporter) Export() { func (exp *flowExporter) initFlowExporter() error { var err error + if exp.enableTLSToFlowAggregator { + // if CA certificate, client certificate and key do not exist during initialization, + // it will retry to obtain the credentials in next export cycle + exp.exporterInput.CACert, err = getCACert(exp.k8sClient) + if err != nil { + return fmt.Errorf("cannot retrieve CA cert: %v", err) + } + exp.exporterInput.ClientCert, exp.exporterInput.ClientKey, err = getClientCertKey(exp.k8sClient) + if err != nil { + return fmt.Errorf("cannot retrieve client cert and key: %v", err) + } + // TLS transport does not need any tempRefTimeout, so sending 0. + exp.exporterInput.TempRefTimeout = 0 + exp.exporterInput.IsEncrypted = true + } else if exp.exporterInput.CollectorProtocol == "tcp" { + // TCP transport does not need any tempRefTimeout, so sending 0. + // tempRefTimeout is the template refresh timeout, which specifies how often + // the exporting process should send the template again. + exp.exporterInput.TempRefTimeout = 0 + exp.exporterInput.IsEncrypted = false + } else { + // For UDP transport, hardcoding tempRefTimeout value as 1800s. + exp.exporterInput.TempRefTimeout = 1800 + exp.exporterInput.IsEncrypted = false + } exp.process, err = ipfix.NewIPFIXExportingProcess(exp.exporterInput) if err != nil { return fmt.Errorf("error when starting exporter: %v", err) diff --git a/test/e2e/fixtures.go b/test/e2e/fixtures.go index adc251b988e..772b5c99115 100644 --- a/test/e2e/fixtures.go +++ b/test/e2e/fixtures.go @@ -143,42 +143,47 @@ func setupTest(tb testing.TB) (*TestData, error) { func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, error, bool) { // TODO: remove hardcoding to IPv4 after flow aggregator supports IPv6 isIPv6 := false - if _, err := setupTest(tb); err != nil { + data, err := setupTest(tb) + if err != nil { return nil, err, isIPv6 } // Create pod using ipfix collector image - if err := testData.createPodOnNode("ipfix-collector", "", ipfixCollectorImage, nil, nil, nil, nil, true, nil); err != nil { + if err = data.createPodOnNode("ipfix-collector", "", ipfixCollectorImage, nil, nil, nil, nil, true, nil); err != nil { tb.Errorf("Error when creating the ipfix collector Pod: %v", err) } - ipfixCollectorIP, err := testData.podWaitForIPs(defaultTimeout, "ipfix-collector", testNamespace) + ipfixCollectorIP, err := data.podWaitForIPs(defaultTimeout, "ipfix-collector", testNamespace) if err != nil || len(ipfixCollectorIP.ipStrings) == 0 { tb.Errorf("Error when waiting to get ipfix collector Pod IP: %v", err) return nil, err, isIPv6 } ipStr := ipfixCollectorIP.ipv4.String() ipfixCollectorAddr := fmt.Sprintf("%s:%s:tcp", ipStr, ipfixCollectorPort) + + faClusterIPAddr := "" tb.Logf("Applying flow aggregator YAML with ipfix collector address: %s", ipfixCollectorAddr) - faClusterIP, err := testData.deployFlowAggregator(ipfixCollectorAddr) + faClusterIP, err := data.deployFlowAggregator(ipfixCollectorAddr) if err != nil { return testData, err, isIPv6 } - - faClusterIPAddr := "" if testOptions.providerName == "kind" { // In Kind cluster, there are issues with DNS name resolution on worker nodes. // Please note that CoreDNS services are forced on to control-plane Node. faClusterIPAddr = fmt.Sprintf("%s:%s:tcp", faClusterIP, ipfixCollectorPort) + tb.Logf("Deploying flow exporter with collector address: %s", faClusterIPAddr) + if err = data.deployAntreaFlowExporter(faClusterIPAddr); err != nil { + return data, err, isIPv6 + } } tb.Logf("Deploying flow exporter with collector address: %s", faClusterIPAddr) - if err = testData.deployAntreaFlowExporter(faClusterIPAddr); err != nil { - return testData, err, isIPv6 + if err = data.deployAntreaFlowExporter(faClusterIPAddr); err != nil { + return data, err, isIPv6 } tb.Logf("Checking CoreDNS deployment") - if err = testData.checkCoreDNSPods(defaultTimeout); err != nil { - return testData, err, isIPv6 + if err = data.checkCoreDNSPods(defaultTimeout); err != nil { + return data, err, isIPv6 } - return testData, nil, isIPv6 + return data, nil, isIPv6 } func exportLogs(tb testing.TB, data *TestData, logsSubDir string, writeNodeLogs bool) { @@ -304,7 +309,7 @@ func exportLogs(tb testing.TB, data *TestData, logsSubDir string, writeNodeLogs } func teardownFlowAggregator(tb testing.TB, data *TestData) { - if err := testData.gracefulExitFlowAggregator(testOptions.coverageDir); err != nil { + if err := data.gracefulExitFlowAggregator(testOptions.coverageDir); err != nil { tb.Fatalf("Error when gracefully exiting Flow Aggregator: %v", err) } tb.Logf("Deleting '%s' K8s Namespace", flowAggregatorNamespace) diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index dcce2f0b292..99970cc9d69 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -109,8 +109,8 @@ func TestFlowAggregator(t *testing.T) { if err != nil { t.Fatalf("Error when setting up test: %v", err) } - defer teardownFlowAggregator(t, data) defer teardownTest(t, data) + defer teardownFlowAggregator(t, data) podAIP, podBIP, podCIP, svcB, svcC, err := createPerftestPods(data) if err != nil { diff --git a/test/e2e/framework.go b/test/e2e/framework.go index a2cb651c22d..69d3b359592 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -681,7 +681,8 @@ func (data *TestData) waitForAntreaDaemonSetPods(timeout time.Duration) error { return true, nil }) if err == wait.ErrWaitTimeout { - return fmt.Errorf("antrea-agent DaemonSet not ready within %v", defaultTimeout) + _, stdout, _, _ := provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl -n %s describe pod", antreaNamespace)) + return fmt.Errorf("antrea-agent DaemonSet not ready within %v; kubectl describe pod output: %v", defaultTimeout, stdout) } else if err != nil { return err }