Skip to content

Commit

Permalink
Flow Exporter: Support Pod-To-Service flows with Antrea Proxy
Browse files Browse the repository at this point in the history
Flow exporter can send flow records of Pod-To-Service flows
with service name, when Antrea proxy is enabled. Flow records from both
hosts, source host (local to pod) and destination host (local to
endpoint of service) contains service name.
For this purpose, added two maps: one from
ServicePort(clusterIP:Port/Proto) to ServicePortName
and other one is endpoint(endpointIP:Port/Proto) to ServicePortName.
  • Loading branch information
srikartati committed Aug 4, 2020
1 parent b48253f commit c3fe824
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 11 deletions.
3 changes: 2 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func run(o *Options) error {
if networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() {
isChaining = true
}
var proxier *proxy.Proxier
var proxier *proxy.Proxier = nil
if features.DefaultFeatureGate.Enabled(features.AntreaProxy) {
proxier = proxy.New(nodeConfig.Name, informerFactory, ofClient)
}
Expand Down Expand Up @@ -245,6 +245,7 @@ func run(o *Options) error {
serviceCIDRNet,
agentQuerier.GetOVSCtlClient(),
ifaceStore,
proxier,
o.pollInterval)
// pollDone helps in synchronizing connStore.Run and flowExporter.Run go routines.
pollDone := make(chan struct{})
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (
github.com/stretchr/testify v1.5.1
github.com/ti-mo/conntrack v0.3.0
github.com/vishvananda/netlink v1.1.0
github.com/vmware/go-ipfix v0.0.0-20200715175325-6ade358dcb5f
github.com/vmware/go-ipfix v0.0.0-20200727033108-5d6b52a1d911
golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975
golang.org/x/exp v0.0.0-20190312203227-4b39c73a6495
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,8 @@ github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYp
github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df h1:OviZH7qLw/7ZovXvuNyL3XQl8UFofeikI1NW1Gypu7k=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
github.com/vmware/go-ipfix v0.0.0-20200715175325-6ade358dcb5f h1:XyyczLRk8+6YqYXE8v20XjbVtK415KR114IrjX9THpQ=
github.com/vmware/go-ipfix v0.0.0-20200715175325-6ade358dcb5f/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU=
github.com/vmware/go-ipfix v0.0.0-20200727033108-5d6b52a1d911 h1:55SVfHzcBgbIQ6wrugWgtOExP8mMz+6H/jtsC9die4U=
github.com/vmware/go-ipfix v0.0.0-20200727033108-5d6b52a1d911/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU=
github.com/wenyingd/ofnet v0.0.0-20200609044910-a72f3e66744e h1:NM4NTe6Z+mF5IYlYAiEdRlY8XcMY4P6VlYqgsBhpojQ=
github.com/wenyingd/ofnet v0.0.0-20200609044910-a72f3e66744e/go.mod h1:+g6SfqhTVqeGEmUJ0l4WtCgsL4dflTUJE4k+TPCKqXo=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
Expand Down
37 changes: 32 additions & 5 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,33 @@ package connections

import (
"fmt"
"github.com/vmware-tanzu/antrea/pkg/agent/config"
"github.com/vmware-tanzu/antrea/pkg/ovs/ovsconfig"
"github.com/vmware-tanzu/antrea/pkg/ovs/ovsctl"
"net"
"sync"
"time"

"k8s.io/klog"

"github.com/vmware-tanzu/antrea/pkg/agent/config"
"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter"
"github.com/vmware-tanzu/antrea/pkg/agent/interfacestore"
"github.com/vmware-tanzu/antrea/pkg/agent/openflow"
"github.com/vmware-tanzu/antrea/pkg/agent/proxy"
"github.com/vmware-tanzu/antrea/pkg/ovs/ovsconfig"
"github.com/vmware-tanzu/antrea/pkg/ovs/ovsctl"
"github.com/vmware-tanzu/antrea/pkg/util/ip"
)

type ConnectionStore struct {
Connections map[flowexporter.ConnectionKey]flowexporter.Connection
ConnDumper ConnTrackDumper
IfaceStore interfacestore.InterfaceStore
serviceCIDR *net.IPNet
antreaProxier *proxy.Proxier
pollInterval time.Duration
mutex sync.Mutex
}

func NewConnectionStore(ovsDatapathType string, nodeConfig *config.NodeConfig, serviceCIDR *net.IPNet, ovsctlClient ovsctl.OVSCtlClient, ifaceStore interfacestore.InterfaceStore, pollInterval time.Duration) *ConnectionStore {
func NewConnectionStore(ovsDatapathType string, nodeConfig *config.NodeConfig, serviceCIDR *net.IPNet, ovsctlClient ovsctl.OVSCtlClient, ifaceStore interfacestore.InterfaceStore, proxier *proxy.Proxier, pollInterval time.Duration) *ConnectionStore {
var connTrackDumper ConnTrackDumper
if ovsDatapathType == ovsconfig.OVSDatapathSystem {
connTrackDumper = NewConnTrackSystem(nodeConfig, serviceCIDR)
Expand All @@ -49,6 +53,8 @@ func NewConnectionStore(ovsDatapathType string, nodeConfig *config.NodeConfig, s
Connections: make(map[flowexporter.ConnectionKey]flowexporter.Connection),
ConnDumper: connTrackDumper,
IfaceStore: ifaceStore,
serviceCIDR: serviceCIDR,
antreaProxier: proxier,
pollInterval: pollInterval,
}
}
Expand Down Expand Up @@ -102,13 +108,13 @@ func (cs *ConnectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
cs.Connections[connKey] = *existingConn
klog.V(4).Infof("Antrea flow updated: %v", existingConn)
} else {
// sourceIP/destinationIP are mapped only to local pods and not remote pods.
var srcFound, dstFound bool
sIface, srcFound := cs.IfaceStore.GetInterfaceByIP(conn.TupleOrig.SourceAddress.String())
dIface, dstFound := cs.IfaceStore.GetInterfaceByIP(conn.TupleReply.SourceAddress.String())
if !srcFound && !dstFound {
klog.Warningf("Cannot map any of the IP %s or %s to a local Pod", conn.TupleOrig.SourceAddress.String(), conn.TupleReply.SourceAddress.String())
}
// sourceIP/destinationIP are mapped only to local pods and not remote pods.
if srcFound && sIface.Type == interfacestore.ContainerInterface {
conn.SourcePodName = sIface.ContainerInterfaceConfig.PodName
conn.SourcePodNamespace = sIface.ContainerInterfaceConfig.PodNamespace
Expand All @@ -124,6 +130,27 @@ func (cs *ConnectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
if !srcFound && dstFound {
conn.DoExport = false
}

// Pod-to-Service flows w/ antrea-proxy:Antrea proxy is enabled.
if cs.antreaProxier != nil {
if cs.serviceCIDR.Contains(conn.TupleOrig.DestinationAddress) {
// Move this code outside
clusterIP := conn.TupleOrig.DestinationAddress.String()
svcPort := conn.TupleOrig.DestinationPort
protocol, err := ip.LookupServiceProtocol(conn.TupleOrig.Protocol)
if err != nil {
klog.Warningf("Could not retrieve service protocol: %v", err)
} else {
serviceStr := fmt.Sprintf("%s:%d/%s", clusterIP, svcPort, protocol)
servicePortName, exists := cs.antreaProxier.GetServiceByIP(serviceStr)
if !exists {
klog.Warningf("Could not retrieve service info from antrea-agent-proxier for serviceStr: %s", serviceStr)
} else {
conn.DestinationServiceName = servicePortName.String()
}
}
}
}
klog.V(4).Infof("New Antrea flow added: %v", conn)
// Add new antrea connection to connection store
cs.Connections[connKey] = *conn
Expand Down
1 change: 1 addition & 0 deletions pkg/agent/flowexporter/connections/conntrack_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func createAntreaConn(conn *conntrack.Flow) *flowexporter.Connection {
"",
"",
"",
"",
}

return &newConn
Expand Down
17 changes: 17 additions & 0 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ var (
"destinationPodName",
"destinationPodNamespace",
"destinationNodeName",
"destinationClusterIP",
"destinationServiceName",
}
)

Expand Down Expand Up @@ -314,6 +316,21 @@ func (exp *flowExporter) sendDataRecord(dataRec ipfix.IPFIXRecord, record flowex
} else {
_, err = dataRec.AddInfoElement(ie, "")
}
case "destinationClusterIP":
if record.Conn.DestinationServiceName != "" {
_, err = dataRec.AddInfoElement(ie, record.Conn.TupleOrig.DestinationAddress)
} else {
// Sending dummy IP as IPFIX collector expects constant length of data for IP field.
// We should probably think of better approach as this involves customization of IPFIX collector to ignore
// this dummy IP address.
_, err = dataRec.AddInfoElement(ie, net.IP{0, 0, 0, 0})
}
case "destinationServiceName":
if record.Conn.DestinationServiceName != "" {
_, err = dataRec.AddInfoElement(ie, record.Conn.DestinationServiceName)
} else {
_, err = dataRec.AddInfoElement(ie, "")
}
}
if err != nil {
return fmt.Errorf("error while adding info element: %s to data record: %v", ie.Name, err)
Expand Down
6 changes: 5 additions & 1 deletion pkg/agent/flowexporter/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package exporter

import (
"net"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -140,6 +141,7 @@ func TestFlowExporter_sendDataRecord(t *testing.T) {
for i, ie := range AntreaInfoElements {
elemList[i+len(IANAInfoElements)+len(IANAReverseInfoElements)] = ipfixentities.NewInfoElement(ie, 0, 0, 0, 0)
}
// Define mocks and flowExporter
mockIPFIXExpProc := ipfixtest.NewMockIPFIXExportingProcess(ctrl)
mockDataRec := ipfixtest.NewMockIPFIXRecord(ctrl)
flowExp := &flowExporter{
Expand All @@ -159,13 +161,15 @@ func TestFlowExporter_sendDataRecord(t *testing.T) {
mockDataRec.EXPECT().AddInfoElement(ie, time.Time{}.Unix()).Return(tempBytes, nil)
case "sourceIPv4Address", "destinationIPv4Address":
mockDataRec.EXPECT().AddInfoElement(ie, nil).Return(tempBytes, nil)
case "destinationClusterIP":
mockDataRec.EXPECT().AddInfoElement(ie, net.IP{0, 0, 0, 0}).Return(tempBytes, nil)
case "sourceTransportPort", "destinationTransportPort":
mockDataRec.EXPECT().AddInfoElement(ie, uint16(0)).Return(tempBytes, nil)
case "protocolIdentifier":
mockDataRec.EXPECT().AddInfoElement(ie, uint8(0)).Return(tempBytes, nil)
case "packetTotalCount", "octetTotalCount", "packetDeltaCount", "octetDeltaCount", "reverse_PacketTotalCount", "reverse_OctetTotalCount", "reverse_PacketDeltaCount", "reverse_OctetDeltaCount":
mockDataRec.EXPECT().AddInfoElement(ie, uint64(0)).Return(tempBytes, nil)
case "sourcePodName", "sourcePodNamespace", "sourceNodeName", "destinationPodName", "destinationPodNamespace", "destinationNodeName":
case "sourcePodName", "sourcePodNamespace", "sourceNodeName", "destinationPodName", "destinationPodNamespace", "destinationNodeName", "destinationServiceName":
mockDataRec.EXPECT().AddInfoElement(ie, "").Return(tempBytes, nil)
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/agent/flowexporter/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Connection struct {
SourcePodName string
DestinationPodNamespace string
DestinationPodName string
DestinationServiceName string
}

type FlowRecord struct {
Expand Down
65 changes: 65 additions & 0 deletions pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package proxy

import (
"fmt"
"net"
"sync"
"time"
Expand Down Expand Up @@ -60,6 +61,14 @@ type Proxier struct {
// endpointInstalledMap stores endpoints we actually installed.
endpointInstalledMap map[k8sproxy.ServicePortName]map[string]struct{}
groupCounter types.GroupCounter
// serviceStringMap provides map from serviceString(ClusterIP:Port/Proto) to ServicePortName.
serviceStringMap map[string]k8sproxy.ServicePortName
// serviceStringMapMutex protects serviceStringMap object.
serviceStringMapMutex sync.Mutex
// endpointToServiceMap provides map from endpoint string(IP:Port) to ServicePortName.
endpointToServiceMap map[string]k8sproxy.ServicePortName
// endpointToServiceMapMutex protects endpointToServiceMap object.
endpointToServiceMapMutex sync.Mutex

runner *k8sproxy.BoundedFrequencyRunner
stopChan <-chan struct{}
Expand Down Expand Up @@ -101,6 +110,7 @@ func (p *Proxier) removeStaleServices() {
continue
}
delete(p.serviceInstalledMap, svcPortName)
p.deleteServiceByIP(svcInfo.String())
p.groupCounter.Recycle(svcPortName)
}
}
Expand All @@ -123,6 +133,8 @@ func (p *Proxier) removeStaleEndpoints(staleEndpoints map[k8sproxy.ServicePortNa
if len(m) == 0 {
delete(p.endpointInstalledMap, svcPortName)
}
endpointStr := fmt.Sprintf("%s/%s", endpoint.String(), svcPortName.Protocol)
p.deleteServiceByEndpoint(endpointStr)
}
}
}
Expand Down Expand Up @@ -151,6 +163,8 @@ func (p *Proxier) installServices() {
if _, ok := endpointInstalled[endpoint.String()]; !ok {
needUpdate = true
endpointInstalled[endpoint.String()] = struct{}{}
endpointStr := fmt.Sprintf("%s/%s", endpoint.String(), svcPortName.Protocol)
p.addServiceByEndpoint(endpointStr, svcPortName)
}
endpointUpdateList = append(endpointUpdateList, endpoint)
}
Expand Down Expand Up @@ -185,6 +199,7 @@ func (p *Proxier) installServices() {
}
}
p.serviceInstalledMap[svcPortName] = svcPort
p.addServiceByIP(svcInfo.String(), svcPortName)
}
}

Expand Down Expand Up @@ -258,6 +273,54 @@ func (p *Proxier) OnServiceSynced() {
}
}

func (p *Proxier) GetServiceByIP(serviceStr string) (k8sproxy.ServicePortName, bool) {
p.serviceStringMapMutex.Lock()
defer p.serviceStringMapMutex.Unlock()

serviceInfo, exists := p.serviceStringMap[serviceStr]
return serviceInfo, exists
}

func (p *Proxier) addServiceByIP(serviceStr string, servicePortName k8sproxy.ServicePortName) {
p.serviceStringMapMutex.Lock()
defer p.serviceStringMapMutex.Unlock()
klog.V(2).Infof("Added service with key: %v value: %v", serviceStr, servicePortName)
p.serviceStringMap[serviceStr] = servicePortName
}

func (p *Proxier) deleteServiceByIP(serviceStr string) {
p.serviceStringMapMutex.Lock()
defer p.serviceStringMapMutex.Unlock()

delete(p.serviceStringMap, serviceStr)
}

func (p *Proxier) GetServiceByEndpoint(endpointStr string) (k8sproxy.ServicePortName, bool) {
p.endpointToServiceMapMutex.Lock()
defer p.endpointToServiceMapMutex.Unlock()

serviceInfo, exists := p.endpointToServiceMap[endpointStr]
if exists {
klog.V(2).Infof("Retrieved service with key: %v", endpointStr)
}
return serviceInfo, exists
}

func (p *Proxier) addServiceByEndpoint(endpointStr string, servicePortName k8sproxy.ServicePortName) {
p.endpointToServiceMapMutex.Lock()
defer p.endpointToServiceMapMutex.Unlock()

klog.V(2).Infof("Added service with key: %v value: %v", endpointStr, servicePortName)
p.endpointToServiceMap[endpointStr] = servicePortName
}

func (p *Proxier) deleteServiceByEndpoint(endpointStr string) {
p.endpointToServiceMapMutex.Lock()
defer p.endpointToServiceMapMutex.Unlock()

delete(p.endpointToServiceMap, endpointStr)
}

func (p *Proxier) Run(stopCh <-chan struct{}) {
p.once.Do(func() {
go p.serviceConfig.Run(stopCh)
Expand All @@ -281,6 +344,8 @@ func New(hostname string, informerFactory informers.SharedInformerFactory, ofCli
serviceInstalledMap: k8sproxy.ServiceMap{},
endpointInstalledMap: map[k8sproxy.ServicePortName]map[string]struct{}{},
endpointsMap: types.EndpointsMap{},
serviceStringMap: map[string]k8sproxy.ServicePortName{},
endpointToServiceMap: map[string]k8sproxy.ServicePortName{},
groupCounter: types.NewGroupCounter(),
ofClient: ofClient,
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/util/ip/ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package ip
import (
"bytes"
"fmt"
corev1 "k8s.io/api/core/v1"
"net"
"sort"
"strings"
Expand All @@ -38,6 +39,13 @@ var protocols = map[string]uint8{
"ipv6-icmp": 58,
}

var serviceProtocolMap = map[uint8]corev1.Protocol{
6: corev1.ProtocolTCP,
17: corev1.ProtocolUDP,
132: corev1.ProtocolSCTP,
}


// This function takes in one allow CIDR and multiple except CIDRs and gives diff CIDRs
// in allowCIDR eliminating except CIDRs. It currently supports only IPv4. except CIDR input
// can be changed.
Expand Down Expand Up @@ -167,3 +175,12 @@ func LookupProtocolMap(name string) (uint8, error) {
}
return proto, nil
}

// LookupServiceProtocol return service protocol string given protocol identifier
func LookupServiceProtocol(protoID uint8) (corev1.Protocol, error) {
serviceProto, found := serviceProtocolMap[protoID]
if !found {
return "", fmt.Errorf("unknown protocol identifier: %d", protoID)
}
return serviceProto, nil
}
2 changes: 1 addition & 1 deletion plugins/octant/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmF
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
github.com/vmware-tanzu/octant v0.13.1 h1:hz4JDnAA7xDkFjF4VEbt5SrSRrG26FCxKXXBGapf6Nc=
github.com/vmware-tanzu/octant v0.13.1/go.mod h1:4q+wrV4tmUwAdMjvYOujSTtZbE4+zm0n5mb7FjvN0I0=
github.com/vmware/go-ipfix v0.0.0-20200715175325-6ade358dcb5f/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU=
github.com/vmware/go-ipfix v0.0.0-20200727033108-5d6b52a1d911/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU=
github.com/wenyingd/ofnet v0.0.0-20200601065543-2c7a62482f16/go.mod h1:+g6SfqhTVqeGEmUJ0l4WtCgsL4dflTUJE4k+TPCKqXo=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
Expand Down

0 comments on commit c3fe824

Please sign in to comment.