Skip to content

Commit

Permalink
Addressed comments
Browse files Browse the repository at this point in the history
Major change is handling of error in exporter go routine.
  • Loading branch information
srikartati committed Aug 10, 2020
1 parent 719f8e0 commit e90d1b2
Show file tree
Hide file tree
Showing 13 changed files with 205 additions and 200 deletions.
13 changes: 4 additions & 9 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package main

import (
"fmt"
"k8s.io/apimachinery/pkg/util/wait"
"net"
"time"

Expand Down Expand Up @@ -243,19 +244,13 @@ func run(o *Options) error {
connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, agentQuerier.GetOVSCtlClient(), o.config.OVSDatapathType),
ifaceStore,
o.pollInterval)
// pollDone helps in synchronizing connStore.Run and flowExporter.Run go routines.
pollDone := make(chan struct{})
go connStore.Run(stopCh, pollDone)

flowExporter, err := exporter.InitFlowExporter(
o.flowCollector,
flowExporter := exporter.NewFlowExporter(
flowrecords.NewFlowRecords(connStore),
o.config.FlowExportFrequency,
o.pollInterval)
if err != nil {
return fmt.Errorf("error when initializing flow exporter: %v", err)
}
go flowExporter.Run(stopCh, pollDone)
o.config.FlowExportFrequency)
go wait.Until(func() { flowExporter.CheckAndDoExport(o.flowCollector, pollDone) }, o.pollInterval, stopCh)
}

<-stopCh
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (
github.com/stretchr/testify v1.5.1
github.com/ti-mo/conntrack v0.3.0
github.com/vishvananda/netlink v1.1.0
github.com/vmware/go-ipfix v0.0.0-20200715175325-6ade358dcb5f
github.com/vmware/go-ipfix v0.0.0-20200808032647-11daf237d1dc
golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975
golang.org/x/exp v0.0.0-20190312203227-4b39c73a6495
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,8 @@ github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYp
github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df h1:OviZH7qLw/7ZovXvuNyL3XQl8UFofeikI1NW1Gypu7k=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
github.com/vmware/go-ipfix v0.0.0-20200715175325-6ade358dcb5f h1:XyyczLRk8+6YqYXE8v20XjbVtK415KR114IrjX9THpQ=
github.com/vmware/go-ipfix v0.0.0-20200715175325-6ade358dcb5f/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU=
github.com/vmware/go-ipfix v0.0.0-20200808032647-11daf237d1dc h1:lytkY3WfWgOyyaOlgj/3Y5Fkwc9ENff2qg6Ul4FYriE=
github.com/vmware/go-ipfix v0.0.0-20200808032647-11daf237d1dc/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU=
github.com/wenyingd/ofnet v0.0.0-20200609044910-a72f3e66744e h1:NM4NTe6Z+mF5IYlYAiEdRlY8XcMY4P6VlYqgsBhpojQ=
github.com/wenyingd/ofnet v0.0.0-20200609044910-a72f3e66744e/go.mod h1:+g6SfqhTVqeGEmUJ0l4WtCgsL4dflTUJE4k+TPCKqXo=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
Expand Down
7 changes: 3 additions & 4 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,9 @@ func (cs *ConnectionStore) Run(stopCh <-chan struct{}, pollDone chan struct{}) {
klog.Errorf("Error during conntrack poll cycle: %v", err)
}
// We need synchronization between ConnectionStore.Run and FlowExporter.Run go routines.
// ConnectionStore.Run (connection poll) should be done to start FlowExporter.Run (connection export); pollDone signals helps enabling this.
// ConnectionStore.Run (connection poll) should be done to start FlowExporter.Run (connection export); pollDone signal helps enabling this.
// FlowExporter.Run should be done to start ConnectionStore.Run; mutex on connection map object makes sure of this synchronization guarantee.
pollDone <- struct{}{}

}
}
}
Expand Down Expand Up @@ -98,7 +97,7 @@ func (cs *ConnectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
if !srcFound && !dstFound {
klog.Warningf("Cannot map any of the IP %s or %s to a local Pod", conn.TupleOrig.SourceAddress.String(), conn.TupleReply.SourceAddress.String())
}
// sourceIP/destinationIP are mapped only to local pods and not remote pods.
// sourceIP/destinationIP are mapped only to local Pods and not remote Pods.
if srcFound && sIface.Type == interfacestore.ContainerInterface {
conn.SourcePodName = sIface.ContainerInterfaceConfig.PodName
conn.SourcePodNamespace = sIface.ContainerInterfaceConfig.PodNamespace
Expand All @@ -107,7 +106,7 @@ func (cs *ConnectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
conn.DestinationPodName = dIface.ContainerInterfaceConfig.PodName
conn.DestinationPodNamespace = dIface.ContainerInterfaceConfig.PodNamespace
}
// Do not export flow records of connections whose destination is local pod and source is remote pod.
// Do not export flow records of connections whose destination is local Pod and source is remote Pod.
// We export flow records only form "source node", where the connection is originated from. This is to avoid
// 2 copies of flow records at flow collector. This restriction will be removed when flow records store network policy rule ID.
// TODO: Remove this when network policy rule ID are added to flow records.
Expand Down
1 change: 1 addition & 0 deletions pkg/agent/flowexporter/connections/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/vmware-tanzu/antrea/pkg/ovs/ovsctl"
)

// InitializeConnTrackDumper initialize the ConnTrackDumper interface for different OS and datapath types.
func InitializeConnTrackDumper(nodeConfig *config.NodeConfig, serviceCIDR *net.IPNet, ovsctlClient ovsctl.OVSCtlClient, ovsDatapathType string) ConnTrackDumper {
var connTrackDumper ConnTrackDumper
if ovsDatapathType == ovsconfig.OVSDatapathSystem {
Expand Down
10 changes: 4 additions & 6 deletions pkg/agent/flowexporter/connections/conntrack_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,10 @@ type connTrackSystem struct {

func NewConnTrackSystem(nodeConfig *config.NodeConfig, serviceCIDR *net.IPNet) *connTrackSystem {
// Ensure net.netfilter.nf_conntrack_acct value to be 1. This will enable flow exporter to export stats of connections.
// Do not handle error and continue with creation of interfacer object as we can still dump flows with no stats.
// If log says permission error, please ensure net.netfilter.nf_conntrack_acct to be set to 1.
// Do not fail, but continue after logging error as we can still dump flows with no stats.
sysctl.EnsureSysctlNetValue("netfilter/nf_conntrack_acct", 1)
// Ensure net.netfilter.nf_conntrack_timestamp value to be 1. This will enable flow exporter to export timestamps of connections.
// Do not handle error and continue with creation of interfacer object as we can still dump flows with no timestamps.
// If log says permission error, please ensure net.netfilter.nf_conntrack_timestamp to be set to 1.
// Do not fail, but continue after logging error as we can still dump flows with no timestamps.
sysctl.EnsureSysctlNetValue("netfilter/nf_conntrack_timestamp", 1)

return &connTrackSystem{
Expand Down Expand Up @@ -103,7 +101,7 @@ func (nfct *netFilterConnTrack) DumpFilter(filter conntrack.Filter) ([]*flowexpo
}
antreaConns := make([]*flowexporter.Connection, len(conns))
for i, conn := range conns {
antreaConns[i] = createAntreaConn(&conn)
antreaConns[i] = netlinkFlowToAntreaConnection(&conn)
}

klog.V(2).Infof("Finished dumping -- total no. of flows in conntrack: %d", len(antreaConns))
Expand All @@ -112,7 +110,7 @@ func (nfct *netFilterConnTrack) DumpFilter(filter conntrack.Filter) ([]*flowexpo
return antreaConns, nil
}

func createAntreaConn(conn *conntrack.Flow) *flowexporter.Connection {
func netlinkFlowToAntreaConnection(conn *conntrack.Flow) *flowexporter.Connection {
tupleOrig := flowexporter.Tuple{
SourceAddress: conn.TupleOrig.IP.SourceAddress,
DestinationAddress: conn.TupleOrig.IP.DestinationAddress,
Expand Down
219 changes: 124 additions & 95 deletions pkg/agent/flowexporter/connections/conntrack_ovs.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,17 @@ import (
"github.com/vmware-tanzu/antrea/pkg/agent/config"
"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter"
"github.com/vmware-tanzu/antrea/pkg/ovs/ovsctl"
"github.com/vmware-tanzu/antrea/pkg/util/ip"
)

// Following map is for converting protocol name (string) to protocol identifier
var protocols = map[string]uint8{
"icmp": 1,
"igmp": 2,
"tcp": 6,
"udp": 17,
"ipv6-icmp": 58,
}

// connTrackOvsCtl implements ConnTrackDumper. This supports OVS userspace datapath scenarios.
var _ ConnTrackDumper = new(connTrackOvsCtl)

Expand Down Expand Up @@ -69,106 +77,127 @@ func (ct *connTrackOvsCtl) ovsAppctlDumpConnections(zoneFilter uint16) ([]*flowe
return nil, fmt.Errorf("error when executing dump-conntrack command: %v", execErr)
}

// Parse the output to get the flows
// Parse the output to get the flow strings and convert them to Antrea connections.
antreaConns := make([]*flowexporter.Connection, 0)
outputFlow := strings.Split(string(cmdOutput), "\n")
var err error
for _, flow := range outputFlow {
conn := flowexporter.Connection{}
flowSlice := strings.Split(flow, ",")
isReply := false
inZone := false
for _, fs := range flowSlice {
// Indicator to populate reply or reverse fields
if strings.Contains(fs, "reply") {
isReply = true
}
if !strings.Contains(fs, "=") {
// Proto identifier
conn.TupleOrig.Protocol, err = ip.LookupProtocolMap(fs)
if err != nil {
klog.Errorf("Unknown protocol to convert to ID: %s", fs)
continue
}
conn.TupleReply.Protocol = conn.TupleOrig.Protocol
} else if strings.Contains(fs, "src") {
fields := strings.Split(fs, "=")
if !isReply {
conn.TupleOrig.SourceAddress = net.ParseIP(fields[len(fields)-1])
} else {
conn.TupleReply.SourceAddress = net.ParseIP(fields[len(fields)-1])
}
} else if strings.Contains(fs, "dst") {
fields := strings.Split(fs, "=")
if !isReply {
conn.TupleOrig.DestinationAddress = net.ParseIP(fields[len(fields)-1])
} else {
conn.TupleReply.DestinationAddress = net.ParseIP(fields[len(fields)-1])
}
} else if strings.Contains(fs, "sport") {
fields := strings.Split(fs, "=")
val, err := strconv.Atoi(fields[len(fields)-1])
if err != nil {
klog.Errorf("Conversion of sport: %s to int failed", fields[len(fields)-1])
continue
}
if !isReply {
conn.TupleOrig.SourcePort = uint16(val)
} else {
conn.TupleReply.SourcePort = uint16(val)
}
} else if strings.Contains(fs, "dport") {
// dport field could be the last tuple field in ovs-dpctl output format.
fs = strings.TrimSuffix(fs, ")")

fields := strings.Split(fs, "=")
val, err := strconv.Atoi(fields[len(fields)-1])
if err != nil {
klog.Errorf("Conversion of dport: %s to int failed", fields[len(fields)-1])
continue
}
if !isReply {
conn.TupleOrig.DestinationPort = uint16(val)
} else {
conn.TupleReply.DestinationPort = uint16(val)
}
} else if strings.Contains(fs, "zone") {
fields := strings.Split(fs, "=")
val, err := strconv.Atoi(fields[len(fields)-1])
if err != nil {
klog.Errorf("Conversion of zone: %s to int failed", fields[len(fields)-1])
continue
}
if zoneFilter != uint16(val) {
break
} else {
inZone = true
conn.Zone = uint16(val)
}
} else if strings.Contains(fs, "timeout") {
fields := strings.Split(fs, "=")
val, err := strconv.Atoi(fields[len(fields)-1])
if err != nil {
klog.Errorf("Conversion of timeout: %s to int failed", fields[len(fields)-1])
continue
}
conn.Timeout = uint32(val)
} else if strings.Contains(fs, "id") {
fields := strings.Split(fs, "=")
val, err := strconv.Atoi(fields[len(fields)-1])
if err != nil {
klog.Errorf("Conversion of id: %s to int failed", fields[len(fields)-1])
continue
}
conn.ID = uint32(val)
}
conn, err := flowStringToAntreaConnection(flow, zoneFilter)
if err != nil {
klog.Warningf("Ignoring the flow from conntrack dump due to the error: %v", err)
continue
}
if inZone {
conn.IsActive = true
conn.DoExport = true
antreaConns = append(antreaConns, &conn)
if conn != nil {
antreaConns = append(antreaConns, conn)
}
}
klog.V(2).Infof("Finished dumping -- total no. of flows in conntrack: %d", len(antreaConns))
return antreaConns, nil
}

// flowStringToAntreaConnection parses the flow string and converts to Antrea connection.
// Example of flow string:
// tcp,orig=(src=10.10.1.2,dst=10.96.0.1,sport=42540,dport=443),reply=(src=10.96.0.1,dst=10.10.1.2,sport=443,dport=42540),zone=65520,protoinfo=(state=TIME_WAIT)
func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter.Connection, error) {
conn := flowexporter.Connection{}
flowSlice := strings.Split(flow, ",")
isReply := false
inZone := false
var err error
for _, fs := range flowSlice {
// Indicator to populate reply or reverse fields
if strings.Contains(fs, "reply") {
isReply = true
}
if !strings.Contains(fs, "=") {
// Proto identifier
conn.TupleOrig.Protocol, err = lookupProtocolMap(fs)
if err != nil {
return nil, err
}
conn.TupleReply.Protocol = conn.TupleOrig.Protocol
} else if strings.Contains(fs, "src") {
fields := strings.Split(fs, "=")
if !isReply {
conn.TupleOrig.SourceAddress = net.ParseIP(fields[len(fields)-1])
} else {
conn.TupleReply.SourceAddress = net.ParseIP(fields[len(fields)-1])
}
} else if strings.Contains(fs, "dst") {
fields := strings.Split(fs, "=")
if !isReply {
conn.TupleOrig.DestinationAddress = net.ParseIP(fields[len(fields)-1])
} else {
conn.TupleReply.DestinationAddress = net.ParseIP(fields[len(fields)-1])
}
} else if strings.Contains(fs, "sport") {
fields := strings.Split(fs, "=")
val, err := strconv.Atoi(fields[len(fields)-1])
if err != nil {
return nil, fmt.Errorf("conversion of sport %s to int failed", fields[len(fields)-1])
}
if !isReply {
conn.TupleOrig.SourcePort = uint16(val)
} else {
conn.TupleReply.SourcePort = uint16(val)
}
} else if strings.Contains(fs, "dport") {
// dport field could be the last tuple field in ovs-dpctl output format.
fs = strings.TrimSuffix(fs, ")")

fields := strings.Split(fs, "=")
val, err := strconv.Atoi(fields[len(fields)-1])
if err != nil {
return nil, fmt.Errorf("conversion of dport %s to int failed", fields[len(fields)-1])
}
if !isReply {
conn.TupleOrig.DestinationPort = uint16(val)
} else {
conn.TupleReply.DestinationPort = uint16(val)
}
} else if strings.Contains(fs, "zone") {
fields := strings.Split(fs, "=")
val, err := strconv.Atoi(fields[len(fields)-1])
if err != nil {
return nil, fmt.Errorf("conversion of zone %s to int failed", fields[len(fields)-1])
}
if zoneFilter != uint16(val) {
break
} else {
inZone = true
conn.Zone = uint16(val)
}
} else if strings.Contains(fs, "timeout") {
fields := strings.Split(fs, "=")
val, err := strconv.Atoi(fields[len(fields)-1])
if err != nil {
return nil, fmt.Errorf("conversion of timeout %s to int failed", fields[len(fields)-1])
}
conn.Timeout = uint32(val)
} else if strings.Contains(fs, "id") {
fields := strings.Split(fs, "=")
val, err := strconv.Atoi(fields[len(fields)-1])
if err != nil {
return nil, fmt.Errorf("conversion of id %s to int failed", fields[len(fields)-1])
}
conn.ID = uint32(val)
}
}
if !inZone {
return nil, nil
}
conn.IsActive = true
conn.DoExport = true

return &conn, nil
}

// lookupProtocolMap returns protocol identifier given protocol name
func lookupProtocolMap(name string) (uint8, error) {
name = strings.TrimSpace(name)
lowerCaseStr := strings.ToLower(name)
proto, found := protocols[lowerCaseStr]
if !found {
return 0, fmt.Errorf("unknown IP protocol specified: %s", name)
}
return proto, nil
}
Loading

0 comments on commit e90d1b2

Please sign in to comment.