Skip to content

Commit

Permalink
Flow Exporter: Support Pod-To-Service flows with Antrea Proxy
Browse files Browse the repository at this point in the history
Flow exporter can send flow records of Pod-To-Service flows
with service name, when Antrea proxy is enabled. Flow records from
source host (local to pod) contains service name.
For this purpose, added a map in proxier:
ServicePort(clusterIP:Port/Proto) to ServicePortName
  • Loading branch information
srikartati committed Aug 11, 2020
1 parent c9d365d commit e3ad965
Show file tree
Hide file tree
Showing 14 changed files with 252 additions and 35 deletions.
3 changes: 2 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func run(o *Options) error {
if networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() {
isChaining = true
}
var proxier *proxy.Proxier
var proxier proxy.Proxier
if features.DefaultFeatureGate.Enabled(features.AntreaProxy) {
proxier = proxy.New(nodeConfig.Name, informerFactory, ofClient)
}
Expand Down Expand Up @@ -245,6 +245,7 @@ func run(o *Options) error {
connStore := connections.NewConnectionStore(
connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, agentQuerier.GetOVSCtlClient(), o.config.OVSDatapathType),
ifaceStore,
proxier,
o.pollInterval)
pollDone := make(chan struct{})
go connStore.Run(stopCh, pollDone)
Expand Down
1 change: 1 addition & 0 deletions hack/update-codegen-dockerized.sh
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ MOCKGEN_TARGETS=(
"pkg/querier AgentNetworkPolicyInfoQuerier"
"pkg/agent/flowexporter/connections ConnTrackDumper,NetFilterConnTrack"
"pkg/agent/flowexporter/ipfix IPFIXExportingProcess,IPFIXRecord"
"pkg/agent/proxy Proxier"
)

# Command mockgen does not automatically replace variable YEAR with current year
Expand Down
32 changes: 30 additions & 2 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package connections

import (
"fmt"
"net"
"sync"
"time"

Expand All @@ -24,22 +25,28 @@ import (
"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter"
"github.com/vmware-tanzu/antrea/pkg/agent/interfacestore"
"github.com/vmware-tanzu/antrea/pkg/agent/openflow"
"github.com/vmware-tanzu/antrea/pkg/agent/proxy"
"github.com/vmware-tanzu/antrea/pkg/util/ip"
)

type ConnectionStore struct {
connections map[flowexporter.ConnectionKey]flowexporter.Connection
connDumper ConnTrackDumper
ifaceStore interfacestore.InterfaceStore
serviceCIDR *net.IPNet
antreaProxier proxy.Proxier
pollInterval time.Duration
mutex sync.Mutex
}

func NewConnectionStore(connTrackDumper ConnTrackDumper, ifaceStore interfacestore.InterfaceStore, pollInterval time.Duration) *ConnectionStore {
func NewConnectionStore(connTrackDumper ConnTrackDumper, ifaceStore interfacestore.InterfaceStore, serviceCIDR *net.IPNet, proxier proxy.Proxier, pollInterval time.Duration) *ConnectionStore {
return &ConnectionStore{
connections: make(map[flowexporter.ConnectionKey]flowexporter.Connection),
connDumper: connTrackDumper,
ifaceStore: ifaceStore,
pollInterval: pollInterval,
serviceCIDR: serviceCIDR,
antreaProxier: proxier,
pollInterval: pollInterval,
}
}

Expand Down Expand Up @@ -91,6 +98,7 @@ func (cs *ConnectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
cs.connections[connKey] = *existingConn
klog.V(4).Infof("Antrea flow updated: %v", existingConn)
} else {
// sourceIP/destinationIP are mapped only to local pods and not remote pods.
var srcFound, dstFound bool
sIface, srcFound := cs.ifaceStore.GetInterfaceByIP(conn.TupleOrig.SourceAddress.String())
dIface, dstFound := cs.ifaceStore.GetInterfaceByIP(conn.TupleReply.SourceAddress.String())
Expand All @@ -113,6 +121,26 @@ func (cs *ConnectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
if !srcFound && dstFound {
conn.DoExport = false
}

// Pod-to-Service flows w/ antrea-proxy:Antrea proxy is enabled.
if cs.antreaProxier != nil {
if cs.serviceCIDR.Contains(conn.TupleOrig.DestinationAddress) {
clusterIP := conn.TupleOrig.DestinationAddress.String()
svcPort := conn.TupleOrig.DestinationPort
protocol, err := ip.LookupServiceProtocol(conn.TupleOrig.Protocol)
if err != nil {
klog.Warningf("Could not retrieve service protocol: %v", err)
} else {
serviceStr := fmt.Sprintf("%s:%d/%s", clusterIP, svcPort, protocol)
servicePortName, exists := cs.antreaProxier.GetServiceByIP(serviceStr)
if !exists {
klog.Warningf("Could not retrieve service info from antrea-agent-proxier for serviceStr: %s", serviceStr)
} else {
conn.DestinationServiceName = servicePortName.String()
}
}
}
}
klog.V(4).Infof("New Antrea flow added: %v", conn)
// Add new antrea connection to connection store
cs.connections[connKey] = *conn
Expand Down
48 changes: 41 additions & 7 deletions pkg/agent/flowexporter/connections/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,23 @@
package connections

import (
"fmt"
"net"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"

"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter"
connectionstest "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/connections/testing"
"github.com/vmware-tanzu/antrea/pkg/agent/interfacestore"
interfacestoretest "github.com/vmware-tanzu/antrea/pkg/agent/interfacestore/testing"
proxytest "github.com/vmware-tanzu/antrea/pkg/agent/proxy/testing"
"github.com/vmware-tanzu/antrea/pkg/util/ip"
k8sproxy "github.com/vmware-tanzu/antrea/third_party/proxy"
)

const testPollInterval = 0 // Not used in these tests, hence 0.
Expand Down Expand Up @@ -79,6 +85,12 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
TupleReply: *revTuple2,
IsActive: true,
}
tuple3, revTuple3 := makeTuple(&net.IP{10, 10, 10, 10}, &net.IP{20, 20, 20, 20}, 6, 5000, 80)
testFlow3 := flowexporter.Connection{
TupleOrig: *tuple3,
TupleReply: *revTuple3,
IsActive: true,
}
// Create copy of old conntrack flow for testing purposes.
// This flow is already in connection store.
oldTestFlow1 := flowexporter.Connection{
Expand Down Expand Up @@ -106,9 +118,24 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
IP: net.IP{8, 7, 6, 5},
ContainerInterfaceConfig: podConfigFlow2,
}
serviceCIDR := &net.IPNet{
IP: net.IP{20, 20, 20, 0},
Mask: net.IPMask(net.ParseIP("255.255.255.0").To4()),
}
servicePortName := k8sproxy.ServicePortName{
NamespacedName: types.NamespacedName{
"serviceNS1",
"service1",
},
Port: "255",
Protocol: v1.ProtocolTCP,
}
// Mock interface store with one of the couple of IPs correspond to Pods
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, testPollInterval)
mockProxier := proxytest.NewMockProxier(ctrl)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, serviceCIDR, mockProxier, testPollInterval)

// Add flow1conn to the Connection map
testFlow1Tuple := flowexporter.NewConnectionKey(&testFlow1)
connStore.connections[testFlow1Tuple] = oldTestFlow1
Expand All @@ -118,20 +145,27 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
}{
{testFlow1}, // To test update part of function
{testFlow2}, // To test add part of function
{testFlow3}, // To test service name realization
}
for i, test := range addOrUpdateConnTests {
flowTuple := flowexporter.NewConnectionKey(&test.flow)
var expConn flowexporter.Connection
expConn := test.flow
if i == 0 {
expConn = test.flow
expConn.SourcePodNamespace = "ns1"
expConn.SourcePodName = "pod1"
} else {
expConn = test.flow
} else if i == 1 {
expConn.DestinationPodNamespace = "ns2"
expConn.DestinationPodName = "pod2"
mockIfaceStore.EXPECT().GetInterfaceByIP(test.flow.TupleOrig.SourceAddress.String()).Return(nil, false)
mockIfaceStore.EXPECT().GetInterfaceByIP(test.flow.TupleReply.SourceAddress.String()).Return(interfaceFlow2, true)
} else {
mockIfaceStore.EXPECT().GetInterfaceByIP(expConn.TupleOrig.SourceAddress.String()).Return(nil, false)
mockIfaceStore.EXPECT().GetInterfaceByIP(expConn.TupleReply.SourceAddress.String()).Return(nil, false)

protocol, _ := ip.LookupServiceProtocol(expConn.TupleOrig.Protocol)
serviceStr := fmt.Sprintf("%s:%d/%s", expConn.TupleOrig.DestinationAddress.String(), expConn.TupleOrig.DestinationPort, protocol)
mockProxier.EXPECT().GetServiceByIP(serviceStr).Return(servicePortName, true)
expConn.DestinationServiceName = servicePortName.String()
}
connStore.addOrUpdateConn(&test.flow)
actualConn, ok := connStore.GetConnByKey(flowTuple)
Expand Down Expand Up @@ -180,7 +214,7 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) {
// Create ConnectionStore
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, testPollInterval)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, nil, nil, testPollInterval)
// Add flows to the Connection store
for i, flow := range testFlows {
connStore.connections[*testFlowKeys[i]] = *flow
Expand Down Expand Up @@ -242,7 +276,7 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) {
// Create ConnectionStore
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, testPollInterval)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, nil, nil, testPollInterval)
// Add flows to the connection store.
for i, flow := range testFlows {
connStore.connections[*testFlowKeys[i]] = *flow
Expand Down
1 change: 1 addition & 0 deletions pkg/agent/flowexporter/connections/conntrack_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func netlinkFlowToAntreaConnection(conn *conntrack.Flow) *flowexporter.Connectio
"",
"",
"",
"",
}

return &newConn
Expand Down
17 changes: 17 additions & 0 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ var (
"destinationPodName",
"destinationPodNamespace",
"destinationNodeName",
"destinationClusterIP",
"destinationServiceName",
}
)

Expand Down Expand Up @@ -322,6 +324,21 @@ func (exp *flowExporter) sendDataRecord(dataRec ipfix.IPFIXRecord, record flowex
} else {
_, err = dataRec.AddInfoElement(ie, "")
}
case "destinationClusterIP":
if record.Conn.DestinationServiceName != "" {
_, err = dataRec.AddInfoElement(ie, record.Conn.TupleOrig.DestinationAddress)
} else {
// Sending dummy IP as IPFIX collector expects constant length of data for IP field.
// We should probably think of better approach as this involves customization of IPFIX collector to ignore
// this dummy IP address.
_, err = dataRec.AddInfoElement(ie, net.IP{0, 0, 0, 0})
}
case "destinationServiceName":
if record.Conn.DestinationServiceName != "" {
_, err = dataRec.AddInfoElement(ie, record.Conn.DestinationServiceName)
} else {
_, err = dataRec.AddInfoElement(ie, "")
}
}
if err != nil {
return fmt.Errorf("error while adding info element: %s to data record: %v", ie.Name, err)
Expand Down
6 changes: 5 additions & 1 deletion pkg/agent/flowexporter/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package exporter

import (
"net"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -146,6 +147,7 @@ func TestFlowExporter_sendDataRecord(t *testing.T) {
for i, ie := range AntreaInfoElements {
elemList[i+len(IANAInfoElements)+len(IANAReverseInfoElements)] = ipfixentities.NewInfoElement(ie, 0, 0, 0, 0)
}
// Define mocks and flowExporter
mockIPFIXExpProc := ipfixtest.NewMockIPFIXExportingProcess(ctrl)
mockDataRec := ipfixtest.NewMockIPFIXRecord(ctrl)
flowExp := &flowExporter{
Expand All @@ -165,13 +167,15 @@ func TestFlowExporter_sendDataRecord(t *testing.T) {
mockDataRec.EXPECT().AddInfoElement(ie, time.Time{}.Unix()).Return(tempBytes, nil)
case "sourceIPv4Address", "destinationIPv4Address":
mockDataRec.EXPECT().AddInfoElement(ie, nil).Return(tempBytes, nil)
case "destinationClusterIP":
mockDataRec.EXPECT().AddInfoElement(ie, net.IP{0, 0, 0, 0}).Return(tempBytes, nil)
case "sourceTransportPort", "destinationTransportPort":
mockDataRec.EXPECT().AddInfoElement(ie, uint16(0)).Return(tempBytes, nil)
case "protocolIdentifier":
mockDataRec.EXPECT().AddInfoElement(ie, uint8(0)).Return(tempBytes, nil)
case "packetTotalCount", "octetTotalCount", "packetDeltaCount", "octetDeltaCount", "reverse_PacketTotalCount", "reverse_OctetTotalCount", "reverse_PacketDeltaCount", "reverse_OctetDeltaCount":
mockDataRec.EXPECT().AddInfoElement(ie, uint64(0)).Return(tempBytes, nil)
case "sourcePodName", "sourcePodNamespace", "sourceNodeName", "destinationPodName", "destinationPodNamespace", "destinationNodeName":
case "sourcePodName", "sourcePodNamespace", "sourceNodeName", "destinationPodName", "destinationPodNamespace", "destinationNodeName", "destinationServiceName":
mockDataRec.EXPECT().AddInfoElement(ie, "").Return(tempBytes, nil)
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/agent/flowexporter/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Connection struct {
SourcePodName string
DestinationPodNamespace string
DestinationPodName string
DestinationServiceName string
}

type FlowRecord struct {
Expand Down
Loading

0 comments on commit e3ad965

Please sign in to comment.