Skip to content

Commit

Permalink
Addressed the comments
Browse files Browse the repository at this point in the history
  • Loading branch information
srikartati committed Aug 11, 2020
1 parent e3ad965 commit bd8f387
Show file tree
Hide file tree
Showing 13 changed files with 71 additions and 72 deletions.
1 change: 1 addition & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ func run(o *Options) error {
connStore := connections.NewConnectionStore(
connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, agentQuerier.GetOVSCtlClient(), o.config.OVSDatapathType),
ifaceStore,
serviceCIDRNet,
proxier,
o.pollInterval)
pollDone := make(chan struct{})
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (
github.com/stretchr/testify v1.5.1
github.com/ti-mo/conntrack v0.3.0
github.com/vishvananda/netlink v1.1.0
github.com/vmware/go-ipfix v0.0.0-20200808032647-11daf237d1dc
github.com/vmware/go-ipfix v0.1.0
golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975
golang.org/x/exp v0.0.0-20190312203227-4b39c73a6495
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,8 @@ github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYp
github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df h1:OviZH7qLw/7ZovXvuNyL3XQl8UFofeikI1NW1Gypu7k=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
github.com/vmware/go-ipfix v0.0.0-20200808032647-11daf237d1dc h1:lytkY3WfWgOyyaOlgj/3Y5Fkwc9ENff2qg6Ul4FYriE=
github.com/vmware/go-ipfix v0.0.0-20200808032647-11daf237d1dc/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU=
github.com/vmware/go-ipfix v0.1.0 h1:qbS1kJcs50vaTmyqN1VIk9I1YVikpuS+Uleze/wfYCA=
github.com/vmware/go-ipfix v0.1.0/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU=
github.com/wenyingd/ofnet v0.0.0-20200609044910-a72f3e66744e h1:NM4NTe6Z+mF5IYlYAiEdRlY8XcMY4P6VlYqgsBhpojQ=
github.com/wenyingd/ofnet v0.0.0-20200609044910-a72f3e66744e/go.mod h1:+g6SfqhTVqeGEmUJ0l4WtCgsL4dflTUJE4k+TPCKqXo=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
Expand Down
43 changes: 29 additions & 14 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,36 @@ import (
"sync"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"

"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter"
"github.com/vmware-tanzu/antrea/pkg/agent/interfacestore"
"github.com/vmware-tanzu/antrea/pkg/agent/openflow"
"github.com/vmware-tanzu/antrea/pkg/agent/proxy"
"github.com/vmware-tanzu/antrea/pkg/util/ip"
)

var serviceProtocolMap = map[uint8]corev1.Protocol{
6: corev1.ProtocolTCP,
17: corev1.ProtocolUDP,
132: corev1.ProtocolSCTP,
}

type ConnectionStore struct {
connections map[flowexporter.ConnectionKey]flowexporter.Connection
connDumper ConnTrackDumper
ifaceStore interfacestore.InterfaceStore
connections map[flowexporter.ConnectionKey]flowexporter.Connection
connDumper ConnTrackDumper
ifaceStore interfacestore.InterfaceStore
serviceCIDR *net.IPNet
antreaProxier proxy.Proxier
pollInterval time.Duration
mutex sync.Mutex
pollInterval time.Duration
mutex sync.Mutex
}

func NewConnectionStore(connTrackDumper ConnTrackDumper, ifaceStore interfacestore.InterfaceStore, serviceCIDR *net.IPNet, proxier proxy.Proxier, pollInterval time.Duration) *ConnectionStore {
return &ConnectionStore{
connections: make(map[flowexporter.ConnectionKey]flowexporter.Connection),
connDumper: connTrackDumper,
ifaceStore: ifaceStore,
connections: make(map[flowexporter.ConnectionKey]flowexporter.Connection),
connDumper: connTrackDumper,
ifaceStore: ifaceStore,
serviceCIDR: serviceCIDR,
antreaProxier: proxier,
pollInterval: pollInterval,
Expand Down Expand Up @@ -122,21 +128,21 @@ func (cs *ConnectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
conn.DoExport = false
}

// Pod-to-Service flows w/ antrea-proxy:Antrea proxy is enabled.
// Process Pod-to-Service flows when Antrea Proxy is enabled.
if cs.antreaProxier != nil {
if cs.serviceCIDR.Contains(conn.TupleOrig.DestinationAddress) {
clusterIP := conn.TupleOrig.DestinationAddress.String()
svcPort := conn.TupleOrig.DestinationPort
protocol, err := ip.LookupServiceProtocol(conn.TupleOrig.Protocol)
protocol, err := lookupServiceProtocol(conn.TupleOrig.Protocol)
if err != nil {
klog.Warningf("Could not retrieve service protocol: %v", err)
klog.Warningf("Could not retrieve Service protocol: %v", err)
} else {
serviceStr := fmt.Sprintf("%s:%d/%s", clusterIP, svcPort, protocol)
servicePortName, exists := cs.antreaProxier.GetServiceByIP(serviceStr)
if !exists {
klog.Warningf("Could not retrieve service info from antrea-agent-proxier for serviceStr: %s", serviceStr)
klog.Warningf("Could not retrieve the Service info from antrea-agent-proxier for the serviceStr: %s", serviceStr)
} else {
conn.DestinationServiceName = servicePortName.String()
conn.DestinationServicePortName = servicePortName.String()
}
}
}
Expand Down Expand Up @@ -212,3 +218,12 @@ func (cs *ConnectionStore) DeleteConnectionByKey(connKey flowexporter.Connection

return nil
}

// LookupServiceProtocol returns the corresponding Service protocol string for a given protocol identifier
func lookupServiceProtocol(protoID uint8) (corev1.Protocol, error) {
serviceProto, found := serviceProtocolMap[protoID]
if !found {
return "", fmt.Errorf("unknown protocol identifier: %d", protoID)
}
return serviceProto, nil
}
43 changes: 21 additions & 22 deletions pkg/agent/flowexporter/connections/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,20 @@ import (
"github.com/vmware-tanzu/antrea/pkg/agent/interfacestore"
interfacestoretest "github.com/vmware-tanzu/antrea/pkg/agent/interfacestore/testing"
proxytest "github.com/vmware-tanzu/antrea/pkg/agent/proxy/testing"
"github.com/vmware-tanzu/antrea/pkg/util/ip"
k8sproxy "github.com/vmware-tanzu/antrea/third_party/proxy"
)

const testPollInterval = 0 // Not used in these tests, hence 0.

func makeTuple(srcIP *net.IP, dstIP *net.IP, protoID uint8, srcPort uint16, dstPort uint16) (*flowexporter.Tuple, *flowexporter.Tuple) {
tuple := &flowexporter.Tuple{
func makeTuple(srcIP *net.IP, dstIP *net.IP, protoID uint8, srcPort uint16, dstPort uint16) (flowexporter.Tuple, flowexporter.Tuple) {
tuple := flowexporter.Tuple{
SourceAddress: *srcIP,
DestinationAddress: *dstIP,
Protocol: protoID,
SourcePort: srcPort,
DestinationPort: dstPort,
}
revTuple := &flowexporter.Tuple{
revTuple := flowexporter.Tuple{
SourceAddress: *dstIP,
DestinationAddress: *srcIP,
Protocol: protoID,
Expand All @@ -68,8 +67,8 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
OriginalBytes: 0xbaaaaa0000000000,
ReversePackets: 0xff,
ReverseBytes: 0xbaaa,
TupleOrig: *tuple1,
TupleReply: *revTuple1,
TupleOrig: tuple1,
TupleReply: revTuple1,
IsActive: true,
}
// Flow-2, which is not in ConnectionStore
Expand All @@ -81,14 +80,14 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
OriginalBytes: 0xcbbb,
ReversePackets: 0xbbbb,
ReverseBytes: 0xcbbbb0000000000,
TupleOrig: *tuple2,
TupleReply: *revTuple2,
TupleOrig: tuple2,
TupleReply: revTuple2,
IsActive: true,
}
tuple3, revTuple3 := makeTuple(&net.IP{10, 10, 10, 10}, &net.IP{20, 20, 20, 20}, 6, 5000, 80)
testFlow3 := flowexporter.Connection{
TupleOrig: *tuple3,
TupleReply: *revTuple3,
TupleOrig: tuple3,
TupleReply: revTuple3,
IsActive: true,
}
// Create copy of old conntrack flow for testing purposes.
Expand All @@ -100,8 +99,8 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
OriginalBytes: 0xbaaaaa00000000,
ReversePackets: 0xf,
ReverseBytes: 0xba,
TupleOrig: *tuple1,
TupleReply: *revTuple1,
TupleOrig: tuple1,
TupleReply: revTuple1,
SourcePodNamespace: "ns1",
SourcePodName: "pod1",
DestinationPodNamespace: "",
Expand Down Expand Up @@ -162,10 +161,10 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
mockIfaceStore.EXPECT().GetInterfaceByIP(expConn.TupleOrig.SourceAddress.String()).Return(nil, false)
mockIfaceStore.EXPECT().GetInterfaceByIP(expConn.TupleReply.SourceAddress.String()).Return(nil, false)

protocol, _ := ip.LookupServiceProtocol(expConn.TupleOrig.Protocol)
protocol, _ := lookupServiceProtocol(expConn.TupleOrig.Protocol)
serviceStr := fmt.Sprintf("%s:%d/%s", expConn.TupleOrig.DestinationAddress.String(), expConn.TupleOrig.DestinationPort, protocol)
mockProxier.EXPECT().GetServiceByIP(serviceStr).Return(servicePortName, true)
expConn.DestinationServiceName = servicePortName.String()
expConn.DestinationServicePortName = servicePortName.String()
}
connStore.addOrUpdateConn(&test.flow)
actualConn, ok := connStore.GetConnByKey(flowTuple)
Expand All @@ -190,8 +189,8 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) {
OriginalBytes: 0xbaaaaa0000000000,
ReversePackets: 0xff,
ReverseBytes: 0xbaaa,
TupleOrig: *tuple1,
TupleReply: *revTuple1,
TupleOrig: tuple1,
TupleReply: revTuple1,
IsActive: true,
}
// Flow-2, which is not in ConnectionStore
Expand All @@ -203,8 +202,8 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) {
OriginalBytes: 0xcbbb,
ReversePackets: 0xbbbb,
ReverseBytes: 0xcbbbb0000000000,
TupleOrig: *tuple2,
TupleReply: *revTuple2,
TupleOrig: tuple2,
TupleReply: revTuple2,
IsActive: true,
}
for i, flow := range testFlows {
Expand Down Expand Up @@ -252,8 +251,8 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) {
OriginalBytes: 0xbaaaaa0000000000,
ReversePackets: 0xff,
ReverseBytes: 0xbaaa,
TupleOrig: *tuple1,
TupleReply: *revTuple1,
TupleOrig: tuple1,
TupleReply: revTuple1,
IsActive: true,
}
// Flow-2, which is not in ConnectionStore
Expand All @@ -265,8 +264,8 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) {
OriginalBytes: 0xcbbb,
ReversePackets: 0xbbbb,
ReverseBytes: 0xcbbbb0000000000,
TupleOrig: *tuple2,
TupleReply: *revTuple2,
TupleOrig: tuple2,
TupleReply: revTuple2,
IsActive: true,
}
for i, flow := range testFlows {
Expand Down
10 changes: 5 additions & 5 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ var (
"destinationPodNamespace",
"destinationNodeName",
"destinationClusterIP",
"destinationServiceName",
"destinationServicePortName",
}
)

Expand Down Expand Up @@ -325,17 +325,17 @@ func (exp *flowExporter) sendDataRecord(dataRec ipfix.IPFIXRecord, record flowex
_, err = dataRec.AddInfoElement(ie, "")
}
case "destinationClusterIP":
if record.Conn.DestinationServiceName != "" {
if record.Conn.DestinationServicePortName != "" {
_, err = dataRec.AddInfoElement(ie, record.Conn.TupleOrig.DestinationAddress)
} else {
// Sending dummy IP as IPFIX collector expects constant length of data for IP field.
// We should probably think of better approach as this involves customization of IPFIX collector to ignore
// this dummy IP address.
_, err = dataRec.AddInfoElement(ie, net.IP{0, 0, 0, 0})
}
case "destinationServiceName":
if record.Conn.DestinationServiceName != "" {
_, err = dataRec.AddInfoElement(ie, record.Conn.DestinationServiceName)
case "destinationServicePortName":
if record.Conn.DestinationServicePortName != "" {
_, err = dataRec.AddInfoElement(ie, record.Conn.DestinationServicePortName)
} else {
_, err = dataRec.AddInfoElement(ie, "")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/flowexporter/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func TestFlowExporter_sendDataRecord(t *testing.T) {
for i, ie := range AntreaInfoElements {
elemList[i+len(IANAInfoElements)+len(IANAReverseInfoElements)] = ipfixentities.NewInfoElement(ie, 0, 0, 0, 0)
}
// Define mocks and flowExporter

mockIPFIXExpProc := ipfixtest.NewMockIPFIXExportingProcess(ctrl)
mockDataRec := ipfixtest.NewMockIPFIXRecord(ctrl)
flowExp := &flowExporter{
Expand Down Expand Up @@ -175,7 +175,7 @@ func TestFlowExporter_sendDataRecord(t *testing.T) {
mockDataRec.EXPECT().AddInfoElement(ie, uint8(0)).Return(tempBytes, nil)
case "packetTotalCount", "octetTotalCount", "packetDeltaCount", "octetDeltaCount", "reverse_PacketTotalCount", "reverse_OctetTotalCount", "reverse_PacketDeltaCount", "reverse_OctetDeltaCount":
mockDataRec.EXPECT().AddInfoElement(ie, uint64(0)).Return(tempBytes, nil)
case "sourcePodName", "sourcePodNamespace", "sourceNodeName", "destinationPodName", "destinationPodNamespace", "destinationNodeName", "destinationServiceName":
case "sourcePodName", "sourcePodNamespace", "sourceNodeName", "destinationPodName", "destinationPodNamespace", "destinationNodeName", "destinationServicePortName":
mockDataRec.EXPECT().AddInfoElement(ie, "").Return(tempBytes, nil)
}
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/agent/flowexporter/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ type Connection struct {
OriginalPackets, OriginalBytes uint64
ReversePackets, ReverseBytes uint64
// Fields specific to Antrea
SourcePodNamespace string
SourcePodName string
DestinationPodNamespace string
DestinationPodName string
DestinationServiceName string
SourcePodNamespace string
SourcePodName string
DestinationPodNamespace string
DestinationPodName string
DestinationServicePortName string
}

type FlowRecord struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func (p *proxier) GetServiceByIP(serviceStr string) (k8sproxy.ServicePortName, b
func (p *proxier) addServiceByIP(serviceStr string, servicePortName k8sproxy.ServicePortName) {
p.serviceStringMapMutex.Lock()
defer p.serviceStringMapMutex.Unlock()
klog.V(2).Infof("Added service with key: %v value: %v", serviceStr, servicePortName)

p.serviceStringMap[serviceStr] = servicePortName
}

Expand Down
17 changes: 0 additions & 17 deletions pkg/util/ip/ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"net"
"sort"

corev1 "k8s.io/api/core/v1"

"github.com/vmware-tanzu/antrea/pkg/apis/networking/v1beta1"
)

Expand All @@ -30,12 +28,6 @@ const (
v6BitLen = 8 * net.IPv6len
)

var serviceProtocolMap = map[uint8]corev1.Protocol{
6: corev1.ProtocolTCP,
17: corev1.ProtocolUDP,
132: corev1.ProtocolSCTP,
}

// This function takes in one allow CIDR and multiple except CIDRs and gives diff CIDRs
// in allowCIDR eliminating except CIDRs. It currently supports only IPv4. except CIDR input
// can be changed.
Expand Down Expand Up @@ -154,12 +146,3 @@ func NetIPNetToIPNet(ipNet *net.IPNet) *v1beta1.IPNet {
prefix, _ := ipNet.Mask.Size()
return &v1beta1.IPNet{IP: v1beta1.IPAddress(ipNet.IP), PrefixLength: int32(prefix)}
}

// LookupServiceProtocol return service protocol string given protocol identifier
func LookupServiceProtocol(protoID uint8) (corev1.Protocol, error) {
serviceProto, found := serviceProtocolMap[protoID]
if !found {
return "", fmt.Errorf("unknown protocol identifier: %d", protoID)
}
return serviceProto, nil
}
2 changes: 1 addition & 1 deletion plugins/octant/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmF
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
github.com/vmware-tanzu/octant v0.13.1 h1:hz4JDnAA7xDkFjF4VEbt5SrSRrG26FCxKXXBGapf6Nc=
github.com/vmware-tanzu/octant v0.13.1/go.mod h1:4q+wrV4tmUwAdMjvYOujSTtZbE4+zm0n5mb7FjvN0I0=
github.com/vmware/go-ipfix v0.0.0-20200808032647-11daf237d1dc/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU=
github.com/vmware/go-ipfix v0.1.0/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU=
github.com/wenyingd/ofnet v0.0.0-20200601065543-2c7a62482f16/go.mod h1:+g6SfqhTVqeGEmUJ0l4WtCgsL4dflTUJE4k+TPCKqXo=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/flowexporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestFlowExporter(t *testing.T) {
t.Fatalf("Error in converting octetDeltaCount to int type")
}
// compute the bandwidth using 5s as interval
recBandwidth := (deltaBytes * 8.0) / float64((int64(5.0))*time.Second.Nanoseconds())
recBandwidth := (deltaBytes * 8.0) / float64(5*time.Second.Nanoseconds())
// bandwidth from iperf output
bwSlice := strings.Split(bandwidth, " ")
iperfBandwidth, err := strconv.ParseFloat(bwSlice[0], 64)
Expand Down
3 changes: 2 additions & 1 deletion test/integration/agent/flowexporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ func TestConnectionStoreAndFlowRecords(t *testing.T) {
// Create ConnectionStore, FlowRecords and associated mocks
connDumperMock := connectionstest.NewMockConnTrackDumper(ctrl)
ifStoreMock := interfacestoretest.NewMockInterfaceStore(ctrl)
connStore := connections.NewConnectionStore(connDumperMock, ifStoreMock, testPollInterval)
// TODO: Enhance the integration test by testing service.
connStore := connections.NewConnectionStore(connDumperMock, ifStoreMock, nil, nil, testPollInterval)
// Expect calls for connStore.poll and other callees
connDumperMock.EXPECT().DumpFlows(uint16(openflow.CtZone)).Return(testConns, nil)
for i, testConn := range testConns {
Expand Down

0 comments on commit bd8f387

Please sign in to comment.