Skip to content

Commit

Permalink
Addreseed the comments
Browse files Browse the repository at this point in the history
  • Loading branch information
srikartati committed Aug 11, 2020
1 parent b8d58b5 commit c9d365d
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 46 deletions.
2 changes: 1 addition & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func run(o *Options) error {
flowExporter := exporter.NewFlowExporter(
flowrecords.NewFlowRecords(connStore),
o.config.FlowExportFrequency)
go wait.Until(func() { flowExporter.CheckAndDoExport(o.flowCollector, pollDone) }, o.pollInterval, stopCh)
go wait.Until(func() { flowExporter.Export(o.flowCollector, stopCh, pollDone) }, 0, stopCh)
}

<-stopCh
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/flowexporter/connections/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/vmware-tanzu/antrea/pkg/ovs/ovsctl"
)

// InitializeConnTrackDumper initialize the ConnTrackDumper interface for different OS and datapath types.
// InitializeConnTrackDumper initializes the ConnTrackDumper interface for different OS and datapath types.
func InitializeConnTrackDumper(nodeConfig *config.NodeConfig, serviceCIDR *net.IPNet, ovsctlClient ovsctl.OVSCtlClient, ovsDatapathType string) ConnTrackDumper {
var connTrackDumper ConnTrackDumper
if ovsDatapathType == ovsconfig.OVSDatapathSystem {
Expand Down Expand Up @@ -60,6 +60,7 @@ func filterAntreaConns(conns []*flowexporter.Connection, nodeConfig *config.Node
// Conntrack flows will be different for Pod-to-Service flows w/ Antrea-proxy. This implementation will be simpler, when the
// Antrea proxy is supported.
if serviceCIDR.Contains(srcIP) || serviceCIDR.Contains(dstIP) {
klog.V(4).Infof("Detected a flow with Cluster IP :%v", conn)
continue
}
filteredConns = append(filteredConns, conn)
Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/flowexporter/connections/conntrack_ovs.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (ct *connTrackOvsCtl) DumpFlows(zoneFilter uint16) ([]*flowexporter.Connect
}

filteredConns := filterAntreaConns(conns, ct.nodeConfig, ct.serviceCIDR, zoneFilter)
klog.V(2).Infof("Flow exporter considered flows: %d", len(filteredConns))
klog.V(2).Infof("FlowExporter considered flows: %d", len(filteredConns))

return filteredConns, nil
}
Expand All @@ -83,14 +83,14 @@ func (ct *connTrackOvsCtl) ovsAppctlDumpConnections(zoneFilter uint16) ([]*flowe
for _, flow := range outputFlow {
conn, err := flowStringToAntreaConnection(flow, zoneFilter)
if err != nil {
klog.Warningf("Ignoring the flow from conntrack dump due to the error: %v", err)
klog.V(4).Infof("Ignoring the flow from conntrack dump due to parsing error: %v", err)
continue
}
if conn != nil {
antreaConns = append(antreaConns, conn)
}
}
klog.V(2).Infof("Finished dumping -- total no. of flows in conntrack: %d", len(antreaConns))
klog.V(2).Infof("FlowExporter considered flows in conntrack: %d", len(antreaConns))
return antreaConns, nil
}

Expand Down
63 changes: 39 additions & 24 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,34 +91,49 @@ func NewFlowExporter(records *flowrecords.FlowRecords, exportFrequency uint) *fl
}
}

// CheckAndDoExport enables us to export flow records periodically at a given flow export frequency.
func (exp *flowExporter) CheckAndDoExport(collector net.Addr, pollDone chan struct{}) {
// Number of pollDone signals received or poll cycles should be equal to export frequency before starting the export cycle.
// This is necessary because IPFIX collector computes throughput based on flow records received interval.
<-pollDone
exp.pollCycle++
if exp.pollCycle%exp.exportFrequency == 0 {
if exp.process == nil {
err := exp.initFlowExporter(collector)
if err != nil {
klog.Errorf("Error when initializing flow exporter: %v", err)
return
// DoExport enables us to export flow records periodically at a given flow export frequency.
func (exp *flowExporter) Export(collector net.Addr, stopCh <-chan struct{}, pollDone <-chan struct{}) {
for {
select {
case <-stopCh:
return
case <-pollDone:
// Number of pollDone signals received or poll cycles should be equal to export frequency before starting
// the export cycle. This is necessary because IPFIX collector computes throughput based on flow records received interval.
exp.pollCycle++
if exp.pollCycle%exp.exportFrequency == 0 {
// Retry to connect to IPFIX collector if the exporting process gets reset
if exp.process == nil {
err := exp.initFlowExporter(collector)
if err != nil {
klog.Errorf("Error when initializing flow exporter: %v", err)
// There could be other errors while initializing flow exporter other than connecting to IPFIX collector,
// therefore closing the connection and resetting the process.
if exp.process != nil {
exp.process.CloseConnToCollector()
exp.process = nil
}
return
}
}
// Build and send flow records to IPFIX collector.
exp.flowRecords.BuildFlowRecords()
err := exp.sendFlowRecords()
if err != nil {
klog.Errorf("Error when sending flow records: %v", err)
// If there is an error when sending flow records because of intermittent connectivity, we reset the connection
// to IPFIX collector and retry in the next export cycle to reinitialize the connection and send flow records.
exp.process.CloseConnToCollector()
exp.process = nil
return
}

exp.pollCycle = 0
klog.V(2).Infof("Successfully exported IPFIX flow records")
}
}
exp.flowRecords.BuildFlowRecords()
err := exp.sendFlowRecords()
if err != nil {
klog.Errorf("Error when sending flow records: %v", err)
// If there is an error when sending flow records because of intermittent connectivity, we reset the connection
// to IPFIX collector and retry in the next export cycle to reinitialize the connection and send flow records.
exp.process.CloseConnToCollector()
exp.process = nil
}
exp.pollCycle = 0
klog.V(2).Infof("Successfully exported IPFIX flow records")
}

return
}

func (exp *flowExporter) initFlowExporter(collector net.Addr) error {
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/flowexporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestFlowExporter(t *testing.T) {

rc, collectorOutput, _, err := provider.RunCommandOnNode(masterNodeName(), fmt.Sprintf("kubectl logs ipfix-collector -n antrea-test"))
if err != nil || rc != 0 {
t.Fatalf("error when getting logs %v, rc: %v", err, rc)
t.Fatalf("Error when getting logs %v, rc: %v", err, rc)
}

/* Parse through IPFIX collector output. Sample output (with truncated fields) is given below:
Expand Down
17 changes: 1 addition & 16 deletions test/e2e/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,12 +311,11 @@ func (data *TestData) deployAntreaFlowExporter(ipfixCollector string) error {
}

antreaAgentConf, _ := configMap.Data["antrea-agent.conf"]
antreaAgentConf = strings.Replace(antreaAgentConf, "# FlowExporter: false", " FlowExporter: true", 1)
antreaAgentConf = strings.Replace(antreaAgentConf, "# FlowExporter: false", " FlowExporter: true", 1)
antreaAgentConf = strings.Replace(antreaAgentConf, "#flowCollectorAddr: \"\"", fmt.Sprintf("flowCollectorAddr: \"%s\"", ipfixCollector), 1)
antreaAgentConf = strings.Replace(antreaAgentConf, "#flowPollInterval: \"5s\"", "flowPollInterval: \"1s\"", 1)
antreaAgentConf = strings.Replace(antreaAgentConf, "#flowExportFrequency: 12", "flowExportFrequency: 5", 1)
configMap.Data["antrea-agent.conf"] = antreaAgentConf

if _, err := data.clientset.CoreV1().ConfigMaps(antreaNamespace).Update(context.TODO(), configMap, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update ConfigMap %s: %v", configMap.Name, err)
}
Expand All @@ -328,20 +327,6 @@ func (data *TestData) deployAntreaFlowExporter(ipfixCollector string) error {
return fmt.Errorf("error when restarting antrea-agent Pod: %v", err)
}

// Just to be safe disabling the FlowExporter feature for subsequent tests.
configMap, err = data.GetAntreaConfigMap(antreaNamespace)
if err != nil {
return fmt.Errorf("failed to get ConfigMap: %v", err)
}

antreaAgentConf, _ = configMap.Data["antrea-agent.conf"]
antreaAgentConf = strings.Replace(antreaAgentConf, " FlowExporter: true", " FlowExporter: false", 1)
configMap.Data["antrea-agent.conf"] = antreaAgentConf

if _, err := data.clientset.CoreV1().ConfigMaps(antreaNamespace).Update(context.TODO(), configMap, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update ConfigMap %s: %v", configMap.Name, err)
}

return nil
}

Expand Down

0 comments on commit c9d365d

Please sign in to comment.