Fix e2e flow aggregator test error
CI e2e tests are failing during or after TestFlowAggregator.
The errors happen when DNS resolution of the Flow Aggregator Service is not
successful. I was able to capture the backtrace that leads to the agent crash.

Added a fix that checks the interface for nil to resolve this.
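The crash pattern described here is a Go interface holding a typed nil pointer: a plain exp.process != nil check passes, yet calling a method that dereferences the receiver panics. Below is a minimal, self-contained sketch of the pitfall and of the reflect-based check the commit introduces; the closer/conn type names and the newConn helper are invented for illustration and are not part of the Antrea code.

package main

import (
	"fmt"
	"reflect"
)

type closer interface {
	Close()
}

type conn struct {
	addr string
}

// Close dereferences the receiver, so calling it through an interface
// that wraps a nil *conn panics with a nil pointer dereference.
func (c *conn) Close() { fmt.Println("closing", c.addr) }

// newConn stands in for an initializer that fails and returns a nil *conn.
func newConn(fail bool) *conn {
	if fail {
		return nil
	}
	return &conn{addr: "10.0.0.1:4739"}
}

// isNilInterface mirrors the reflect-based check added in exporter.go:
// an interface value is effectively nil if it is nil or wraps a nil pointer.
func isNilInterface(i interface{}) bool {
	if i == nil {
		return true
	}
	v := reflect.ValueOf(i)
	return v.Kind() == reflect.Ptr && v.IsNil()
}

func main() {
	var c closer = newConn(true) // a typed nil pointer ends up in the interface
	fmt.Println(c == nil)        // prints false: the interface itself is not nil
	if !isNilInterface(c) {
		c.Close() // without the reflect-based guard, this call would panic
	}
}

Running this prints false for c == nil and returns without panicking; removing the isNilInterface guard and calling c.Close() unconditionally reproduces the nil pointer dereference seen in the agent.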
srikartati committed Mar 17, 2021
1 parent 020946f commit bf78427
Showing 4 changed files with 54 additions and 42 deletions.
pkg/agent/flowexporter/exporter/exporter.go (62 changes: 34 additions & 28 deletions)
@@ -18,6 +18,7 @@ import (
"fmt"
"hash/fnv"
"net"
"reflect"
"time"

ipfixentities "github.com/vmware/go-ipfix/pkg/entities"
@@ -95,6 +96,8 @@ type flowExporter struct {
exporterInput exporter.ExporterInput
activeFlowTimeout time.Duration
idleFlowTimeout time.Duration
enableTLSToFlowAggregator bool
k8sClient kubernetes.Interface
}

func genObservationID() (uint32, error) {
@@ -115,33 +118,6 @@ func prepareExporterInputArgs(collectorAddr, collectorProto string, enableTLSToF
if err != nil {
return expInput, err
}

if enableTLSToFlowAggregator {
// if CA certificate, client certificate and key do not exist during initialization,
// it will retry to obtain the credentials in next export cycle
expInput.CACert, err = getCACert(k8sClient)
if err != nil {
return expInput, fmt.Errorf("cannot retrieve CA cert: %v", err)
}
expInput.ClientCert, expInput.ClientKey, err = getClientCertKey(k8sClient)
if err != nil {
return expInput, fmt.Errorf("cannot retrieve client cert and key: %v", err)
}
// TLS transport does not need any tempRefTimeout as it is applicable only
// for TCP transport, so sending 0.
expInput.TempRefTimeout = 0
expInput.IsEncrypted = true
} else if collectorProto == "tcp" {
// TCP transport does not need any tempRefTimeout, so sending 0.
// tempRefTimeout is the template refresh timeout, which specifies how often
// the exporting process should send the template again.
expInput.TempRefTimeout = 0
expInput.IsEncrypted = false
} else {
// For UDP transport, hardcoding tempRefTimeout value as 1800s.
expInput.TempRefTimeout = 1800
expInput.IsEncrypted = false
}
expInput.CollectorAddress = collectorAddr
expInput.CollectorProtocol = collectorProto
expInput.PathMTU = 0
@@ -171,6 +147,8 @@ func NewFlowExporter(connStore connections.ConnectionStore, records *flowrecords
activeFlowTimeout: activeFlowTimeout,
idleFlowTimeout: idleFlowTimeout,
ipfixSet: ipfix.NewSet(false),
enableTLSToFlowAggregator: enableTLSToFlowAggregator,
k8sClient: k8sClient,
}, nil
}

@@ -191,7 +169,10 @@ func (exp *flowExporter) Export() {
// There could be other errors while initializing flow exporter other than connecting to IPFIX collector,
// therefore closing the connection and resetting the process.
if exp.process != nil {
exp.process.CloseConnToCollector()
if !(reflect.ValueOf(exp.process).Kind() == reflect.Ptr && reflect.ValueOf(exp.process).IsNil()) {
klog.Errorf("process is %v", exp.process)
}
// exp.process.CloseConnToCollector()
exp.process = nil
}
return
@@ -212,6 +193,31 @@ func (exp *flowExporter) Export() {

func (exp *flowExporter) initFlowExporter() error {
var err error
if exp.enableTLSToFlowAggregator {
// if CA certificate, client certificate and key do not exist during initialization,
// it will retry to obtain the credentials in next export cycle
exp.exporterInput.CACert, err = getCACert(exp.k8sClient)
if err != nil {
return fmt.Errorf("cannot retrieve CA cert: %v", err)
}
exp.exporterInput.ClientCert, exp.exporterInput.ClientKey, err = getClientCertKey(exp.k8sClient)
if err != nil {
return fmt.Errorf("cannot retrieve client cert and key: %v", err)
}
// TLS transport does not need any tempRefTimeout, so sending 0.
exp.exporterInput.TempRefTimeout = 0
exp.exporterInput.IsEncrypted = true
} else if exp.exporterInput.CollectorProtocol == "tcp" {
// TCP transport does not need any tempRefTimeout, so sending 0.
// tempRefTimeout is the template refresh timeout, which specifies how often
// the exporting process should send the template again.
exp.exporterInput.TempRefTimeout = 0
exp.exporterInput.IsEncrypted = false
} else {
// For UDP transport, hardcoding tempRefTimeout value as 1800s.
exp.exporterInput.TempRefTimeout = 1800
exp.exporterInput.IsEncrypted = false
}
exp.process, err = ipfix.NewIPFIXExportingProcess(exp.exporterInput)
if err != nil {
return fmt.Errorf("error when starting exporter: %v", err)
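The exporter.go hunks above move the transport and TLS parameter setup out of prepareExporterInputArgs (run once at construction) and into initFlowExporter (run whenever the exporter connects), with the enableTLSToFlowAggregator flag and k8sClient now kept on the struct. Per the in-code comment, a CA certificate or client key that is missing in one cycle can then be fetched again on the next export cycle. A rough sketch of that lazy, retry-on-next-cycle shape, with all names invented for illustration:

package main

import (
	"errors"
	"fmt"
)

// exporter keeps the connection parameters and resolves credentials lazily.
type exporter struct {
	useTLS bool
	caCert []byte
}

// initExporter is called at the start of each export cycle; a transient
// failure leaves the exporter uninitialized so the next cycle retries.
func (e *exporter) initExporter(getCACert func() ([]byte, error)) error {
	if e.useTLS && e.caCert == nil {
		cert, err := getCACert()
		if err != nil {
			return fmt.Errorf("cannot retrieve CA cert: %w", err)
		}
		e.caCert = cert
	}
	return nil
}

func main() {
	e := &exporter{useTLS: true}
	attempts := 0
	getCACert := func() ([]byte, error) {
		attempts++
		if attempts < 2 {
			return nil, errors.New("secret not ready yet")
		}
		return []byte("fake-ca"), nil
	}
	for cycle := 1; cycle <= 2; cycle++ {
		if err := e.initExporter(getCACert); err != nil {
			fmt.Printf("cycle %d: %v\n", cycle, err)
			continue
		}
		fmt.Printf("cycle %d: initialized with %d CA bytes\n", cycle, len(e.caCert))
	}
}

Here the first cycle fails and leaves the exporter uninitialized; the second cycle succeeds, which is the behaviour the refactor relies on.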
test/e2e/fixtures.go (29 changes: 17 additions & 12 deletions)
@@ -143,42 +143,47 @@ func setupTest(tb testing.TB) (*TestData, error) {
func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, error, bool) {
// TODO: remove hardcoding to IPv4 after flow aggregator supports IPv6
isIPv6 := false
if _, err := setupTest(tb); err != nil {
data, err := setupTest(tb)
if err != nil {
return nil, err, isIPv6
}
// Create pod using ipfix collector image
if err := testData.createPodOnNode("ipfix-collector", "", ipfixCollectorImage, nil, nil, nil, nil, true, nil); err != nil {
if err = data.createPodOnNode("ipfix-collector", "", ipfixCollectorImage, nil, nil, nil, nil, true, nil); err != nil {
tb.Errorf("Error when creating the ipfix collector Pod: %v", err)
}
ipfixCollectorIP, err := testData.podWaitForIPs(defaultTimeout, "ipfix-collector", testNamespace)
ipfixCollectorIP, err := data.podWaitForIPs(defaultTimeout, "ipfix-collector", testNamespace)
if err != nil || len(ipfixCollectorIP.ipStrings) == 0 {
tb.Errorf("Error when waiting to get ipfix collector Pod IP: %v", err)
return nil, err, isIPv6
}
ipStr := ipfixCollectorIP.ipv4.String()
ipfixCollectorAddr := fmt.Sprintf("%s:%s:tcp", ipStr, ipfixCollectorPort)

faClusterIPAddr := ""
tb.Logf("Applying flow aggregator YAML with ipfix collector address: %s", ipfixCollectorAddr)
faClusterIP, err := testData.deployFlowAggregator(ipfixCollectorAddr)
faClusterIP, err := data.deployFlowAggregator(ipfixCollectorAddr)
if err != nil {
return testData, err, isIPv6
}

faClusterIPAddr := ""
if testOptions.providerName == "kind" {
// In Kind cluster, there are issues with DNS name resolution on worker nodes.
// Please note that CoreDNS services are forced on to control-plane Node.
faClusterIPAddr = fmt.Sprintf("%s:%s:tcp", faClusterIP, ipfixCollectorPort)
tb.Logf("Deploying flow exporter with collector address: %s", faClusterIPAddr)
if err = data.deployAntreaFlowExporter(faClusterIPAddr); err != nil {
return data, err, isIPv6
}
}
tb.Logf("Deploying flow exporter with collector address: %s", faClusterIPAddr)
if err = testData.deployAntreaFlowExporter(faClusterIPAddr); err != nil {
return testData, err, isIPv6
if err = data.deployAntreaFlowExporter(faClusterIPAddr); err != nil {
return data, err, isIPv6
}

tb.Logf("Checking CoreDNS deployment")
if err = testData.checkCoreDNSPods(defaultTimeout); err != nil {
return testData, err, isIPv6
if err = data.checkCoreDNSPods(defaultTimeout); err != nil {
return data, err, isIPv6
}
return testData, nil, isIPv6
return data, nil, isIPv6
}

func exportLogs(tb testing.TB, data *TestData, logsSubDir string, writeNodeLogs bool) {
@@ -304,7 +309,7 @@ func exportLogs(tb testing.TB, data *TestData, logsSubDir string, writeNodeLogs
}

func teardownFlowAggregator(tb testing.TB, data *TestData) {
if err := testData.gracefulExitFlowAggregator(testOptions.coverageDir); err != nil {
if err := data.gracefulExitFlowAggregator(testOptions.coverageDir); err != nil {
tb.Fatalf("Error when gracefully exiting Flow Aggregator: %v", err)
}
tb.Logf("Deleting '%s' K8s Namespace", flowAggregatorNamespace)
test/e2e/flowaggregator_test.go (2 changes: 1 addition & 1 deletion)
@@ -109,8 +109,8 @@ func TestFlowAggregator(t *testing.T) {
if err != nil {
t.Fatalf("Error when setting up test: %v", err)
}
defer teardownFlowAggregator(t, data)
defer teardownTest(t, data)
defer teardownFlowAggregator(t, data)

podAIP, podBIP, podCIP, svcB, svcC, err := createPerftestPods(data)
if err != nil {
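The swap of the two defer statements above matters because deferred calls run last-in-first-out: with teardownFlowAggregator deferred last, it now runs first, before teardownTest. A tiny sketch of the ordering:

package main

import "fmt"

// Deferred calls run in LIFO order, so the call deferred last runs first.
func main() {
	defer fmt.Println("teardownTest (runs second)")
	defer fmt.Println("teardownFlowAggregator (runs first)")
	fmt.Println("test body")
}
// Output:
// test body
// teardownFlowAggregator (runs first)
// teardownTest (runs second)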
test/e2e/framework.go (3 changes: 2 additions & 1 deletion)
@@ -681,7 +681,8 @@ func (data *TestData) waitForAntreaDaemonSetPods(timeout time.Duration) error {
return true, nil
})
if err == wait.ErrWaitTimeout {
return fmt.Errorf("antrea-agent DaemonSet not ready within %v", defaultTimeout)
_, stdout, _, _ := provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl -n %s describe pod", antreaNamespace))
return fmt.Errorf("antrea-agent DaemonSet not ready within %v; kubectl describe pod output: %v", defaultTimeout, stdout)
} else if err != nil {
return err
}
