diff --git a/pkg/agent/flowexporter/exporter/exporter_test.go b/pkg/agent/flowexporter/exporter/exporter_test.go index 5df827176e8..0d2968a76d1 100644 --- a/pkg/agent/flowexporter/exporter/exporter_test.go +++ b/pkg/agent/flowexporter/exporter/exporter_test.go @@ -126,7 +126,7 @@ func sendTemplateSet(t *testing.T, ctrl *gomock.Controller, mockIPFIXExpProc *ip } mockIPFIXExpProc.EXPECT().SendSet(mockTempSet).Return(0, nil) _, err := flowExp.sendTemplateSet(isIPv6) - assert.NoError(t, err, "Error in sending template set") + assert.NoError(t, err, "Error when sending template set") eL := flowExp.elementsListv4 if isIPv6 { @@ -254,7 +254,7 @@ func testSendDataSet(t *testing.T, v4Enabled bool, v6Enabled bool) { err := flowExp.addConnToSet(&conn) assert.NoError(t, err, "Error when adding record to data set") _, err = flowExp.sendDataSet() - assert.NoError(t, err, "Error in sending data set") + assert.NoError(t, err, "Error when sending data set") } if v4Enabled { diff --git a/pkg/flowaggregator/exporter/ipfix.go b/pkg/flowaggregator/exporter/ipfix.go index e7331b25434..35f46a9ac17 100644 --- a/pkg/flowaggregator/exporter/ipfix.go +++ b/pkg/flowaggregator/exporter/ipfix.go @@ -45,7 +45,6 @@ type IPFIXExporter struct { externalFlowCollectorProto string exportingProcess ipfix.IPFIXExportingProcess sendJSONRecord bool - includePodLabels bool observationDomainID uint32 templateIDv4 uint16 templateIDv6 uint16 @@ -95,7 +94,6 @@ func NewIPFIXExporter( externalFlowCollectorAddr: opt.ExternalFlowCollectorAddr, externalFlowCollectorProto: opt.ExternalFlowCollectorProto, sendJSONRecord: sendJSONRecord, - includePodLabels: opt.Config.RecordContents.PodLabels, observationDomainID: observationDomainID, registry: registry, set: ipfixentities.NewSet(false), @@ -127,7 +125,7 @@ func (e *IPFIXExporter) AddRecord(record ipfixentities.Record, isRecordIPv6 bool func (e *IPFIXExporter) UpdateOptions(opt *options.Options) { config := opt.Config.FlowCollector - if reflect.DeepEqual(config, e.config) && opt.Config.RecordContents.PodLabels == e.includePodLabels { + if reflect.DeepEqual(config, e.config) { return } @@ -144,12 +142,8 @@ func (e *IPFIXExporter) UpdateOptions(opt *options.Options) { } else { e.observationDomainID = genObservationDomainID(e.k8sClient) } - e.includePodLabels = opt.Config.RecordContents.PodLabels - klog.InfoS("New IPFIXExporter configuration", "collectorAddress", e.externalFlowCollectorAddr, "collectorProtocol", e.externalFlowCollectorProto, "sendJSON", e.sendJSONRecord, "domainID", e.observationDomainID, "includePodLabels", e.includePodLabels) + klog.InfoS("New IPFIXExporter configuration", "collectorAddress", e.externalFlowCollectorAddr, "collectorProtocol", e.externalFlowCollectorProto, "sendJSON", e.sendJSONRecord, "domainID", e.observationDomainID) - // In theory, a change to e.includePodLabels does not require opening a new connection, it - // just requires sending new templates. But it is easier to treat all configuration changes - // uniformly. if e.exportingProcess != nil { e.exportingProcess.CloseConnToCollector() e.exportingProcess = nil @@ -327,14 +321,12 @@ func (e *IPFIXExporter) sendTemplateSet(isIPv6 bool) (int, error) { } elements = append(elements, ie) } - if e.includePodLabels { - for _, ie := range infoelements.AntreaLabelsElementList { - ie, err := e.createInfoElementForTemplateSet(ie, ipfixregistry.AntreaEnterpriseID) - if err != nil { - return 0, err - } - elements = append(elements, ie) + for _, ie := range infoelements.AntreaLabelsElementList { + ie, err := e.createInfoElementForTemplateSet(ie, ipfixregistry.AntreaEnterpriseID) + if err != nil { + return 0, err } + elements = append(elements, ie) } e.set.ResetSet() if err := e.set.PrepareSet(ipfixentities.Template, templateID); err != nil { diff --git a/pkg/flowaggregator/exporter/ipfix_test.go b/pkg/flowaggregator/exporter/ipfix_test.go index 6096c3043df..0009f2b2c3e 100644 --- a/pkg/flowaggregator/exporter/ipfix_test.go +++ b/pkg/flowaggregator/exporter/ipfix_test.go @@ -51,14 +51,14 @@ func createElement(name string, enterpriseID uint32) ipfixentities.InfoElementWi } func TestIPFIXExporter_sendTemplateSet(t *testing.T) { - ctrl := gomock.NewController(t) + runTest := func(t *testing.T, isIPv6 bool) { + ctrl := gomock.NewController(t) - mockIPFIXExpProc := ipfixtesting.NewMockIPFIXExportingProcess(ctrl) - mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl) - mockTempSet := ipfixentitiestesting.NewMockSet(ctrl) + mockIPFIXExpProc := ipfixtesting.NewMockIPFIXExportingProcess(ctrl) + mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl) + mockTempSet := ipfixentitiestesting.NewMockSet(ctrl) - newIPFIXExporter := func(includePodLabels bool) *IPFIXExporter { - return &IPFIXExporter{ + exporter := &IPFIXExporter{ externalFlowCollectorAddr: "", externalFlowCollectorProto: "", exportingProcess: mockIPFIXExpProc, @@ -66,34 +66,13 @@ func TestIPFIXExporter_sendTemplateSet(t *testing.T) { templateIDv6: testTemplateIDv6, registry: mockIPFIXRegistry, set: mockTempSet, - includePodLabels: includePodLabels, observationDomainID: testObservationDomainID, } - } - - testcases := []struct { - isIPv6 bool - includePodLabels bool - }{ - {false, true}, - {true, true}, - {false, false}, - {true, false}, - } - - for _, tc := range testcases { - exporter := newIPFIXExporter(tc.includePodLabels) - elemList := createElementList(tc.isIPv6, mockIPFIXRegistry) + elemList := createElementList(isIPv6, mockIPFIXRegistry) testTemplateID := exporter.templateIDv4 - if tc.isIPv6 { + if isIPv6 { testTemplateID = exporter.templateIDv6 } - if tc.includePodLabels { - for _, ie := range infoelements.AntreaLabelsElementList { - elemList = append(elemList, createElement(ie, ipfixregistry.AntreaEnterpriseID)) - mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil) - } - } mockTempSet.EXPECT().ResetSet() mockTempSet.EXPECT().PrepareSet(ipfixentities.Template, testTemplateID).Return(nil) mockTempSet.EXPECT().AddRecord(elemList, testTemplateID).Return(nil) @@ -101,9 +80,12 @@ func TestIPFIXExporter_sendTemplateSet(t *testing.T) { // above elements: ianaInfoElements, ianaReverseInfoElements and antreaInfoElements. mockIPFIXExpProc.EXPECT().SendSet(mockTempSet).Return(0, nil) - _, err := exporter.sendTemplateSet(tc.isIPv6) - assert.NoErrorf(t, err, "Error in sending template record: %v, isIPv6: %v", err, tc.isIPv6) + _, err := exporter.sendTemplateSet(isIPv6) + assert.NoErrorf(t, err, "Error when sending template record") } + + t.Run("IPv4", func(t *testing.T) { runTest(t, false) }) + t.Run("IPv6", func(t *testing.T) { runTest(t, true) }) } func TestIPFIXExporter_UpdateOptions(t *testing.T) { @@ -163,7 +145,7 @@ func TestIPFIXExporter_UpdateOptions(t *testing.T) { const newAddr = "newAddr" const newProto = "newProto" config.FlowCollector.Address = fmt.Sprintf("%s:%s", newAddr, newProto) - config.RecordContents.PodLabels = true + config.FlowCollector.RecordFormat = "JSON" ipfixExporter.UpdateOptions(&options.Options{ Config: config, @@ -173,7 +155,7 @@ func TestIPFIXExporter_UpdateOptions(t *testing.T) { assert.Equal(t, newAddr, ipfixExporter.externalFlowCollectorAddr) assert.Equal(t, newProto, ipfixExporter.externalFlowCollectorProto) - assert.True(t, ipfixExporter.includePodLabels) + assert.True(t, ipfixExporter.sendJSONRecord) require.NoError(t, ipfixExporter.AddRecord(mockRecord, false)) assert.Equal(t, 2, setCount, "Invalid number of flow sets sent by exporter") @@ -305,6 +287,10 @@ func createElementList(isIPv6 bool, mockIPFIXRegistry *ipfixtesting.MockIPFIXReg elemList = append(elemList, createElement(infoelements.AntreaDestinationThroughputElementList[i], ipfixregistry.AntreaEnterpriseID)) mockIPFIXRegistry.EXPECT().GetInfoElement(infoelements.AntreaDestinationThroughputElementList[i], ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil) } + for _, ie := range infoelements.AntreaLabelsElementList { + elemList = append(elemList, createElement(ie, ipfixregistry.AntreaEnterpriseID)) + mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil) + } return elemList } diff --git a/pkg/flowaggregator/flowaggregator.go b/pkg/flowaggregator/flowaggregator.go index 047d3ff8da9..232cb5d84c1 100644 --- a/pkg/flowaggregator/flowaggregator.go +++ b/pkg/flowaggregator/flowaggregator.go @@ -417,7 +417,8 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor fa.fillK8sMetadata(key, record.Record, *startTime) fa.aggregationProcess.SetCorrelatedFieldsFilled(record, true) } - if fa.includePodLabels && !fa.aggregationProcess.AreExternalFieldsFilled(*record) { + // Even if fa.includePodLabels is false, we still need to add an empty IE to match the template. + if !fa.aggregationProcess.AreExternalFieldsFilled(*record) { fa.fillPodLabels(key, record.Record, *startTime) fa.aggregationProcess.SetExternalFieldsFilled(record, true) } @@ -502,7 +503,11 @@ func (fa *flowAggregator) fetchPodLabels(ip string, startTime time.Time) string klog.ErrorS(nil, "Error when getting Pod information from podInformer", "ip", ip, "startTime", startTime) return "" } - labelsJSON, err := json.Marshal(pod.GetLabels()) + labels := pod.GetLabels() + if labels == nil { + labels = map[string]string{} + } + labelsJSON, err := json.Marshal(labels) if err != nil { klog.ErrorS(err, "Error when JSON encoding of Pod labels") return "" @@ -512,22 +517,25 @@ func (fa *flowAggregator) fetchPodLabels(ip string, startTime time.Time) string func (fa *flowAggregator) fillPodLabelsForSide(ip string, record ipfixentities.Record, startTime time.Time, podNamespaceIEName, podNameIEName, podLabelsIEName string) error { podLabelsString := "" - if podName, _, ok := record.GetInfoElementWithValue(podNameIEName); ok { - podNameString := podName.GetStringValue() - if podNamespace, _, ok := record.GetInfoElementWithValue(podNamespaceIEName); ok { - podNamespaceString := podNamespace.GetStringValue() - if podNameString != "" && podNamespaceString != "" { - podLabelsString = fa.fetchPodLabels(ip, startTime) + // If fa.includePodLabels is false, we always use an empty string. + // If fa.includePodLabels is true, we use an empty string in case of error or if the + // endpoint is not a Pod, and a valid JSON dictionary otherwise (which will be empty if the + // Pod has no labels). + if fa.includePodLabels { + if podName, _, ok := record.GetInfoElementWithValue(podNameIEName); ok { + podNameString := podName.GetStringValue() + if podNamespace, _, ok := record.GetInfoElementWithValue(podNamespaceIEName); ok { + podNamespaceString := podNamespace.GetStringValue() + if podNameString != "" && podNamespaceString != "" { + podLabelsString = fa.fetchPodLabels(ip, startTime) + } } } } podLabelsElement, err := fa.registry.GetInfoElement(podLabelsIEName, ipfixregistry.AntreaEnterpriseID) if err == nil { - podLabelsIE, err := ipfixentities.DecodeAndCreateInfoElementWithValue(podLabelsElement, bytes.NewBufferString(podLabelsString).Bytes()) - if err != nil { - return fmt.Errorf("error when creating podLabels InfoElementWithValue: %v", err) - } + podLabelsIE := ipfixentities.NewStringInfoElement(podLabelsElement, podLabelsString) if err := record.AddInfoElement(podLabelsIE); err != nil { return fmt.Errorf("error when adding podLabels InfoElementWithValue: %v", err) } @@ -540,10 +548,10 @@ func (fa *flowAggregator) fillPodLabelsForSide(ip string, record ipfixentities.R func (fa *flowAggregator) fillPodLabels(key ipfixintermediate.FlowKey, record ipfixentities.Record, startTime time.Time) { if err := fa.fillPodLabelsForSide(key.SourceAddress, record, startTime, "sourcePodNamespace", "sourcePodName", "sourcePodLabels"); err != nil { - klog.ErrorS(err, "Error when filling pod labels", "side", "source") + klog.ErrorS(err, "Error when filling Pod labels", "side", "source") } if err := fa.fillPodLabelsForSide(key.DestinationAddress, record, startTime, "destinationPodNamespace", "destinationPodName", "destinationPodLabels"); err != nil { - klog.ErrorS(err, "Error when filling pod labels", "side", "destination") + klog.ErrorS(err, "Error when filling Pod labels", "side", "destination") } } diff --git a/pkg/flowaggregator/flowaggregator_test.go b/pkg/flowaggregator/flowaggregator_test.go index 53160e87bb2..956b7bf83cd 100644 --- a/pkg/flowaggregator/flowaggregator_test.go +++ b/pkg/flowaggregator/flowaggregator_test.go @@ -18,7 +18,6 @@ import ( "bytes" "os" "path/filepath" - "strconv" "sync" "testing" "time" @@ -59,31 +58,6 @@ func init() { } func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) { - ctrl := gomock.NewController(t) - mockPodStore := podstoretest.NewMockInterface(ctrl) - mockIPFIXExporter := exportertesting.NewMockInterface(ctrl) - mockClickHouseExporter := exportertesting.NewMockInterface(ctrl) - mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl) - mockRecord := ipfixentitiestesting.NewMockRecord(ctrl) - mockAggregationProcess := ipfixtesting.NewMockIPFIXAggregationProcess(ctrl) - - newFlowAggregator := func(includePodLabels bool) *flowAggregator { - return &flowAggregator{ - aggregatorTransportProtocol: "tcp", - aggregationProcess: mockAggregationProcess, - activeFlowRecordTimeout: testActiveTimeout, - inactiveFlowRecordTimeout: testInactiveTimeout, - ipfixExporter: mockIPFIXExporter, - clickHouseExporter: mockClickHouseExporter, - registry: mockIPFIXRegistry, - flowAggregatorAddress: "", - includePodLabels: includePodLabels, - podStore: mockPodStore, - } - } - - mockExporters := []*exportertesting.MockInterface{mockIPFIXExporter, mockClickHouseExporter} - ipv4Key := ipfixintermediate.FlowKey{ SourceAddress: "10.0.0.1", DestinationAddress: "10.0.0.2", @@ -91,7 +65,6 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) { SourcePort: 1234, DestinationPort: 5678, } - ipv6Key := ipfixintermediate.FlowKey{ SourceAddress: "2001:0:3238:dfe1:63::fefb", DestinationAddress: "2001:0:3238:dfe1:63::fefc", @@ -100,82 +73,124 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) { DestinationPort: 5678, } - readyRecord := &ipfixintermediate.AggregationFlowRecord{ - Record: mockRecord, - ReadyToSend: true, + podA := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "podA", + }, + } + podB := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "podB", + }, } testcases := []struct { name string isIPv6 bool flowKey ipfixintermediate.FlowKey - flowRecord *ipfixintermediate.AggregationFlowRecord includePodLabels bool }{ { "IPv4_ready_to_send_with_pod_labels", false, ipv4Key, - readyRecord, true, }, { "IPv6_ready_to_send_with_pod_labels", true, ipv6Key, - readyRecord, true, }, { "IPv4_ready_to_send_without_pod_labels", false, ipv4Key, - readyRecord, false, }, { "IPv6_ready_to_send_without_pod_labels", true, ipv6Key, - readyRecord, false, }, } for _, tc := range testcases { - fa := newFlowAggregator(tc.includePodLabels) - for _, exporter := range mockExporters { - exporter.EXPECT().AddRecord(mockRecord, tc.isIPv6) - } - emptyStr := make([]byte, 0) - - mockAggregationProcess.EXPECT().ResetStatAndThroughputElementsInRecord(mockRecord).Return(nil) - flowStartSecondsElement, _ := ipfixentities.DecodeAndCreateInfoElementWithValue(ipfixentities.NewInfoElement("flowStartSeconds", 150, 14, ipfixregistry.IANAEnterpriseID, 4), []byte(strconv.Itoa(int(time.Now().Unix())))) - mockRecord.EXPECT().GetInfoElementWithValue("flowStartSeconds").Return(flowStartSecondsElement, 0, true) - mockAggregationProcess.EXPECT().AreCorrelatedFieldsFilled(*tc.flowRecord).Return(false) - sourcePodNameElem, _ := ipfixentities.DecodeAndCreateInfoElementWithValue(ipfixentities.NewInfoElement("sourcePodName", 0, 0, ipfixregistry.AntreaEnterpriseID, 0), emptyStr) - mockRecord.EXPECT().GetInfoElementWithValue("sourcePodName").Return(sourcePodNameElem, 0, false) - destPodNameElem, _ := ipfixentities.DecodeAndCreateInfoElementWithValue(ipfixentities.NewInfoElement("destinationPodName", 0, 0, ipfixregistry.AntreaEnterpriseID, 0), emptyStr) - mockRecord.EXPECT().GetInfoElementWithValue("destinationPodName").Return(destPodNameElem, 0, false) - mockAggregationProcess.EXPECT().SetCorrelatedFieldsFilled(tc.flowRecord, true) - if tc.includePodLabels { - mockAggregationProcess.EXPECT().AreExternalFieldsFilled(*tc.flowRecord).Return(false) - mockRecord.EXPECT().GetInfoElementWithValue("sourcePodName").Return(sourcePodNameElem, 0, false) + t.Run(tc.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockPodStore := podstoretest.NewMockInterface(ctrl) + mockIPFIXExporter := exportertesting.NewMockInterface(ctrl) + mockClickHouseExporter := exportertesting.NewMockInterface(ctrl) + mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl) + mockRecord := ipfixentitiestesting.NewMockRecord(ctrl) + mockAggregationProcess := ipfixtesting.NewMockIPFIXAggregationProcess(ctrl) + + newFlowAggregator := func(includePodLabels bool) *flowAggregator { + return &flowAggregator{ + aggregatorTransportProtocol: "tcp", + aggregationProcess: mockAggregationProcess, + activeFlowRecordTimeout: testActiveTimeout, + inactiveFlowRecordTimeout: testInactiveTimeout, + ipfixExporter: mockIPFIXExporter, + clickHouseExporter: mockClickHouseExporter, + registry: mockIPFIXRegistry, + flowAggregatorAddress: "", + includePodLabels: includePodLabels, + podStore: mockPodStore, + } + } + + mockExporters := []*exportertesting.MockInterface{mockIPFIXExporter, mockClickHouseExporter} + + flowRecord := &ipfixintermediate.AggregationFlowRecord{ + Record: mockRecord, + ReadyToSend: true, + } + + fa := newFlowAggregator(tc.includePodLabels) + for _, exporter := range mockExporters { + exporter.EXPECT().AddRecord(mockRecord, tc.isIPv6) + } + + startTime := time.Now().Truncate(time.Second) + + mockAggregationProcess.EXPECT().ResetStatAndThroughputElementsInRecord(mockRecord).Return(nil) + flowStartSecondsIE := ipfixentities.NewDateTimeSecondsInfoElement(ipfixentities.NewInfoElement("flowStartSeconds", 150, 14, ipfixregistry.IANAEnterpriseID, 4), uint32(startTime.Unix())) + mockRecord.EXPECT().GetInfoElementWithValue("flowStartSeconds").Return(flowStartSecondsIE, 0, true) + mockAggregationProcess.EXPECT().AreCorrelatedFieldsFilled(*flowRecord).Return(false) + sourcePodNameIE := ipfixentities.NewStringInfoElement(ipfixentities.NewInfoElement("sourcePodName", 0, 0, ipfixregistry.AntreaEnterpriseID, 0), "podA") + mockRecord.EXPECT().GetInfoElementWithValue("sourcePodName").Return(sourcePodNameIE, 0, true).MinTimes(1) + destinationPodNameIE := ipfixentities.NewStringInfoElement(ipfixentities.NewInfoElement("destinationPodName", 0, 0, ipfixregistry.AntreaEnterpriseID, 0), "podB") + mockRecord.EXPECT().GetInfoElementWithValue("destinationPodName").Return(destinationPodNameIE, 0, true).MinTimes(1) + mockAggregationProcess.EXPECT().SetCorrelatedFieldsFilled(flowRecord, true) + mockAggregationProcess.EXPECT().AreExternalFieldsFilled(*flowRecord).Return(false) + podLabels := "" + if tc.includePodLabels { + podLabels = "{}" + sourcePodNamespaceIE := ipfixentities.NewStringInfoElement(ipfixentities.NewInfoElement("sourcePodNamespace", 0, 0, ipfixregistry.AntreaEnterpriseID, 0), "default") + mockRecord.EXPECT().GetInfoElementWithValue("sourcePodNamespace").Return(sourcePodNamespaceIE, 0, true) + destinationPodNamespaceIE := ipfixentities.NewStringInfoElement(ipfixentities.NewInfoElement("destinationPodNamespace", 0, 0, ipfixregistry.AntreaEnterpriseID, 0), "default") + mockRecord.EXPECT().GetInfoElementWithValue("destinationPodNamespace").Return(destinationPodNamespaceIE, 0, true) + mockPodStore.EXPECT().GetPodByIPAndTime(tc.flowKey.SourceAddress, startTime).Return(podA, true) + mockPodStore.EXPECT().GetPodByIPAndTime(tc.flowKey.DestinationAddress, startTime).Return(podB, true) + } sourcePodLabelsElement := ipfixentities.NewInfoElement("sourcePodLabels", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0) mockIPFIXRegistry.EXPECT().GetInfoElement("sourcePodLabels", ipfixregistry.AntreaEnterpriseID).Return(sourcePodLabelsElement, nil) - sourcePodLabelsIE, _ := ipfixentities.DecodeAndCreateInfoElementWithValue(sourcePodLabelsElement, bytes.NewBufferString("").Bytes()) + sourcePodLabelsIE := ipfixentities.NewStringInfoElement(sourcePodLabelsElement, podLabels) mockRecord.EXPECT().AddInfoElement(sourcePodLabelsIE).Return(nil) - mockRecord.EXPECT().GetInfoElementWithValue("destinationPodName").Return(destPodNameElem, 0, false) destinationPodLabelsElement := ipfixentities.NewInfoElement("destinationPodLabels", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0) mockIPFIXRegistry.EXPECT().GetInfoElement("destinationPodLabels", ipfixregistry.AntreaEnterpriseID).Return(destinationPodLabelsElement, nil) - destinationPodLabelsIE, _ := ipfixentities.DecodeAndCreateInfoElementWithValue(destinationPodLabelsElement, bytes.NewBufferString("").Bytes()) + destinationPodLabelsIE := ipfixentities.NewStringInfoElement(destinationPodLabelsElement, podLabels) mockRecord.EXPECT().AddInfoElement(destinationPodLabelsIE).Return(nil) - mockAggregationProcess.EXPECT().SetExternalFieldsFilled(tc.flowRecord, true) - } - mockAggregationProcess.EXPECT().IsAggregatedRecordIPv4(*tc.flowRecord).Return(!tc.isIPv6) + mockAggregationProcess.EXPECT().SetExternalFieldsFilled(flowRecord, true) + mockAggregationProcess.EXPECT().IsAggregatedRecordIPv4(*flowRecord).Return(!tc.isIPv6) - err := fa.sendFlowKeyRecord(tc.flowKey, tc.flowRecord) - assert.NoError(t, err, "Error in sending flow key record: %v, key: %v, record: %v", err, tc.flowKey, tc.flowRecord) + err := fa.sendFlowKeyRecord(tc.flowKey, flowRecord) + assert.NoError(t, err, "Error when sending flow key record, key: %v, record: %v", tc.flowKey, flowRecord) + }) } } @@ -688,50 +703,64 @@ func TestFlowAggregator_closeUpdateChBeforeFlowExportLoopReturns(t *testing.T) { } func TestFlowAggregator_fetchPodLabels(t *testing.T) { - ctrl := gomock.NewController(t) - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "testPod", - Labels: map[string]string{ - "test": "ut", - }, - }, - Status: v1.PodStatus{ - Phase: v1.PodPending, - PodIPs: []v1.PodIP{ - { - IP: "192.168.1.2", - }, - }, - }, - } - - client := fake.NewSimpleClientset() - // Mock pod store - mockPodStore := podstoretest.NewMockInterface(ctrl) - mockPodStore.EXPECT().GetPodByIPAndTime("", gomock.Any()).Return(nil, false) - mockPodStore.EXPECT().GetPodByIPAndTime("192.168.1.2", gomock.Any()).Return(pod, true) - tests := []struct { name string ip string + pod *v1.Pod want string }{ { name: "no pod object", - ip: "", + ip: "192.168.1.2", + pod: nil, want: "", }, { name: "pod with label", ip: "192.168.1.2", + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "testPod", + Labels: map[string]string{ + "test": "ut", + }, + }, + }, want: "{\"test\":\"ut\"}", }, + { + name: "pod with empty labels", + ip: "192.168.1.2", + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "testPod", + Labels: map[string]string{}, + }, + }, + want: "{}", + }, + { + name: "pod with null labels", + ip: "192.168.1.2", + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "testPod", + Labels: nil, + }, + }, + want: "{}", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + client := fake.NewSimpleClientset() + mockPodStore := podstoretest.NewMockInterface(ctrl) + mockPodStore.EXPECT().GetPodByIPAndTime(tt.ip, gomock.Any()).Return(tt.pod, tt.pod != nil) fa := &flowAggregator{ k8sClient: client, includePodLabels: true, @@ -865,19 +894,12 @@ func TestFlowAggregator_fillK8sMetadata(t *testing.T) { }, }, } - emptyStr := make([]byte, 0) - sourcePodNameElem, err := ipfixentities.DecodeAndCreateInfoElementWithValue(ipfixentities.NewInfoElement("sourcePodName", uint16(0), ipfixentities.String, ipfixregistry.AntreaEnterpriseID, uint16(0)), emptyStr) - require.NoError(t, err) - sourcePodNamespaceElem, err := ipfixentities.DecodeAndCreateInfoElementWithValue(ipfixentities.NewInfoElement("sourcePodNamespace", uint16(0), ipfixentities.String, ipfixregistry.AntreaEnterpriseID, uint16(0)), emptyStr) - require.NoError(t, err) - sourceNodeNameElem, err := ipfixentities.DecodeAndCreateInfoElementWithValue(ipfixentities.NewInfoElement("sourceNodeName", uint16(0), ipfixentities.String, ipfixregistry.AntreaEnterpriseID, uint16(0)), emptyStr) - require.NoError(t, err) - destinationPodNameElem, err := ipfixentities.DecodeAndCreateInfoElementWithValue(ipfixentities.NewInfoElement("destinationPodName", uint16(0), ipfixentities.String, ipfixregistry.AntreaEnterpriseID, uint16(0)), emptyStr) - require.NoError(t, err) - destinationPodNamespaceElem, err := ipfixentities.DecodeAndCreateInfoElementWithValue(ipfixentities.NewInfoElement("destinationPodNamespace", uint16(0), ipfixentities.String, ipfixregistry.AntreaEnterpriseID, uint16(0)), emptyStr) - require.NoError(t, err) - destinationNodeNameElem, err := ipfixentities.DecodeAndCreateInfoElementWithValue(ipfixentities.NewInfoElement("destinationNodeName", uint16(0), ipfixentities.String, ipfixregistry.AntreaEnterpriseID, uint16(0)), emptyStr) - require.NoError(t, err) + sourcePodNameElem := ipfixentities.NewStringInfoElement(ipfixentities.NewInfoElement("sourcePodName", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0), "") + sourcePodNamespaceElem := ipfixentities.NewStringInfoElement(ipfixentities.NewInfoElement("sourcePodNamespace", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0), "") + sourceNodeNameElem := ipfixentities.NewStringInfoElement(ipfixentities.NewInfoElement("sourceNodeName", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0), "") + destinationPodNameElem := ipfixentities.NewStringInfoElement(ipfixentities.NewInfoElement("destinationPodName", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0), "") + destinationPodNamespaceElem := ipfixentities.NewStringInfoElement(ipfixentities.NewInfoElement("destinationPodNamespace", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0), "") + destinationNodeNameElem := ipfixentities.NewStringInfoElement(ipfixentities.NewInfoElement("destinationNodeName", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0), "") ctrl := gomock.NewController(t) mockRecord := ipfixentitiestesting.NewMockRecord(ctrl)