From 1f4f8d1d65d443cb45ff100f5f7765683ae6cf39 Mon Sep 17 00:00:00 2001 From: Tushar Tathgur Date: Thu, 18 Jan 2024 02:38:21 +0530 Subject: [PATCH] Addressed new comments Signed-off-by: Tushar Tathgur --- cmd/antrea-agent/agent.go | 3 +- docs/network-flow-visibility.md | 2 +- .../connections/conntrack_connections.go | 3 +- .../flowexporter/connections/l7_listener.go | 2 + pkg/agent/flowexporter/exporter/exporter.go | 7 +- test/e2e/flowaggregator_test.go | 65 ++++++++++--------- 6 files changed, 48 insertions(+), 34 deletions(-) diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index fb851a43a78..deda964a861 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -672,7 +672,8 @@ func run(o *Options) error { networkPolicyController, flowExporterOptions, egressController, - l7FlowExporterController) + l7FlowExporterController, + l7FlowExporterEnabled) if err != nil { return fmt.Errorf("error when creating IPFIX flow exporter: %v", err) } diff --git a/docs/network-flow-visibility.md b/docs/network-flow-visibility.md index 21926016d20..b1ff2825df8 100644 --- a/docs/network-flow-visibility.md +++ b/docs/network-flow-visibility.md @@ -668,4 +668,4 @@ HTTP fields in the `httpVals` are: As of now, the only supported layer 7 protocol is `HTTP1.1`. Support for more protocols may be added in the future. Antrea supports L7FlowExporter feature only -on Linux Nodes. Windows Nodes are not supported yet. +on Linux Nodes. diff --git a/pkg/agent/flowexporter/connections/conntrack_connections.go b/pkg/agent/flowexporter/connections/conntrack_connections.go index 56a22fe99cb..70105d7096d 100644 --- a/pkg/agent/flowexporter/connections/conntrack_connections.go +++ b/pkg/agent/flowexporter/connections/conntrack_connections.go @@ -104,7 +104,8 @@ func (cs *ConntrackConnectionStore) Run(stopCh <-chan struct{}) { // TODO: As optimization, only poll invalid/closed connections during every poll, and poll the established connections right before the export. func (cs *ConntrackConnectionStore) Poll() ([]int, error) { klog.V(2).Infof("Polling conntrack") - // DeepCopy the L7EventMap before polling the conntrack table to match corresponding L4 connection with L7 events and avoid missing the L7 events for corresponding L4 connection + // DeepCopy the L7EventMap before polling the conntrack table to match corresponding L4 connection with L7 events + // and avoid missing the L7 events for corresponding L4 connection l7EventMap := cs.l7EventMapGetter.ConsumeL7EventMap() var zones []uint16 diff --git a/pkg/agent/flowexporter/connections/l7_listener.go b/pkg/agent/flowexporter/connections/l7_listener.go index 4902abc4da4..c3167aec6d3 100644 --- a/pkg/agent/flowexporter/connections/l7_listener.go +++ b/pkg/agent/flowexporter/connections/l7_listener.go @@ -101,9 +101,11 @@ func (l *L7Listener) listenAndAcceptConn() { // Remove stale connections if err := os.Remove(l.suricataEventSocketPath); err != nil && !os.IsNotExist(err) { klog.V(2).ErrorS(err, "failed to remove stale socket") + return } if err := os.MkdirAll(filepath.Dir(l.suricataEventSocketPath), 0750); err != nil { klog.ErrorS(err, "Failed to create directory %s", filepath.Dir(l.suricataEventSocketPath)) + return } listener, err := net.Listen("unix", l.suricataEventSocketPath) if err != nil { diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go index 4d4436e47f5..0d473cba693 100644 --- a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -161,7 +161,7 @@ func prepareExporterInputArgs(collectorProto, nodeName string) exporter.Exporter func NewFlowExporter(podStore podstore.Interface, proxier proxy.Proxier, k8sClient kubernetes.Interface, nodeRouteController *noderoute.Controller, trafficEncapMode config.TrafficEncapModeType, nodeConfig *config.NodeConfig, v4Enabled, v6Enabled bool, serviceCIDRNet, serviceCIDRNetv6 *net.IPNet, ovsDatapathType ovsconfig.OVSDatapathType, proxyEnabled bool, npQuerier querier.AgentNetworkPolicyInfoQuerier, o *flowexporter.FlowExporterOptions, - egressQuerier querier.EgressQuerier, podL7FlowExporterAttrGetter connections.PodL7FlowExporterAttrGetter) (*FlowExporter, error) { + egressQuerier querier.EgressQuerier, podL7FlowExporterAttrGetter connections.PodL7FlowExporterAttrGetter, l7FlowExporterEnabled bool) (*FlowExporter, error) { // Initialize IPFIX registry registry := ipfix.NewIPFIXRegistry() registry.LoadRegistry() @@ -172,7 +172,10 @@ func NewFlowExporter(podStore podstore.Interface, proxier proxy.Proxier, k8sClie return nil, err } expInput := prepareExporterInputArgs(o.FlowCollectorProto, nodeName) - l7Listener := connections.NewL7Listener(podL7FlowExporterAttrGetter, podStore) + var l7Listener *connections.L7Listener + if l7FlowExporterEnabled { + l7Listener = connections.NewL7Listener(podL7FlowExporterAttrGetter, podStore) + } connTrackDumper := connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, serviceCIDRNetv6, ovsDatapathType, proxyEnabled) denyConnStore := connections.NewDenyConnectionStore(podStore, proxier, o) diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index 877b71a0131..00c6711d06f 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -271,16 +271,18 @@ func TestFlowAggregator(t *testing.T) { if v4Enabled { t.Run("IPv4", func(t *testing.T) { testHelper(t, data, false) }) + t.Run("L7FlowExporterController", func(t *testing.T) { + testL7FlowExporterController(t, data, false) + }) } if v6Enabled { t.Run("IPv6", func(t *testing.T) { testHelper(t, data, true) }) + t.Run("L7FlowExporterController", func(t *testing.T) { + testL7FlowExporterController(t, data, true) + }) } - t.Run("L7FlowExporterController", func(t *testing.T) { - testL7FlowExporterControllerRun(t, data) - }) - } func checkIntraNodeFlows(t *testing.T, data *TestData, podAIPs, podBIPs *PodIPs, isIPv6 bool, labelFilter string) { @@ -1877,37 +1879,42 @@ func getAndCheckFlowAggregatorMetrics(t *testing.T, data *TestData) error { return nil } -func testL7FlowExporterControllerRun(t *testing.T, data *TestData) { +func testL7FlowExporterController(t *testing.T, data *TestData, isIPv6 bool) { skipIfFeatureDisabled(t, features.L7FlowExporter, true, false) - clientPodName := "test-l7-flow-exporter" - clientPodLabels := map[string]string{"test-l7-flow-exporter-e2e": "true"} - clientPodAnnotations := map[string]string{antreaagenttypes.L7FlowExporterAnnotationKey: "both"} - cmd := []string{"sleep", "3600"} - - // Create a client Pod which will be selected by test L7 NetworkPolices. - require.NoError(t, NewPodBuilder(clientPodName, data.testNamespace, toolboxImage).OnNode(nodeName(0)).WithCommand(cmd).WithLabels(clientPodLabels).WithAnnotations(clientPodAnnotations).Create(data)) - clientPodIP, err := data.podWaitForIPs(defaultTimeout, clientPodName, data.testNamespace) - require.NoErrorf(t, err, "Error when waiting for IP for Pod '%s': %v", clientPodName, err) - require.NoError(t, data.podWaitForRunning(defaultTimeout, clientPodName, data.testNamespace)) - - serverIPs := createToExternalTestServer(t, data) - srcIP := clientPodIP.IPv4.String() - dstIP := serverIPs.IPv4.String() + err := data.UpdatePod(data.testNamespace, "perftest-a", func(pod *corev1.Pod) { + pod.Annotations = map[string]string{antreaagenttypes.L7FlowExporterAnnotationKey: "both"} + }) + require.NoErrorf(t, err, "error when updated pod annotations") - // checkRecordsForToExternalFlows(t, data, nodeName(0), clientPodName, clientPodIP.ipv4.String(), serverIPs.ipv4.String(), serverPodPort, isIPv6, "", "") - cmd = []string{ - "curl", - fmt.Sprintf("http://%s:%d", serverIPs.IPv4.String(), serverPodPort), + testFlow1 := testFlow{ + srcPodName: "perftest-a", + dstPodName: "perftest-b", + } + var cmd []string + if !isIPv6 { + testFlow1.srcIP = podAIPs.IPv4.String() + testFlow1.dstIP = podBIPs.IPv4.String() + cmd = []string{ + "curl", + fmt.Sprintf("http://%s:%d", podBIPs.IPv4.String(), serverPodPort), + } + } else { + testFlow1.srcIP = podAIPs.IPv6.String() + testFlow1.dstIP = podBIPs.IPv6.String() + cmd = []string{ + "curl", + fmt.Sprintf("http://%s:%d", podBIPs.IPv6.String(), serverPodPort), + } } - stdout, stderr, err := data.RunCommandFromPod(data.testNamespace, clientPodName, toolboxContainerName, cmd) + stdout, stderr, err := data.RunCommandFromPod(data.testNamespace, "perftest-a", toolboxContainerName, cmd) require.NoErrorf(t, err, "Error when running curl command, stdout: %s, stderr: %s", stdout, stderr) - _, recordSlices := getCollectorOutput(t, srcIP, dstIP, "", false, false, false, data, "") + _, recordSlices := getCollectorOutput(t, testFlow1.srcIP, testFlow1.dstIP, "", false, false, isIPv6, data, "") for _, record := range recordSlices { - if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) { + if strings.Contains(record, testFlow1.srcIP) && strings.Contains(record, testFlow1.dstIP) { // checkPodAndNodeData(t, record, clientPodName, nodeName(0), "", "", data.testNamespace) assert := assert.New(t) - assert.Contains(record, clientPodName, "Record with srcIP does not have Pod name: %s", clientPodName) + assert.Contains(record, "perftest-a", "Record with srcIP does not have Pod name: perftest-a") assert.Contains(record, fmt.Sprintf("sourcePodNamespace: %s", data.testNamespace), "Record does not have correct sourcePodNamespace: %s", data.testNamespace) assert.Contains(record, fmt.Sprintf("sourceNodeName: %s", nodeName(0)), "Record does not have correct sourceNodeName: %s", nodeName(0)) assert.Contains(record, fmt.Sprintf("\"test-l7-flow-exporter-e2e\":\"true\""), "Record does not have correct label for source Pod") @@ -1917,10 +1924,10 @@ func testL7FlowExporterControllerRun(t *testing.T, data *TestData) { } } - clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, "", false, false, "") + clickHouseRecords := getClickHouseOutput(t, data, testFlow1.srcIP, testFlow1.dstIP, "", false, false, "") for _, record := range clickHouseRecords { assert := assert.New(t) - assert.Equal(record.SourcePodName, clientPodName, "Record with srcIP does not have Pod name: %s", clientPodName) + assert.Equal(record.SourcePodName, "perftest-a", "Record with srcIP does not have Pod name: perftest-a") assert.Equal(record.SourcePodNamespace, data.testNamespace, "Record does not have correct sourcePodNamespace: %s", data.testNamespace) assert.Equal(record.SourceNodeName, nodeName(0), "Record does not have correct sourceNodeName: %s", nodeName(0)) assert.Contains(record.SourcePodLabels, fmt.Sprintf("\"test-l7-flow-exporter-e2e\":\"true\""), "Record does not have correct label for source Pod")