Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug in FlowAggregator when sending template set #2039

Merged
merged 1 commit into from
May 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 9 additions & 15 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,7 @@ func (exp *flowExporter) initFlowExporter() error {
if exp.v4Enabled {
templateID := exp.process.NewTemplateID()
exp.templateIDv4 = templateID
if err = exp.ipfixSet.PrepareSet(ipfixentities.Template, exp.templateIDv4); err != nil {
return err
}
sentBytes, err := exp.sendTemplateSet(exp.ipfixSet, false)
exp.ipfixSet.ResetSet()
sentBytes, err := exp.sendTemplateSet(false)
if err != nil {
return err
}
Expand All @@ -246,11 +242,7 @@ func (exp *flowExporter) initFlowExporter() error {
if exp.v6Enabled {
templateID := exp.process.NewTemplateID()
exp.templateIDv6 = templateID
if err = exp.ipfixSet.PrepareSet(ipfixentities.Template, exp.templateIDv6); err != nil {
return err
}
sentBytes, err := exp.sendTemplateSet(exp.ipfixSet, true)
exp.ipfixSet.ResetSet()
sentBytes, err := exp.sendTemplateSet(true)
if err != nil {
return err
}
Expand Down Expand Up @@ -359,7 +351,7 @@ func (exp *flowExporter) sendFlowRecords() error {
return nil
}

func (exp *flowExporter) sendTemplateSet(templateSet ipfixentities.Set, isIPv6 bool) (int, error) {
func (exp *flowExporter) sendTemplateSet(isIPv6 bool) (int, error) {
elements := make([]*ipfixentities.InfoElementWithValue, 0)

IANAInfoElements := IANAInfoElementsIPv4
Expand Down Expand Up @@ -394,13 +386,15 @@ func (exp *flowExporter) sendTemplateSet(templateSet ipfixentities.Set, isIPv6 b
ieWithValue := ipfixentities.NewInfoElementWithValue(element, nil)
elements = append(elements, ieWithValue)
}

err := templateSet.AddRecord(elements, templateID)
exp.ipfixSet.ResetSet()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the only difference is that we call ResetSet first? Not really clear from the PR description given that we were already calling PrepareSet:

We need to prepare the set as a template set before sending the template record

I thought calling ResetSet was the key to the bug fix, but there is no mention of it in the PR description.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revised PR description.

if err := exp.ipfixSet.PrepareSet(ipfixentities.Template, templateID); err != nil {
return 0, err
}
err := exp.ipfixSet.AddRecord(elements, templateID)
if err != nil {
return 0, fmt.Errorf("error in adding record to template set: %v", err)
}

sentBytes, err := exp.process.SendSet(templateSet)
sentBytes, err := exp.process.SendSet(exp.ipfixSet)
if err != nil {
return 0, fmt.Errorf("error in IPFIX exporting process when sending template record: %v", err)
}
Expand Down
10 changes: 8 additions & 2 deletions pkg/agent/flowexporter/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func testSendTemplateSet(t *testing.T, v4Enabled bool, v6Enabled bool) {
func sendTemplateSet(t *testing.T, ctrl *gomock.Controller, mockIPFIXExpProc *ipfixtest.MockIPFIXExportingProcess, mockIPFIXRegistry *ipfixtest.MockIPFIXRegistry, flowExp *flowExporter, isIPv6 bool) {
var mockTempSet *ipfixentitiestesting.MockSet
mockTempSet = ipfixentitiestesting.NewMockSet(ctrl)

flowExp.ipfixSet = mockTempSet
// Following consists of all elements that are in IANAInfoElements and AntreaInfoElements (globals)
// Only the element name is needed, other arguments have dummy values.
var elemList = make([]*ipfixentities.InfoElementWithValue, 0)
Expand Down Expand Up @@ -125,8 +125,14 @@ func sendTemplateSet(t *testing.T, ctrl *gomock.Controller, mockIPFIXExpProc *ip
}
// Passing 0 for sentBytes as it is not used anywhere in the test. If this not a call to mock, the actual sentBytes
// above elements: IANAInfoElements, IANAReverseInfoElements and AntreaInfoElements.
mockTempSet.EXPECT().ResetSet()
if !isIPv6 {
mockTempSet.EXPECT().PrepareSet(ipfixentities.Template, testTemplateIDv4).Return(nil)
} else {
mockTempSet.EXPECT().PrepareSet(ipfixentities.Template, testTemplateIDv6).Return(nil)
}
mockIPFIXExpProc.EXPECT().SendSet(mockTempSet).Return(0, nil)
_, err := flowExp.sendTemplateSet(mockTempSet, isIPv6)
_, err := flowExp.sendTemplateSet(isIPv6)
assert.NoError(t, err, "Error in sending template set")

eL := flowExp.elementsListv4
Expand Down
21 changes: 9 additions & 12 deletions pkg/flowaggregator/flowaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,7 @@ func (fa *flowAggregator) initExportingProcess() error {
fa.exportingProcess = ep
// Currently, we send two templates for IPv4 and IPv6 regardless of the IP families supported by cluster
fa.templateIDv4 = fa.exportingProcess.NewTemplateID()
if err := fa.set.PrepareSet(ipfixentities.Template, fa.templateIDv4); err != nil {
return fmt.Errorf("error when preparing IPv4 template set: %v", err)
}
bytesSent, err := fa.sendTemplateSet(fa.set, false)
bytesSent, err := fa.sendTemplateSet(false)
if err != nil {
fa.exportingProcess.CloseConnToCollector()
fa.exportingProcess = nil
Expand All @@ -330,12 +327,8 @@ func (fa *flowAggregator) initExportingProcess() error {
}
klog.V(2).Infof("Initialized exporting process and sent %d bytes size of IPv4 template set", bytesSent)

fa.set.ResetSet()
fa.templateIDv6 = fa.exportingProcess.NewTemplateID()
if err := fa.set.PrepareSet(ipfixentities.Template, fa.templateIDv6); err != nil {
return fmt.Errorf("error when preparing IPv6 template set: %v", err)
}
bytesSent, err = fa.sendTemplateSet(fa.set, true)
bytesSent, err = fa.sendTemplateSet(true)
if err != nil {
fa.exportingProcess.CloseConnToCollector()
fa.exportingProcess = nil
Expand Down Expand Up @@ -426,7 +419,7 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor
return nil
}

func (fa *flowAggregator) sendTemplateSet(templateSet ipfixentities.Set, isIPv6 bool) (int, error) {
func (fa *flowAggregator) sendTemplateSet(isIPv6 bool) (int, error) {
elements := make([]*ipfixentities.InfoElementWithValue, 0)
ianaInfoElements := ianaInfoElementsIPv4
antreaInfoElements := antreaInfoElementsIPv4
Expand Down Expand Up @@ -486,11 +479,15 @@ func (fa *flowAggregator) sendTemplateSet(templateSet ipfixentities.Set, isIPv6
ie := ipfixentities.NewInfoElementWithValue(element, nil)
elements = append(elements, ie)
}
err := templateSet.AddRecord(elements, templateID)
fa.set.ResetSet()
if err := fa.set.PrepareSet(ipfixentities.Template, templateID); err != nil {
return 0, err
}
Comment on lines +482 to +485
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to do reset and prepare here? I saw we had done it here and here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. I encountered the issue when the exporter is restarted in the Flow Aggregator and I see the set still has a record of type DataType. Looking more closely at the existing code, we are missing the set reset before L296 here.

I think it is better to have it in one place. Removed those statements in the caller of sendTemplateSet. I am doing the same fix in Flow Exporter too.

err := fa.set.AddRecord(elements, templateID)
if err != nil {
return 0, fmt.Errorf("error when adding record to set, error: %v", err)
}
bytesSent, err := fa.exportingProcess.SendSet(templateSet)
bytesSent, err := fa.exportingProcess.SendSet(fa.set)
return bytesSent, err
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/flowaggregator/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,14 @@ func TestFlowAggregator_sendTemplateSet(t *testing.T) {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)+len(aggregatorElements)+len(antreaSourceStatsElementList)].Element, nil)
}
mockTempSet.EXPECT().ResetSet()
mockTempSet.EXPECT().PrepareSet(ipfixentities.Template, testTemplateID).Return(nil)
mockTempSet.EXPECT().AddRecord(elemList, testTemplateID).Return(nil)
// Passing 0 for sentBytes as it is not used anywhere in the test. If this not a call to mock, the actual sentBytes
// above elements: ianaInfoElements, ianaReverseInfoElements and antreaInfoElements.
mockIPFIXExpProc.EXPECT().SendSet(mockTempSet).Return(0, nil)

_, err := fa.sendTemplateSet(mockTempSet, isIPv6)
_, err := fa.sendTemplateSet(isIPv6)
assert.NoErrorf(t, err, "Error in sending template record: %v, isIPv6: %v", err, isIPv6)
}
}
5 changes: 3 additions & 2 deletions test/e2e/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,8 +463,9 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri

// Polling to make sure all the data records corresponding to the iperf flow
// are received.
err = wait.Poll(250*time.Millisecond, collectorCheckTimeout, func() (bool, error) {
rc, collectorOutput, _, err := provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl logs --since=%v ipfix-collector -n antrea-test", time.Since(timeStart).String()))
err = wait.PollImmediate(500*time.Millisecond, aggregatorInactiveFlowRecordTimeout, func() (bool, error) {
// `pod-running-timeout` option is added to cover scenarios where ipfix flow-collector has crashed after being deployed.
rc, collectorOutput, _, err := provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl logs --since=%v --pod-running-timeout=%v ipfix-collector -n antrea-test", time.Since(timeStart).String(), aggregatorInactiveFlowRecordTimeout.String()))
if err != nil || rc != 0 {
return false, err
}
Expand Down