Skip to content

Commit

Permalink
Support Pod-to-Service flows with Flow Exporter when Antrea Proxy ena…
Browse files Browse the repository at this point in the history
…bled (antrea-io#984)

* Flow Exporter: Support Pod-To-Service flows with Antrea Proxy

Flow exporter can send flow records of Pod-To-Service flows
with the service name, when Antrea proxy is enabled. Flow records from
the source host (local to the sender pod) contain a service name.
For this purpose, added a map in proxier:
ServicePort(clusterIP:Port/Proto) to ServicePortName
  • Loading branch information
srikartati authored and GraysonWu committed Sep 18, 2020
1 parent c00cfd1 commit 16668ef
Show file tree
Hide file tree
Showing 16 changed files with 290 additions and 75 deletions.
4 changes: 3 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func run(o *Options) error {
if networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() {
isChaining = true
}
var proxier *proxy.Proxier
var proxier proxy.Proxier
if features.DefaultFeatureGate.Enabled(features.AntreaProxy) {
proxier = proxy.New(nodeConfig.Name, informerFactory, ofClient)
}
Expand Down Expand Up @@ -247,6 +247,8 @@ func run(o *Options) error {
connStore := connections.NewConnectionStore(
connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, agentQuerier.GetOVSCtlClient(), o.config.OVSDatapathType),
ifaceStore,
serviceCIDRNet,
proxier,
o.pollInterval)
pollDone := make(chan struct{})
go connStore.Run(stopCh, pollDone)
Expand Down
1 change: 1 addition & 0 deletions hack/update-codegen-dockerized.sh
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ MOCKGEN_TARGETS=(
"pkg/querier AgentNetworkPolicyInfoQuerier"
"pkg/agent/flowexporter/connections ConnTrackDumper,NetFilterConnTrack"
"pkg/agent/flowexporter/ipfix IPFIXExportingProcess,IPFIXRecord"
"pkg/agent/proxy Proxier"
)

# Command mockgen does not automatically replace variable YEAR with current year
Expand Down
63 changes: 53 additions & 10 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,43 @@ package connections

import (
"fmt"
"net"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"

"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter"
"github.com/vmware-tanzu/antrea/pkg/agent/interfacestore"
"github.com/vmware-tanzu/antrea/pkg/agent/openflow"
"github.com/vmware-tanzu/antrea/pkg/agent/proxy"
)

var serviceProtocolMap = map[uint8]corev1.Protocol{
6: corev1.ProtocolTCP,
17: corev1.ProtocolUDP,
132: corev1.ProtocolSCTP,
}

type ConnectionStore struct {
connections map[flowexporter.ConnectionKey]flowexporter.Connection
connDumper ConnTrackDumper
ifaceStore interfacestore.InterfaceStore
pollInterval time.Duration
mutex sync.Mutex
connections map[flowexporter.ConnectionKey]flowexporter.Connection
connDumper ConnTrackDumper
ifaceStore interfacestore.InterfaceStore
serviceCIDR *net.IPNet
antreaProxier proxy.Proxier
pollInterval time.Duration
mutex sync.Mutex
}

func NewConnectionStore(connTrackDumper ConnTrackDumper, ifaceStore interfacestore.InterfaceStore, pollInterval time.Duration) *ConnectionStore {
func NewConnectionStore(connTrackDumper ConnTrackDumper, ifaceStore interfacestore.InterfaceStore, serviceCIDR *net.IPNet, proxier proxy.Proxier, pollInterval time.Duration) *ConnectionStore {
return &ConnectionStore{
connections: make(map[flowexporter.ConnectionKey]flowexporter.Connection),
connDumper: connTrackDumper,
ifaceStore: ifaceStore,
pollInterval: pollInterval,
connections: make(map[flowexporter.ConnectionKey]flowexporter.Connection),
connDumper: connTrackDumper,
ifaceStore: ifaceStore,
serviceCIDR: serviceCIDR,
antreaProxier: proxier,
pollInterval: pollInterval,
}
}

Expand Down Expand Up @@ -91,6 +104,7 @@ func (cs *ConnectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
cs.connections[connKey] = *existingConn
klog.V(4).Infof("Antrea flow updated: %v", existingConn)
} else {
// sourceIP/destinationIP are mapped only to local pods and not remote pods.
var srcFound, dstFound bool
sIface, srcFound := cs.ifaceStore.GetInterfaceByIP(conn.TupleOrig.SourceAddress.String())
dIface, dstFound := cs.ifaceStore.GetInterfaceByIP(conn.TupleReply.SourceAddress.String())
Expand All @@ -113,6 +127,26 @@ func (cs *ConnectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
if !srcFound && dstFound {
conn.DoExport = false
}

// Process Pod-to-Service flows when Antrea Proxy is enabled.
if cs.antreaProxier != nil {
if cs.serviceCIDR.Contains(conn.TupleOrig.DestinationAddress) {
clusterIP := conn.TupleOrig.DestinationAddress.String()
svcPort := conn.TupleOrig.DestinationPort
protocol, err := lookupServiceProtocol(conn.TupleOrig.Protocol)
if err != nil {
klog.Warningf("Could not retrieve Service protocol: %v", err)
} else {
serviceStr := fmt.Sprintf("%s:%d/%s", clusterIP, svcPort, protocol)
servicePortName, exists := cs.antreaProxier.GetServiceByIP(serviceStr)
if !exists {
klog.Warningf("Could not retrieve the Service info from antrea-agent-proxier for the serviceStr: %s", serviceStr)
} else {
conn.DestinationServicePortName = servicePortName.String()
}
}
}
}
klog.V(4).Infof("New Antrea flow added: %v", conn)
// Add new antrea connection to connection store
cs.connections[connKey] = *conn
Expand Down Expand Up @@ -184,3 +218,12 @@ func (cs *ConnectionStore) DeleteConnectionByKey(connKey flowexporter.Connection

return nil
}

// LookupServiceProtocol returns the corresponding Service protocol string for a given protocol identifier
func lookupServiceProtocol(protoID uint8) (corev1.Protocol, error) {
serviceProto, found := serviceProtocolMap[protoID]
if !found {
return "", fmt.Errorf("unknown protocol identifier: %d", protoID)
}
return serviceProto, nil
}
81 changes: 57 additions & 24 deletions pkg/agent/flowexporter/connections/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,35 @@
package connections

import (
"fmt"
"net"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"

"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter"
connectionstest "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/connections/testing"
"github.com/vmware-tanzu/antrea/pkg/agent/interfacestore"
interfacestoretest "github.com/vmware-tanzu/antrea/pkg/agent/interfacestore/testing"
proxytest "github.com/vmware-tanzu/antrea/pkg/agent/proxy/testing"
k8sproxy "github.com/vmware-tanzu/antrea/third_party/proxy"
)

const testPollInterval = 0 // Not used in these tests, hence 0.

func makeTuple(srcIP *net.IP, dstIP *net.IP, protoID uint8, srcPort uint16, dstPort uint16) (*flowexporter.Tuple, *flowexporter.Tuple) {
tuple := &flowexporter.Tuple{
func makeTuple(srcIP *net.IP, dstIP *net.IP, protoID uint8, srcPort uint16, dstPort uint16) (flowexporter.Tuple, flowexporter.Tuple) {
tuple := flowexporter.Tuple{
SourceAddress: *srcIP,
DestinationAddress: *dstIP,
Protocol: protoID,
SourcePort: srcPort,
DestinationPort: dstPort,
}
revTuple := &flowexporter.Tuple{
revTuple := flowexporter.Tuple{
SourceAddress: *dstIP,
DestinationAddress: *srcIP,
Protocol: protoID,
Expand All @@ -62,8 +67,8 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
OriginalBytes: 0xbaaaaa0000000000,
ReversePackets: 0xff,
ReverseBytes: 0xbaaa,
TupleOrig: *tuple1,
TupleReply: *revTuple1,
TupleOrig: tuple1,
TupleReply: revTuple1,
IsActive: true,
}
// Flow-2, which is not in ConnectionStore
Expand All @@ -75,10 +80,16 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
OriginalBytes: 0xcbbb,
ReversePackets: 0xbbbb,
ReverseBytes: 0xcbbbb0000000000,
TupleOrig: *tuple2,
TupleReply: *revTuple2,
TupleOrig: tuple2,
TupleReply: revTuple2,
IsActive: true,
}
tuple3, revTuple3 := makeTuple(&net.IP{10, 10, 10, 10}, &net.IP{20, 20, 20, 20}, 6, 5000, 80)
testFlow3 := flowexporter.Connection{
TupleOrig: tuple3,
TupleReply: revTuple3,
IsActive: true,
}
// Create copy of old conntrack flow for testing purposes.
// This flow is already in connection store.
oldTestFlow1 := flowexporter.Connection{
Expand All @@ -88,8 +99,8 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
OriginalBytes: 0xbaaaaa00000000,
ReversePackets: 0xf,
ReverseBytes: 0xba,
TupleOrig: *tuple1,
TupleReply: *revTuple1,
TupleOrig: tuple1,
TupleReply: revTuple1,
SourcePodNamespace: "ns1",
SourcePodName: "pod1",
DestinationPodNamespace: "",
Expand All @@ -106,9 +117,24 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
IP: net.IP{8, 7, 6, 5},
ContainerInterfaceConfig: podConfigFlow2,
}
serviceCIDR := &net.IPNet{
IP: net.IP{20, 20, 20, 0},
Mask: net.IPMask(net.ParseIP("255.255.255.0").To4()),
}
servicePortName := k8sproxy.ServicePortName{
NamespacedName: types.NamespacedName{
Namespace: "serviceNS1",
Name: "service1",
},
Port: "255",
Protocol: v1.ProtocolTCP,
}
// Mock interface store with one of the couple of IPs correspond to Pods
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, testPollInterval)
mockProxier := proxytest.NewMockProxier(ctrl)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, serviceCIDR, mockProxier, testPollInterval)

// Add flow1conn to the Connection map
testFlow1Tuple := flowexporter.NewConnectionKey(&testFlow1)
connStore.connections[testFlow1Tuple] = oldTestFlow1
Expand All @@ -118,20 +144,27 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
}{
{testFlow1}, // To test update part of function
{testFlow2}, // To test add part of function
{testFlow3}, // To test service name realization
}
for i, test := range addOrUpdateConnTests {
flowTuple := flowexporter.NewConnectionKey(&test.flow)
var expConn flowexporter.Connection
expConn := test.flow
if i == 0 {
expConn = test.flow
expConn.SourcePodNamespace = "ns1"
expConn.SourcePodName = "pod1"
} else {
expConn = test.flow
} else if i == 1 {
expConn.DestinationPodNamespace = "ns2"
expConn.DestinationPodName = "pod2"
mockIfaceStore.EXPECT().GetInterfaceByIP(test.flow.TupleOrig.SourceAddress.String()).Return(nil, false)
mockIfaceStore.EXPECT().GetInterfaceByIP(test.flow.TupleReply.SourceAddress.String()).Return(interfaceFlow2, true)
} else {
mockIfaceStore.EXPECT().GetInterfaceByIP(expConn.TupleOrig.SourceAddress.String()).Return(nil, false)
mockIfaceStore.EXPECT().GetInterfaceByIP(expConn.TupleReply.SourceAddress.String()).Return(nil, false)

protocol, _ := lookupServiceProtocol(expConn.TupleOrig.Protocol)
serviceStr := fmt.Sprintf("%s:%d/%s", expConn.TupleOrig.DestinationAddress.String(), expConn.TupleOrig.DestinationPort, protocol)
mockProxier.EXPECT().GetServiceByIP(serviceStr).Return(servicePortName, true)
expConn.DestinationServicePortName = servicePortName.String()
}
connStore.addOrUpdateConn(&test.flow)
actualConn, ok := connStore.GetConnByKey(flowTuple)
Expand All @@ -156,8 +189,8 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) {
OriginalBytes: 0xbaaaaa0000000000,
ReversePackets: 0xff,
ReverseBytes: 0xbaaa,
TupleOrig: *tuple1,
TupleReply: *revTuple1,
TupleOrig: tuple1,
TupleReply: revTuple1,
IsActive: true,
}
// Flow-2, which is not in ConnectionStore
Expand All @@ -169,8 +202,8 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) {
OriginalBytes: 0xcbbb,
ReversePackets: 0xbbbb,
ReverseBytes: 0xcbbbb0000000000,
TupleOrig: *tuple2,
TupleReply: *revTuple2,
TupleOrig: tuple2,
TupleReply: revTuple2,
IsActive: true,
}
for i, flow := range testFlows {
Expand All @@ -180,7 +213,7 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) {
// Create ConnectionStore
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, testPollInterval)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, nil, nil, testPollInterval)
// Add flows to the Connection store
for i, flow := range testFlows {
connStore.connections[*testFlowKeys[i]] = *flow
Expand Down Expand Up @@ -218,8 +251,8 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) {
OriginalBytes: 0xbaaaaa0000000000,
ReversePackets: 0xff,
ReverseBytes: 0xbaaa,
TupleOrig: *tuple1,
TupleReply: *revTuple1,
TupleOrig: tuple1,
TupleReply: revTuple1,
IsActive: true,
}
// Flow-2, which is not in ConnectionStore
Expand All @@ -231,8 +264,8 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) {
OriginalBytes: 0xcbbb,
ReversePackets: 0xbbbb,
ReverseBytes: 0xcbbbb0000000000,
TupleOrig: *tuple2,
TupleReply: *revTuple2,
TupleOrig: tuple2,
TupleReply: revTuple2,
IsActive: true,
}
for i, flow := range testFlows {
Expand All @@ -242,7 +275,7 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) {
// Create ConnectionStore
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, testPollInterval)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, nil, nil, testPollInterval)
// Add flows to the connection store.
for i, flow := range testFlows {
connStore.connections[*testFlowKeys[i]] = *flow
Expand Down
16 changes: 8 additions & 8 deletions pkg/agent/flowexporter/connections/conntrack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,25 +38,25 @@ func TestConnTrackSystem_DumpFlows(t *testing.T) {
// Create flows for test
tuple, revTuple := makeTuple(&net.IP{1, 2, 3, 4}, &net.IP{4, 3, 2, 1}, 6, 65280, 255)
antreaFlow := &flowexporter.Connection{
TupleOrig: *tuple,
TupleReply: *revTuple,
TupleOrig: tuple,
TupleReply: revTuple,
Zone: openflow.CtZone,
}
tuple, revTuple = makeTuple(&net.IP{1, 2, 3, 4}, &net.IP{100, 50, 25, 5}, 6, 60001, 200)
antreaServiceFlow := &flowexporter.Connection{
TupleOrig: *tuple,
TupleReply: *revTuple,
TupleOrig: tuple,
TupleReply: revTuple,
Zone: openflow.CtZone,
}
tuple, revTuple = makeTuple(&net.IP{5, 6, 7, 8}, &net.IP{8, 7, 6, 5}, 6, 60001, 200)
antreaGWFlow := &flowexporter.Connection{
TupleOrig: *tuple,
TupleReply: *revTuple,
TupleOrig: tuple,
TupleReply: revTuple,
Zone: openflow.CtZone,
}
nonAntreaFlow := &flowexporter.Connection{
TupleOrig: *tuple,
TupleReply: *revTuple,
TupleOrig: tuple,
TupleReply: revTuple,
Zone: 100,
}
testFlows := []*flowexporter.Connection{antreaFlow, antreaServiceFlow, antreaGWFlow, nonAntreaFlow}
Expand Down
17 changes: 17 additions & 0 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ var (
"destinationPodName",
"destinationPodNamespace",
"destinationNodeName",
"destinationClusterIP",
"destinationServicePortName",
}
)

Expand Down Expand Up @@ -322,6 +324,21 @@ func (exp *flowExporter) sendDataRecord(dataRec ipfix.IPFIXRecord, record flowex
} else {
_, err = dataRec.AddInfoElement(ie, "")
}
case "destinationClusterIP":
if record.Conn.DestinationServicePortName != "" {
_, err = dataRec.AddInfoElement(ie, record.Conn.TupleOrig.DestinationAddress)
} else {
// Sending dummy IP as IPFIX collector expects constant length of data for IP field.
// We should probably think of better approach as this involves customization of IPFIX collector to ignore
// this dummy IP address.
_, err = dataRec.AddInfoElement(ie, net.IP{0, 0, 0, 0})
}
case "destinationServicePortName":
if record.Conn.DestinationServicePortName != "" {
_, err = dataRec.AddInfoElement(ie, record.Conn.DestinationServicePortName)
} else {
_, err = dataRec.AddInfoElement(ie, "")
}
}
if err != nil {
return fmt.Errorf("error while adding info element: %s to data record: %v", ie.Name, err)
Expand Down
Loading

0 comments on commit 16668ef

Please sign in to comment.