Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Pod-to-Service flows with Flow Exporter when Antrea Proxy enabled #984

Merged
merged 2 commits into from
Aug 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func run(o *Options) error {
if networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() {
isChaining = true
}
var proxier *proxy.Proxier
var proxier proxy.Proxier
if features.DefaultFeatureGate.Enabled(features.AntreaProxy) {
proxier = proxy.New(nodeConfig.Name, informerFactory, ofClient)
}
Expand Down Expand Up @@ -245,6 +245,8 @@ func run(o *Options) error {
connStore := connections.NewConnectionStore(
connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, agentQuerier.GetOVSCtlClient(), o.config.OVSDatapathType),
ifaceStore,
serviceCIDRNet,
proxier,
o.pollInterval)
pollDone := make(chan struct{})
go connStore.Run(stopCh, pollDone)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (
github.com/stretchr/testify v1.5.1
github.com/ti-mo/conntrack v0.3.0
github.com/vishvananda/netlink v1.1.0
github.com/vmware/go-ipfix v0.0.0-20200808032647-11daf237d1dc
github.com/vmware/go-ipfix v0.1.0
golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975
golang.org/x/exp v0.0.0-20190312203227-4b39c73a6495
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,8 @@ github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYp
github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df h1:OviZH7qLw/7ZovXvuNyL3XQl8UFofeikI1NW1Gypu7k=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
github.com/vmware/go-ipfix v0.0.0-20200808032647-11daf237d1dc h1:lytkY3WfWgOyyaOlgj/3Y5Fkwc9ENff2qg6Ul4FYriE=
github.com/vmware/go-ipfix v0.0.0-20200808032647-11daf237d1dc/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU=
github.com/vmware/go-ipfix v0.1.0 h1:qbS1kJcs50vaTmyqN1VIk9I1YVikpuS+Uleze/wfYCA=
github.com/vmware/go-ipfix v0.1.0/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU=
github.com/wenyingd/ofnet v0.0.0-20200812094731-56e62d82092a h1:bP9FHcDcj/Pz9xy+gKC4KS3joEAqBMonqhGHnEDsQV8=
github.com/wenyingd/ofnet v0.0.0-20200812094731-56e62d82092a/go.mod h1:oF9872TvzJqLzLKDGVMItRLWJHlnwXluuIuNbOP5WKM=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
Expand Down
1 change: 1 addition & 0 deletions hack/update-codegen-dockerized.sh
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ MOCKGEN_TARGETS=(
"pkg/querier AgentNetworkPolicyInfoQuerier"
"pkg/agent/flowexporter/connections ConnTrackDumper,NetFilterConnTrack"
"pkg/agent/flowexporter/ipfix IPFIXExportingProcess,IPFIXRecord"
"pkg/agent/proxy Proxier"
)

# Command mockgen does not automatically replace variable YEAR with current year
Expand Down
63 changes: 53 additions & 10 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,43 @@ package connections

import (
"fmt"
"net"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"

"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter"
"github.com/vmware-tanzu/antrea/pkg/agent/interfacestore"
"github.com/vmware-tanzu/antrea/pkg/agent/openflow"
"github.com/vmware-tanzu/antrea/pkg/agent/proxy"
)

var serviceProtocolMap = map[uint8]corev1.Protocol{
6: corev1.ProtocolTCP,
17: corev1.ProtocolUDP,
132: corev1.ProtocolSCTP,
}

type ConnectionStore struct {
connections map[flowexporter.ConnectionKey]flowexporter.Connection
connDumper ConnTrackDumper
ifaceStore interfacestore.InterfaceStore
pollInterval time.Duration
mutex sync.Mutex
connections map[flowexporter.ConnectionKey]flowexporter.Connection
connDumper ConnTrackDumper
ifaceStore interfacestore.InterfaceStore
serviceCIDR *net.IPNet
antreaProxier proxy.Proxier
pollInterval time.Duration
mutex sync.Mutex
}

func NewConnectionStore(connTrackDumper ConnTrackDumper, ifaceStore interfacestore.InterfaceStore, pollInterval time.Duration) *ConnectionStore {
func NewConnectionStore(connTrackDumper ConnTrackDumper, ifaceStore interfacestore.InterfaceStore, serviceCIDR *net.IPNet, proxier proxy.Proxier, pollInterval time.Duration) *ConnectionStore {
return &ConnectionStore{
connections: make(map[flowexporter.ConnectionKey]flowexporter.Connection),
connDumper: connTrackDumper,
ifaceStore: ifaceStore,
pollInterval: pollInterval,
connections: make(map[flowexporter.ConnectionKey]flowexporter.Connection),
connDumper: connTrackDumper,
ifaceStore: ifaceStore,
serviceCIDR: serviceCIDR,
antreaProxier: proxier,
pollInterval: pollInterval,
}
}

Expand Down Expand Up @@ -91,6 +104,7 @@ func (cs *ConnectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
cs.connections[connKey] = *existingConn
klog.V(4).Infof("Antrea flow updated: %v", existingConn)
} else {
// sourceIP/destinationIP are mapped only to local pods and not remote pods.
var srcFound, dstFound bool
sIface, srcFound := cs.ifaceStore.GetInterfaceByIP(conn.TupleOrig.SourceAddress.String())
dIface, dstFound := cs.ifaceStore.GetInterfaceByIP(conn.TupleReply.SourceAddress.String())
Expand All @@ -113,6 +127,26 @@ func (cs *ConnectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
if !srcFound && dstFound {
conn.DoExport = false
}

// Process Pod-to-Service flows when Antrea Proxy is enabled.
if cs.antreaProxier != nil {
if cs.serviceCIDR.Contains(conn.TupleOrig.DestinationAddress) {
clusterIP := conn.TupleOrig.DestinationAddress.String()
svcPort := conn.TupleOrig.DestinationPort
protocol, err := lookupServiceProtocol(conn.TupleOrig.Protocol)
if err != nil {
klog.Warningf("Could not retrieve Service protocol: %v", err)
} else {
serviceStr := fmt.Sprintf("%s:%d/%s", clusterIP, svcPort, protocol)
servicePortName, exists := cs.antreaProxier.GetServiceByIP(serviceStr)
if !exists {
klog.Warningf("Could not retrieve the Service info from antrea-agent-proxier for the serviceStr: %s", serviceStr)
} else {
conn.DestinationServicePortName = servicePortName.String()
}
}
}
}
klog.V(4).Infof("New Antrea flow added: %v", conn)
// Add new antrea connection to connection store
cs.connections[connKey] = *conn
Expand Down Expand Up @@ -184,3 +218,12 @@ func (cs *ConnectionStore) DeleteConnectionByKey(connKey flowexporter.Connection

return nil
}

// LookupServiceProtocol returns the corresponding Service protocol string for a given protocol identifier
func lookupServiceProtocol(protoID uint8) (corev1.Protocol, error) {
serviceProto, found := serviceProtocolMap[protoID]
if !found {
return "", fmt.Errorf("unknown protocol identifier: %d", protoID)
}
return serviceProto, nil
}
81 changes: 57 additions & 24 deletions pkg/agent/flowexporter/connections/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,35 @@
package connections

import (
"fmt"
"net"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"

"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter"
connectionstest "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/connections/testing"
"github.com/vmware-tanzu/antrea/pkg/agent/interfacestore"
interfacestoretest "github.com/vmware-tanzu/antrea/pkg/agent/interfacestore/testing"
proxytest "github.com/vmware-tanzu/antrea/pkg/agent/proxy/testing"
k8sproxy "github.com/vmware-tanzu/antrea/third_party/proxy"
)

const testPollInterval = 0 // Not used in these tests, hence 0.

func makeTuple(srcIP *net.IP, dstIP *net.IP, protoID uint8, srcPort uint16, dstPort uint16) (*flowexporter.Tuple, *flowexporter.Tuple) {
tuple := &flowexporter.Tuple{
func makeTuple(srcIP *net.IP, dstIP *net.IP, protoID uint8, srcPort uint16, dstPort uint16) (flowexporter.Tuple, flowexporter.Tuple) {
tuple := flowexporter.Tuple{
SourceAddress: *srcIP,
DestinationAddress: *dstIP,
Protocol: protoID,
SourcePort: srcPort,
DestinationPort: dstPort,
}
revTuple := &flowexporter.Tuple{
revTuple := flowexporter.Tuple{
SourceAddress: *dstIP,
DestinationAddress: *srcIP,
Protocol: protoID,
Expand All @@ -62,8 +67,8 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
OriginalBytes: 0xbaaaaa0000000000,
ReversePackets: 0xff,
ReverseBytes: 0xbaaa,
TupleOrig: *tuple1,
TupleReply: *revTuple1,
TupleOrig: tuple1,
TupleReply: revTuple1,
IsActive: true,
}
// Flow-2, which is not in ConnectionStore
Expand All @@ -75,10 +80,16 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
OriginalBytes: 0xcbbb,
ReversePackets: 0xbbbb,
ReverseBytes: 0xcbbbb0000000000,
TupleOrig: *tuple2,
TupleReply: *revTuple2,
TupleOrig: tuple2,
TupleReply: revTuple2,
IsActive: true,
}
tuple3, revTuple3 := makeTuple(&net.IP{10, 10, 10, 10}, &net.IP{20, 20, 20, 20}, 6, 5000, 80)
testFlow3 := flowexporter.Connection{
TupleOrig: tuple3,
TupleReply: revTuple3,
IsActive: true,
}
// Create copy of old conntrack flow for testing purposes.
// This flow is already in connection store.
oldTestFlow1 := flowexporter.Connection{
Expand All @@ -88,8 +99,8 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
OriginalBytes: 0xbaaaaa00000000,
ReversePackets: 0xf,
ReverseBytes: 0xba,
TupleOrig: *tuple1,
TupleReply: *revTuple1,
TupleOrig: tuple1,
TupleReply: revTuple1,
SourcePodNamespace: "ns1",
SourcePodName: "pod1",
DestinationPodNamespace: "",
Expand All @@ -106,9 +117,24 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
IP: net.IP{8, 7, 6, 5},
ContainerInterfaceConfig: podConfigFlow2,
}
serviceCIDR := &net.IPNet{
IP: net.IP{20, 20, 20, 0},
Mask: net.IPMask(net.ParseIP("255.255.255.0").To4()),
}
servicePortName := k8sproxy.ServicePortName{
NamespacedName: types.NamespacedName{
Namespace: "serviceNS1",
Name: "service1",
},
Port: "255",
Protocol: v1.ProtocolTCP,
}
// Mock interface store with one of the couple of IPs correspond to Pods
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, testPollInterval)
mockProxier := proxytest.NewMockProxier(ctrl)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, serviceCIDR, mockProxier, testPollInterval)

// Add flow1conn to the Connection map
testFlow1Tuple := flowexporter.NewConnectionKey(&testFlow1)
connStore.connections[testFlow1Tuple] = oldTestFlow1
Expand All @@ -118,20 +144,27 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
}{
{testFlow1}, // To test update part of function
{testFlow2}, // To test add part of function
{testFlow3}, // To test service name realization
}
for i, test := range addOrUpdateConnTests {
flowTuple := flowexporter.NewConnectionKey(&test.flow)
var expConn flowexporter.Connection
expConn := test.flow
if i == 0 {
expConn = test.flow
expConn.SourcePodNamespace = "ns1"
expConn.SourcePodName = "pod1"
} else {
expConn = test.flow
} else if i == 1 {
expConn.DestinationPodNamespace = "ns2"
expConn.DestinationPodName = "pod2"
mockIfaceStore.EXPECT().GetInterfaceByIP(test.flow.TupleOrig.SourceAddress.String()).Return(nil, false)
mockIfaceStore.EXPECT().GetInterfaceByIP(test.flow.TupleReply.SourceAddress.String()).Return(interfaceFlow2, true)
} else {
mockIfaceStore.EXPECT().GetInterfaceByIP(expConn.TupleOrig.SourceAddress.String()).Return(nil, false)
mockIfaceStore.EXPECT().GetInterfaceByIP(expConn.TupleReply.SourceAddress.String()).Return(nil, false)

protocol, _ := lookupServiceProtocol(expConn.TupleOrig.Protocol)
serviceStr := fmt.Sprintf("%s:%d/%s", expConn.TupleOrig.DestinationAddress.String(), expConn.TupleOrig.DestinationPort, protocol)
mockProxier.EXPECT().GetServiceByIP(serviceStr).Return(servicePortName, true)
expConn.DestinationServicePortName = servicePortName.String()
}
connStore.addOrUpdateConn(&test.flow)
actualConn, ok := connStore.GetConnByKey(flowTuple)
Expand All @@ -156,8 +189,8 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) {
OriginalBytes: 0xbaaaaa0000000000,
ReversePackets: 0xff,
ReverseBytes: 0xbaaa,
TupleOrig: *tuple1,
TupleReply: *revTuple1,
TupleOrig: tuple1,
TupleReply: revTuple1,
IsActive: true,
}
// Flow-2, which is not in ConnectionStore
Expand All @@ -169,8 +202,8 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) {
OriginalBytes: 0xcbbb,
ReversePackets: 0xbbbb,
ReverseBytes: 0xcbbbb0000000000,
TupleOrig: *tuple2,
TupleReply: *revTuple2,
TupleOrig: tuple2,
TupleReply: revTuple2,
IsActive: true,
}
for i, flow := range testFlows {
Expand All @@ -180,7 +213,7 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) {
// Create ConnectionStore
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, testPollInterval)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, nil, nil, testPollInterval)
// Add flows to the Connection store
for i, flow := range testFlows {
connStore.connections[*testFlowKeys[i]] = *flow
Expand Down Expand Up @@ -218,8 +251,8 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) {
OriginalBytes: 0xbaaaaa0000000000,
ReversePackets: 0xff,
ReverseBytes: 0xbaaa,
TupleOrig: *tuple1,
TupleReply: *revTuple1,
TupleOrig: tuple1,
TupleReply: revTuple1,
IsActive: true,
}
// Flow-2, which is not in ConnectionStore
Expand All @@ -231,8 +264,8 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) {
OriginalBytes: 0xcbbb,
ReversePackets: 0xbbbb,
ReverseBytes: 0xcbbbb0000000000,
TupleOrig: *tuple2,
TupleReply: *revTuple2,
TupleOrig: tuple2,
TupleReply: revTuple2,
IsActive: true,
}
for i, flow := range testFlows {
Expand All @@ -242,7 +275,7 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) {
// Create ConnectionStore
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, testPollInterval)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, nil, nil, testPollInterval)
// Add flows to the connection store.
for i, flow := range testFlows {
connStore.connections[*testFlowKeys[i]] = *flow
Expand Down
16 changes: 8 additions & 8 deletions pkg/agent/flowexporter/connections/conntrack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,25 +38,25 @@ func TestConnTrackSystem_DumpFlows(t *testing.T) {
// Create flows for test
tuple, revTuple := makeTuple(&net.IP{1, 2, 3, 4}, &net.IP{4, 3, 2, 1}, 6, 65280, 255)
antreaFlow := &flowexporter.Connection{
TupleOrig: *tuple,
TupleReply: *revTuple,
TupleOrig: tuple,
TupleReply: revTuple,
Zone: openflow.CtZone,
}
tuple, revTuple = makeTuple(&net.IP{1, 2, 3, 4}, &net.IP{100, 50, 25, 5}, 6, 60001, 200)
antreaServiceFlow := &flowexporter.Connection{
TupleOrig: *tuple,
TupleReply: *revTuple,
TupleOrig: tuple,
TupleReply: revTuple,
Zone: openflow.CtZone,
}
tuple, revTuple = makeTuple(&net.IP{5, 6, 7, 8}, &net.IP{8, 7, 6, 5}, 6, 60001, 200)
antreaGWFlow := &flowexporter.Connection{
TupleOrig: *tuple,
TupleReply: *revTuple,
TupleOrig: tuple,
TupleReply: revTuple,
Zone: openflow.CtZone,
}
nonAntreaFlow := &flowexporter.Connection{
TupleOrig: *tuple,
TupleReply: *revTuple,
TupleOrig: tuple,
TupleReply: revTuple,
Zone: 100,
}
testFlows := []*flowexporter.Connection{antreaFlow, antreaServiceFlow, antreaGWFlow, nonAntreaFlow}
Expand Down
Loading