Skip to content

Commit

Permalink
Addreseed the comments
Browse files Browse the repository at this point in the history
  • Loading branch information
srikartati committed Aug 11, 2020
1 parent 841469c commit f1744e4
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 43 deletions.
2 changes: 1 addition & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func run(o *Options) error {
flowExporter := exporter.NewFlowExporter(
flowrecords.NewFlowRecords(connStore),
o.config.FlowExportFrequency)
go wait.Until(func() { flowExporter.CheckAndDoExport(o.flowCollector, pollDone) }, o.pollInterval, stopCh)
go wait.Until(func() { flowExporter.Export(o.flowCollector, pollDone) }, 0, stopCh)
}

<-stopCh
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/flowexporter/connections/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/vmware-tanzu/antrea/pkg/ovs/ovsctl"
)

// InitializeConnTrackDumper initialize the ConnTrackDumper interface for different OS and datapath types.
// InitializeConnTrackDumper initializes the ConnTrackDumper interface for different OS and datapath types.
func InitializeConnTrackDumper(nodeConfig *config.NodeConfig, serviceCIDR *net.IPNet, ovsctlClient ovsctl.OVSCtlClient, ovsDatapathType string) ConnTrackDumper {
var connTrackDumper ConnTrackDumper
if ovsDatapathType == ovsconfig.OVSDatapathSystem {
Expand Down Expand Up @@ -60,6 +60,7 @@ func filterAntreaConns(conns []*flowexporter.Connection, nodeConfig *config.Node
// Conntrack flows will be different for Pod-to-Service flows w/ Antrea-proxy. This implementation will be simpler, when the
// Antrea proxy is supported.
if serviceCIDR.Contains(srcIP) || serviceCIDR.Contains(dstIP) {
klog.V(4).Infof("Detected a flow with Cluster IP :%v", conn)
continue
}
filteredConns = append(filteredConns, conn)
Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/flowexporter/connections/conntrack_ovs.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (ct *connTrackOvsCtl) DumpFlows(zoneFilter uint16) ([]*flowexporter.Connect
}

filteredConns := filterAntreaConns(conns, ct.nodeConfig, ct.serviceCIDR, zoneFilter)
klog.V(2).Infof("Flow exporter considered flows: %d", len(filteredConns))
klog.V(2).Infof("FlowExporter considered flows: %d", len(filteredConns))

return filteredConns, nil
}
Expand All @@ -83,14 +83,14 @@ func (ct *connTrackOvsCtl) ovsAppctlDumpConnections(zoneFilter uint16) ([]*flowe
for _, flow := range outputFlow {
conn, err := flowStringToAntreaConnection(flow, zoneFilter)
if err != nil {
klog.Warningf("Ignoring the flow from conntrack dump due to the error: %v", err)
klog.V(4).Infof("Ignoring the flow from conntrack dump due to parsing error: %v", err)
continue
}
if conn != nil {
antreaConns = append(antreaConns, conn)
}
}
klog.V(2).Infof("Finished dumping -- total no. of flows in conntrack: %d", len(antreaConns))
klog.V(2).Infof("FlowExporter considered flows in conntrack: %d", len(antreaConns))
return antreaConns, nil
}

Expand Down
59 changes: 35 additions & 24 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,34 +91,45 @@ func NewFlowExporter(records *flowrecords.FlowRecords, exportFrequency uint) *fl
}
}

// CheckAndDoExport enables us to export flow records periodically at a given flow export frequency.
func (exp *flowExporter) CheckAndDoExport(collector net.Addr, pollDone chan struct{}) {
// Number of pollDone signals received or poll cycles should be equal to export frequency before starting the export cycle.
// This is necessary because IPFIX collector computes throughput based on flow records received interval.
<-pollDone
exp.pollCycle++
if exp.pollCycle%exp.exportFrequency == 0 {
if exp.process == nil {
err := exp.initFlowExporter(collector)
if err != nil {
klog.Errorf("Error when initializing flow exporter: %v", err)
return
// DoExport enables us to export flow records periodically at a given flow export frequency.
func (exp *flowExporter) Export(collector net.Addr, pollDone chan struct{}) {
for {
select {
case <-pollDone:
// Number of pollDone signals received or poll cycles should be equal to export frequency before starting
// the export cycle. This is necessary because IPFIX collector computes throughput based on flow records received interval.
exp.pollCycle++
if exp.pollCycle%exp.exportFrequency == 0 {
// Retry to connect to IPFIX collector if the exporting process gets reset
if exp.process == nil {
err := exp.initFlowExporter(collector)
if err != nil {
klog.Errorf("Error when initializing flow exporter: %v", err)
// There could be other errors while initializing flow exporter other than connecting to IPFIX collector,
// therefore closing the connection and resetting the process.
exp.process.CloseConnToCollector()
exp.process = nil
return
}
}
// Build and send flow records to IPFIX collector.
exp.flowRecords.BuildFlowRecords()
err := exp.sendFlowRecords()
if err != nil {
klog.Errorf("Error when sending flow records: %v", err)
// If there is an error when sending flow records because of intermittent connectivity, we reset the connection
// to IPFIX collector and retry in the next export cycle to reinitialize the connection and send flow records.
exp.process.CloseConnToCollector()
exp.process = nil
return
}
exp.pollCycle = 0
klog.V(2).Infof("Successfully exported IPFIX flow records")
}
break
}
exp.flowRecords.BuildFlowRecords()
err := exp.sendFlowRecords()
if err != nil {
klog.Errorf("Error when sending flow records: %v", err)
// If there is an error when sending flow records because of intermittent connectivity, we reset the connection
// to IPFIX collector and retry in the next export cycle to reinitialize the connection and send flow records.
exp.process.CloseConnToCollector()
exp.process = nil
}
exp.pollCycle = 0
klog.V(2).Infof("Successfully exported IPFIX flow records")
}

return
}

func (exp *flowExporter) initFlowExporter(collector net.Addr) error {
Expand Down
14 changes: 0 additions & 14 deletions test/e2e/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,20 +328,6 @@ func (data *TestData) deployAntreaFlowExporter(ipfixCollector string) error {
return fmt.Errorf("error when restarting antrea-agent Pod: %v", err)
}

// Just to be safe disabling the FlowExporter feature for subsequent tests.
configMap, err = data.GetAntreaConfigMap(antreaNamespace)
if err != nil {
return fmt.Errorf("failed to get ConfigMap: %v", err)
}

antreaAgentConf, _ = configMap.Data["antrea-agent.conf"]
antreaAgentConf = strings.Replace(antreaAgentConf, " FlowExporter: true", " FlowExporter: false", 1)
configMap.Data["antrea-agent.conf"] = antreaAgentConf

if _, err := data.clientset.CoreV1().ConfigMaps(antreaNamespace).Update(context.TODO(), configMap, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update ConfigMap %s: %v", configMap.Name, err)
}

return nil
}

Expand Down

0 comments on commit f1744e4

Please sign in to comment.