diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 9a71ca0bbfd..68e4c8f65db 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -16,6 +16,7 @@ package main import ( "fmt" + "k8s.io/apimachinery/pkg/util/wait" "net" "time" @@ -243,19 +244,13 @@ func run(o *Options) error { connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, agentQuerier.GetOVSCtlClient(), o.config.OVSDatapathType), ifaceStore, o.pollInterval) - // pollDone helps in synchronizing connStore.Run and flowExporter.Run go routines. pollDone := make(chan struct{}) go connStore.Run(stopCh, pollDone) - flowExporter, err := exporter.InitFlowExporter( - o.flowCollector, + flowExporter := exporter.NewFlowExporter( flowrecords.NewFlowRecords(connStore), - o.config.FlowExportFrequency, - o.pollInterval) - if err != nil { - return fmt.Errorf("error when initializing flow exporter: %v", err) - } - go flowExporter.Run(stopCh, pollDone) + o.config.FlowExportFrequency) + go wait.Until(func() { flowExporter.CheckAndDoExport(o.flowCollector, pollDone) }, o.pollInterval, stopCh) } <-stopCh diff --git a/go.mod b/go.mod index 5367ad8cc61..5ac3f2b9c39 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,7 @@ require ( github.com/stretchr/testify v1.5.1 github.com/ti-mo/conntrack v0.3.0 github.com/vishvananda/netlink v1.1.0 - github.com/vmware/go-ipfix v0.0.0-20200715175325-6ade358dcb5f + github.com/vmware/go-ipfix v0.0.0-20200808032647-11daf237d1dc golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975 golang.org/x/exp v0.0.0-20190312203227-4b39c73a6495 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e diff --git a/go.sum b/go.sum index afa512e96d8..87e41ab24f3 100644 --- a/go.sum +++ b/go.sum @@ -384,8 +384,8 @@ github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYp github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI= github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df h1:OviZH7qLw/7ZovXvuNyL3XQl8UFofeikI1NW1Gypu7k= github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= -github.com/vmware/go-ipfix v0.0.0-20200715175325-6ade358dcb5f h1:XyyczLRk8+6YqYXE8v20XjbVtK415KR114IrjX9THpQ= -github.com/vmware/go-ipfix v0.0.0-20200715175325-6ade358dcb5f/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU= +github.com/vmware/go-ipfix v0.0.0-20200808032647-11daf237d1dc h1:lytkY3WfWgOyyaOlgj/3Y5Fkwc9ENff2qg6Ul4FYriE= +github.com/vmware/go-ipfix v0.0.0-20200808032647-11daf237d1dc/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU= github.com/wenyingd/ofnet v0.0.0-20200609044910-a72f3e66744e h1:NM4NTe6Z+mF5IYlYAiEdRlY8XcMY4P6VlYqgsBhpojQ= github.com/wenyingd/ofnet v0.0.0-20200609044910-a72f3e66744e/go.mod h1:+g6SfqhTVqeGEmUJ0l4WtCgsL4dflTUJE4k+TPCKqXo= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= diff --git a/pkg/agent/flowexporter/connections/connections.go b/pkg/agent/flowexporter/connections/connections.go index 30c2a01811c..f09c325cd22 100644 --- a/pkg/agent/flowexporter/connections/connections.go +++ b/pkg/agent/flowexporter/connections/connections.go @@ -62,10 +62,9 @@ func (cs *ConnectionStore) Run(stopCh <-chan struct{}, pollDone chan struct{}) { klog.Errorf("Error during conntrack poll cycle: %v", err) } // We need synchronization between ConnectionStore.Run and FlowExporter.Run go routines. - // ConnectionStore.Run (connection poll) should be done to start FlowExporter.Run (connection export); pollDone signals helps enabling this. + // ConnectionStore.Run (connection poll) should be done to start FlowExporter.Run (connection export); pollDone signal helps enabling this. // FlowExporter.Run should be done to start ConnectionStore.Run; mutex on connection map object makes sure of this synchronization guarantee. pollDone <- struct{}{} - } } } @@ -98,7 +97,7 @@ func (cs *ConnectionStore) addOrUpdateConn(conn *flowexporter.Connection) { if !srcFound && !dstFound { klog.Warningf("Cannot map any of the IP %s or %s to a local Pod", conn.TupleOrig.SourceAddress.String(), conn.TupleReply.SourceAddress.String()) } - // sourceIP/destinationIP are mapped only to local pods and not remote pods. + // sourceIP/destinationIP are mapped only to local Pods and not remote Pods. if srcFound && sIface.Type == interfacestore.ContainerInterface { conn.SourcePodName = sIface.ContainerInterfaceConfig.PodName conn.SourcePodNamespace = sIface.ContainerInterfaceConfig.PodNamespace @@ -107,7 +106,7 @@ func (cs *ConnectionStore) addOrUpdateConn(conn *flowexporter.Connection) { conn.DestinationPodName = dIface.ContainerInterfaceConfig.PodName conn.DestinationPodNamespace = dIface.ContainerInterfaceConfig.PodNamespace } - // Do not export flow records of connections whose destination is local pod and source is remote pod. + // Do not export flow records of connections whose destination is local Pod and source is remote Pod. // We export flow records only form "source node", where the connection is originated from. This is to avoid // 2 copies of flow records at flow collector. This restriction will be removed when flow records store network policy rule ID. // TODO: Remove this when network policy rule ID are added to flow records. diff --git a/pkg/agent/flowexporter/connections/conntrack.go b/pkg/agent/flowexporter/connections/conntrack.go index ec28f20c0df..593836c33ed 100644 --- a/pkg/agent/flowexporter/connections/conntrack.go +++ b/pkg/agent/flowexporter/connections/conntrack.go @@ -25,6 +25,7 @@ import ( "github.com/vmware-tanzu/antrea/pkg/ovs/ovsctl" ) +// InitializeConnTrackDumper initialize the ConnTrackDumper interface for different OS and datapath types. func InitializeConnTrackDumper(nodeConfig *config.NodeConfig, serviceCIDR *net.IPNet, ovsctlClient ovsctl.OVSCtlClient, ovsDatapathType string) ConnTrackDumper { var connTrackDumper ConnTrackDumper if ovsDatapathType == ovsconfig.OVSDatapathSystem { diff --git a/pkg/agent/flowexporter/connections/conntrack_linux.go b/pkg/agent/flowexporter/connections/conntrack_linux.go index d6b7c2fac20..f3e238bf5b4 100644 --- a/pkg/agent/flowexporter/connections/conntrack_linux.go +++ b/pkg/agent/flowexporter/connections/conntrack_linux.go @@ -38,12 +38,10 @@ type connTrackSystem struct { func NewConnTrackSystem(nodeConfig *config.NodeConfig, serviceCIDR *net.IPNet) *connTrackSystem { // Ensure net.netfilter.nf_conntrack_acct value to be 1. This will enable flow exporter to export stats of connections. - // Do not handle error and continue with creation of interfacer object as we can still dump flows with no stats. - // If log says permission error, please ensure net.netfilter.nf_conntrack_acct to be set to 1. + // Do not fail, but continue after logging error as we can still dump flows with no stats. sysctl.EnsureSysctlNetValue("netfilter/nf_conntrack_acct", 1) // Ensure net.netfilter.nf_conntrack_timestamp value to be 1. This will enable flow exporter to export timestamps of connections. - // Do not handle error and continue with creation of interfacer object as we can still dump flows with no timestamps. - // If log says permission error, please ensure net.netfilter.nf_conntrack_timestamp to be set to 1. + // Do not fail, but continue after logging error as we can still dump flows with no timestamps. sysctl.EnsureSysctlNetValue("netfilter/nf_conntrack_timestamp", 1) return &connTrackSystem{ @@ -103,7 +101,7 @@ func (nfct *netFilterConnTrack) DumpFilter(filter conntrack.Filter) ([]*flowexpo } antreaConns := make([]*flowexporter.Connection, len(conns)) for i, conn := range conns { - antreaConns[i] = createAntreaConn(&conn) + antreaConns[i] = netlinkFlowToAntreaConnection(&conn) } klog.V(2).Infof("Finished dumping -- total no. of flows in conntrack: %d", len(antreaConns)) @@ -112,7 +110,7 @@ func (nfct *netFilterConnTrack) DumpFilter(filter conntrack.Filter) ([]*flowexpo return antreaConns, nil } -func createAntreaConn(conn *conntrack.Flow) *flowexporter.Connection { +func netlinkFlowToAntreaConnection(conn *conntrack.Flow) *flowexporter.Connection { tupleOrig := flowexporter.Tuple{ SourceAddress: conn.TupleOrig.IP.SourceAddress, DestinationAddress: conn.TupleOrig.IP.DestinationAddress, diff --git a/pkg/agent/flowexporter/connections/conntrack_ovs.go b/pkg/agent/flowexporter/connections/conntrack_ovs.go index 99722fd07da..38cd6be2c0d 100644 --- a/pkg/agent/flowexporter/connections/conntrack_ovs.go +++ b/pkg/agent/flowexporter/connections/conntrack_ovs.go @@ -25,9 +25,17 @@ import ( "github.com/vmware-tanzu/antrea/pkg/agent/config" "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter" "github.com/vmware-tanzu/antrea/pkg/ovs/ovsctl" - "github.com/vmware-tanzu/antrea/pkg/util/ip" ) +// Following map is for converting protocol name (string) to protocol identifier +var protocols = map[string]uint8{ + "icmp": 1, + "igmp": 2, + "tcp": 6, + "udp": 17, + "ipv6-icmp": 58, +} + // connTrackOvsCtl implements ConnTrackDumper. This supports OVS userspace datapath scenarios. var _ ConnTrackDumper = new(connTrackOvsCtl) @@ -69,106 +77,127 @@ func (ct *connTrackOvsCtl) ovsAppctlDumpConnections(zoneFilter uint16) ([]*flowe return nil, fmt.Errorf("error when executing dump-conntrack command: %v", execErr) } - // Parse the output to get the flows + // Parse the output to get the flow strings and convert them to Antrea connections. antreaConns := make([]*flowexporter.Connection, 0) outputFlow := strings.Split(string(cmdOutput), "\n") - var err error for _, flow := range outputFlow { - conn := flowexporter.Connection{} - flowSlice := strings.Split(flow, ",") - isReply := false - inZone := false - for _, fs := range flowSlice { - // Indicator to populate reply or reverse fields - if strings.Contains(fs, "reply") { - isReply = true - } - if !strings.Contains(fs, "=") { - // Proto identifier - conn.TupleOrig.Protocol, err = ip.LookupProtocolMap(fs) - if err != nil { - klog.Errorf("Unknown protocol to convert to ID: %s", fs) - continue - } - conn.TupleReply.Protocol = conn.TupleOrig.Protocol - } else if strings.Contains(fs, "src") { - fields := strings.Split(fs, "=") - if !isReply { - conn.TupleOrig.SourceAddress = net.ParseIP(fields[len(fields)-1]) - } else { - conn.TupleReply.SourceAddress = net.ParseIP(fields[len(fields)-1]) - } - } else if strings.Contains(fs, "dst") { - fields := strings.Split(fs, "=") - if !isReply { - conn.TupleOrig.DestinationAddress = net.ParseIP(fields[len(fields)-1]) - } else { - conn.TupleReply.DestinationAddress = net.ParseIP(fields[len(fields)-1]) - } - } else if strings.Contains(fs, "sport") { - fields := strings.Split(fs, "=") - val, err := strconv.Atoi(fields[len(fields)-1]) - if err != nil { - klog.Errorf("Conversion of sport: %s to int failed", fields[len(fields)-1]) - continue - } - if !isReply { - conn.TupleOrig.SourcePort = uint16(val) - } else { - conn.TupleReply.SourcePort = uint16(val) - } - } else if strings.Contains(fs, "dport") { - // dport field could be the last tuple field in ovs-dpctl output format. - fs = strings.TrimSuffix(fs, ")") - - fields := strings.Split(fs, "=") - val, err := strconv.Atoi(fields[len(fields)-1]) - if err != nil { - klog.Errorf("Conversion of dport: %s to int failed", fields[len(fields)-1]) - continue - } - if !isReply { - conn.TupleOrig.DestinationPort = uint16(val) - } else { - conn.TupleReply.DestinationPort = uint16(val) - } - } else if strings.Contains(fs, "zone") { - fields := strings.Split(fs, "=") - val, err := strconv.Atoi(fields[len(fields)-1]) - if err != nil { - klog.Errorf("Conversion of zone: %s to int failed", fields[len(fields)-1]) - continue - } - if zoneFilter != uint16(val) { - break - } else { - inZone = true - conn.Zone = uint16(val) - } - } else if strings.Contains(fs, "timeout") { - fields := strings.Split(fs, "=") - val, err := strconv.Atoi(fields[len(fields)-1]) - if err != nil { - klog.Errorf("Conversion of timeout: %s to int failed", fields[len(fields)-1]) - continue - } - conn.Timeout = uint32(val) - } else if strings.Contains(fs, "id") { - fields := strings.Split(fs, "=") - val, err := strconv.Atoi(fields[len(fields)-1]) - if err != nil { - klog.Errorf("Conversion of id: %s to int failed", fields[len(fields)-1]) - continue - } - conn.ID = uint32(val) - } + conn, err := flowStringToAntreaConnection(flow, zoneFilter) + if err != nil { + klog.Warningf("Ignoring the flow from conntrack dump due to the error: %v", err) + continue } - if inZone { - conn.IsActive = true - conn.DoExport = true - antreaConns = append(antreaConns, &conn) + if conn != nil { + antreaConns = append(antreaConns, conn) } } klog.V(2).Infof("Finished dumping -- total no. of flows in conntrack: %d", len(antreaConns)) return antreaConns, nil } + +// flowStringToAntreaConnection parses the flow string and converts to Antrea connection. +// Example of flow string: +// tcp,orig=(src=10.10.1.2,dst=10.96.0.1,sport=42540,dport=443),reply=(src=10.96.0.1,dst=10.10.1.2,sport=443,dport=42540),zone=65520,protoinfo=(state=TIME_WAIT) +func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter.Connection, error) { + conn := flowexporter.Connection{} + flowSlice := strings.Split(flow, ",") + isReply := false + inZone := false + var err error + for _, fs := range flowSlice { + // Indicator to populate reply or reverse fields + if strings.Contains(fs, "reply") { + isReply = true + } + if !strings.Contains(fs, "=") { + // Proto identifier + conn.TupleOrig.Protocol, err = lookupProtocolMap(fs) + if err != nil { + return nil, err + } + conn.TupleReply.Protocol = conn.TupleOrig.Protocol + } else if strings.Contains(fs, "src") { + fields := strings.Split(fs, "=") + if !isReply { + conn.TupleOrig.SourceAddress = net.ParseIP(fields[len(fields)-1]) + } else { + conn.TupleReply.SourceAddress = net.ParseIP(fields[len(fields)-1]) + } + } else if strings.Contains(fs, "dst") { + fields := strings.Split(fs, "=") + if !isReply { + conn.TupleOrig.DestinationAddress = net.ParseIP(fields[len(fields)-1]) + } else { + conn.TupleReply.DestinationAddress = net.ParseIP(fields[len(fields)-1]) + } + } else if strings.Contains(fs, "sport") { + fields := strings.Split(fs, "=") + val, err := strconv.Atoi(fields[len(fields)-1]) + if err != nil { + return nil, fmt.Errorf("conversion of sport %s to int failed", fields[len(fields)-1]) + } + if !isReply { + conn.TupleOrig.SourcePort = uint16(val) + } else { + conn.TupleReply.SourcePort = uint16(val) + } + } else if strings.Contains(fs, "dport") { + // dport field could be the last tuple field in ovs-dpctl output format. + fs = strings.TrimSuffix(fs, ")") + + fields := strings.Split(fs, "=") + val, err := strconv.Atoi(fields[len(fields)-1]) + if err != nil { + return nil, fmt.Errorf("conversion of dport %s to int failed", fields[len(fields)-1]) + } + if !isReply { + conn.TupleOrig.DestinationPort = uint16(val) + } else { + conn.TupleReply.DestinationPort = uint16(val) + } + } else if strings.Contains(fs, "zone") { + fields := strings.Split(fs, "=") + val, err := strconv.Atoi(fields[len(fields)-1]) + if err != nil { + return nil, fmt.Errorf("conversion of zone %s to int failed", fields[len(fields)-1]) + } + if zoneFilter != uint16(val) { + break + } else { + inZone = true + conn.Zone = uint16(val) + } + } else if strings.Contains(fs, "timeout") { + fields := strings.Split(fs, "=") + val, err := strconv.Atoi(fields[len(fields)-1]) + if err != nil { + return nil, fmt.Errorf("conversion of timeout %s to int failed", fields[len(fields)-1]) + } + conn.Timeout = uint32(val) + } else if strings.Contains(fs, "id") { + fields := strings.Split(fs, "=") + val, err := strconv.Atoi(fields[len(fields)-1]) + if err != nil { + return nil, fmt.Errorf("conversion of id %s to int failed", fields[len(fields)-1]) + } + conn.ID = uint32(val) + } + } + if !inZone { + return nil, nil + } + conn.IsActive = true + conn.DoExport = true + + return &conn, nil +} + +// lookupProtocolMap returns protocol identifier given protocol name +func lookupProtocolMap(name string) (uint8, error) { + name = strings.TrimSpace(name) + lowerCaseStr := strings.ToLower(name) + proto, found := protocols[lowerCaseStr] + if !found { + return 0, fmt.Errorf("unknown IP protocol specified: %s", name) + } + return proto, nil +} diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go index 3448ac01b52..612189bdf62 100644 --- a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -19,7 +19,6 @@ import ( "hash/fnv" "net" "strings" - "time" "unicode" ipfixentities "github.com/vmware/go-ipfix/pkg/entities" @@ -67,7 +66,7 @@ type flowExporter struct { process ipfix.IPFIXExportingProcess elementsList []*ipfixentities.InfoElement exportFrequency uint - pollInterval time.Duration + pollCycle uint templateID uint16 } @@ -81,22 +80,52 @@ func genObservationID() (uint32, error) { return h.Sum32(), nil } -func NewFlowExporter(records *flowrecords.FlowRecords, expProcess ipfix.IPFIXExportingProcess, elemList []*ipfixentities.InfoElement, exportFrequency uint, pollInterval time.Duration, tempID uint16) *flowExporter { +func NewFlowExporter(records *flowrecords.FlowRecords, exportFrequency uint) *flowExporter { return &flowExporter{ records, - expProcess, - elemList, + nil, + nil, exportFrequency, - pollInterval, - tempID, + 0, + 0, } } -func InitFlowExporter(collector net.Addr, records *flowrecords.FlowRecords, exportFrequency uint, pollInterval time.Duration) (*flowExporter, error) { - // Create IPFIX exporting expProcess and initialize registries and other related entities +// CheckAndDoExport enables us to export flow records periodically at a given flow export frequency. +func (exp *flowExporter) CheckAndDoExport(collector net.Addr, pollDone chan struct{}) { + // Number of pollDone signals received or poll cycles should be equal to export frequency before starting the export cycle. + // This is necessary because IPFIX collector computes throughput based on flow records received interval. + <-pollDone + exp.pollCycle++ + if exp.pollCycle%exp.exportFrequency == 0 { + if exp.process == nil { + err := exp.initFlowExporter(collector) + if err != nil { + klog.Errorf("Error when initializing flow exporter: %v", err) + return + } + } + exp.flowRecords.BuildFlowRecords() + err := exp.sendFlowRecords() + if err != nil { + klog.Errorf("Error when sending flow records: %v", err) + // If there is an error when sending flow records because of intermittent connectivity, we reset the connection + // to IPFIX collector and retry in the next export cycle to reinitialize the connection and send flow records. + exp.process.CloseConnToCollector() + exp.process = nil + } + exp.pollCycle = 0 + klog.V(2).Infof("Successfully exported IPFIX flow records") + } + + return +} + +func (exp *flowExporter) initFlowExporter(collector net.Addr) error { + // Create IPFIX exporting expProcess, initialize registries and other related entities obsID, err := genObservationID() if err != nil { - return nil, fmt.Errorf("cannot generate obsID for IPFIX ipfixexport: %v", err) + return fmt.Errorf("cannot generate obsID for IPFIX ipfixexport: %v", err) } var expProcess ipfix.IPFIXExportingProcess @@ -108,55 +137,21 @@ func InitFlowExporter(collector net.Addr, records *flowrecords.FlowRecords, expo expProcess, err = ipfix.NewIPFIXExportingProcess(collector, obsID, 1800) } if err != nil { - return nil, fmt.Errorf("error while initializing IPFIX exporting expProcess: %v", err) + return err } - expProcess.LoadRegistries() - - flowExp := NewFlowExporter(records, expProcess, nil, exportFrequency, pollInterval, expProcess.NewTemplateID()) + exp.process = expProcess + exp.templateID = expProcess.NewTemplateID() - templateRec := ipfix.NewIPFIXTemplateRecord(uint16(len(IANAInfoElements)+len(IANAReverseInfoElements)+len(AntreaInfoElements)), flowExp.templateID) + expProcess.LoadRegistries() + templateRec := ipfix.NewIPFIXTemplateRecord(uint16(len(IANAInfoElements)+len(IANAReverseInfoElements)+len(AntreaInfoElements)), exp.templateID) - sentBytes, err := flowExp.sendTemplateRecord(templateRec) + sentBytes, err := exp.sendTemplateRecord(templateRec) if err != nil { - return nil, fmt.Errorf("error while creating and sending template record through IPFIX process: %v", err) + return err } klog.V(2).Infof("Initialized flow exporter and sent %d bytes size of template record", sentBytes) - return flowExp, nil -} - -// Run enables to export flow records periodically at a given flow export frequency -func (exp *flowExporter) Run(stopCh <-chan struct{}, pollDone <-chan struct{}) { - klog.Infof("Start exporting IPFIX flow records") - ticker := time.NewTicker(time.Duration(exp.exportFrequency) * exp.pollInterval) - defer ticker.Stop() - - for { - select { - case <-stopCh: - exp.process.CloseConnToCollector() - break - case <-ticker.C: - // Waiting for expected number of pollDone signals from go routine(ConnectionStore.Run) is necessary because - // IPFIX collector computes throughput based on flow records received interval. Number of pollDone - // signals should be equal to export frequency before starting the export cycle. - for i := uint(0); i < exp.exportFrequency; i++ { - <-pollDone - } - err := exp.flowRecords.BuildFlowRecords() - if err != nil { - klog.Errorf("Error when building flow records: %v", err) - exp.process.CloseConnToCollector() - break - } - err = exp.sendFlowRecords() - if err != nil { - klog.Errorf("Error when sending flow records: %v", err) - exp.process.CloseConnToCollector() - break - } - } - } + return nil } func (exp *flowExporter) sendFlowRecords() error { diff --git a/pkg/agent/flowexporter/exporter/exporter_test.go b/pkg/agent/flowexporter/exporter/exporter_test.go index 5d5f6516087..92ae0283d30 100644 --- a/pkg/agent/flowexporter/exporter/exporter_test.go +++ b/pkg/agent/flowexporter/exporter/exporter_test.go @@ -30,7 +30,6 @@ import ( const ( testTemplateID = 256 - testFlowPollInterval = time.Second testFlowExportFrequency = 12 antreaEnterpriseRegistry = 29305 ) @@ -46,7 +45,7 @@ func TestFlowExporter_sendTemplateRecord(t *testing.T) { mockIPFIXExpProc, nil, testFlowExportFrequency, - testFlowPollInterval, + 0, testTemplateID, } // Following consists of all elements that are in IANAInfoElements and AntreaInfoElements (globals) @@ -154,7 +153,7 @@ func TestFlowExporter_sendDataRecord(t *testing.T) { mockIPFIXExpProc, elemList, testFlowExportFrequency, - testFlowPollInterval, + 0, testTemplateID, } // Expect calls required diff --git a/pkg/agent/flowexporter/flowrecords/flow_records.go b/pkg/agent/flowexporter/flowrecords/flow_records.go index fc2f4970c77..cfc168a476f 100644 --- a/pkg/agent/flowexporter/flowrecords/flow_records.go +++ b/pkg/agent/flowexporter/flowrecords/flow_records.go @@ -15,8 +15,6 @@ package flowrecords import ( - "fmt" - "k8s.io/klog" "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter" @@ -38,10 +36,8 @@ func NewFlowRecords(connStore *connections.ConnectionStore) *FlowRecords { // BuildFlowRecords builds the flow record map from connection map in connection store func (fr *FlowRecords) BuildFlowRecords() error { - err := fr.connStore.ForAllConnectionsDo(fr.addOrUpdateFlowRecord) - if err != nil { - return fmt.Errorf("error when iterating connection map: %v", err) - } + // fr.addOrUpdateFlowRecord method does not return any error, hence no error handling required. + fr.connStore.ForAllConnectionsDo(fr.addOrUpdateFlowRecord) klog.V(2).Infof("No. of flow records built: %d", len(fr.recordsMap)) return nil } diff --git a/pkg/util/ip/ip.go b/pkg/util/ip/ip.go index 3a059c03bd4..e8e658d5be0 100644 --- a/pkg/util/ip/ip.go +++ b/pkg/util/ip/ip.go @@ -19,7 +19,6 @@ import ( "fmt" "net" "sort" - "strings" "github.com/vmware-tanzu/antrea/pkg/apis/networking/v1beta1" ) @@ -29,15 +28,6 @@ const ( v6BitLen = 8 * net.IPv6len ) -// Following map is for converting protocol name (string) to protocol identifier -var protocols = map[string]uint8{ - "icmp": 1, - "igmp": 2, - "tcp": 6, - "udp": 17, - "ipv6-icmp": 58, -} - // This function takes in one allow CIDR and multiple except CIDRs and gives diff CIDRs // in allowCIDR eliminating except CIDRs. It currently supports only IPv4. except CIDR input // can be changed. @@ -156,14 +146,3 @@ func NetIPNetToIPNet(ipNet *net.IPNet) *v1beta1.IPNet { prefix, _ := ipNet.Mask.Size() return &v1beta1.IPNet{IP: v1beta1.IPAddress(ipNet.IP), PrefixLength: int32(prefix)} } - -// LookupProtocolMap return protocol identifier given protocol name -func LookupProtocolMap(name string) (uint8, error) { - name = strings.TrimSpace(name) - lowerCaseStr := strings.ToLower(name) - proto, found := protocols[lowerCaseStr] - if !found { - return 0, fmt.Errorf("unknown IP protocol specified: %s", name) - } - return proto, nil -} diff --git a/plugins/octant/go.sum b/plugins/octant/go.sum index 382954524dc..8eb824573cb 100644 --- a/plugins/octant/go.sum +++ b/plugins/octant/go.sum @@ -469,7 +469,7 @@ github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmF github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= github.com/vmware-tanzu/octant v0.13.1 h1:hz4JDnAA7xDkFjF4VEbt5SrSRrG26FCxKXXBGapf6Nc= github.com/vmware-tanzu/octant v0.13.1/go.mod h1:4q+wrV4tmUwAdMjvYOujSTtZbE4+zm0n5mb7FjvN0I0= -github.com/vmware/go-ipfix v0.0.0-20200715175325-6ade358dcb5f/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU= +github.com/vmware/go-ipfix v0.0.0-20200808032647-11daf237d1dc/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU= github.com/wenyingd/ofnet v0.0.0-20200601065543-2c7a62482f16/go.mod h1:+g6SfqhTVqeGEmUJ0l4WtCgsL4dflTUJE4k+TPCKqXo= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 93186622699..83f945138a3 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -328,6 +328,20 @@ func (data *TestData) deployAntreaFlowExporter(ipfixCollector string) error { return fmt.Errorf("error when restarting antrea-agent Pod: %v", err) } + // Just to be safe disabling the FlowExporter feature for subsequent tests. + configMap, err = data.GetAntreaConfigMap(antreaNamespace) + if err != nil { + return fmt.Errorf("failed to get ConfigMap: %v", err) + } + + antreaAgentConf, _ = configMap.Data["antrea-agent.conf"] + antreaAgentConf = strings.Replace(antreaAgentConf, " FlowExporter: true", " FlowExporter: false", 1) + configMap.Data["antrea-agent.conf"] = antreaAgentConf + + if _, err := data.clientset.CoreV1().ConfigMaps(antreaNamespace).Update(context.TODO(), configMap, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("failed to update ConfigMap %s: %v", configMap.Name, err) + } + return nil }