diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index ec426961d67..aec4fd4e949 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -245,6 +245,7 @@ func run(o *Options) error { connStore := connections.NewConnectionStore( connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, agentQuerier.GetOVSCtlClient(), o.config.OVSDatapathType), ifaceStore, + serviceCIDRNet, proxier, o.pollInterval) pollDone := make(chan struct{}) diff --git a/go.mod b/go.mod index 5ac3f2b9c39..3d3bdcb59dd 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,7 @@ require ( github.com/stretchr/testify v1.5.1 github.com/ti-mo/conntrack v0.3.0 github.com/vishvananda/netlink v1.1.0 - github.com/vmware/go-ipfix v0.0.0-20200808032647-11daf237d1dc + github.com/vmware/go-ipfix v0.1.0 golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975 golang.org/x/exp v0.0.0-20190312203227-4b39c73a6495 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e diff --git a/go.sum b/go.sum index 87e41ab24f3..fc49cd7be71 100644 --- a/go.sum +++ b/go.sum @@ -384,8 +384,8 @@ github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYp github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI= github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df h1:OviZH7qLw/7ZovXvuNyL3XQl8UFofeikI1NW1Gypu7k= github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= -github.com/vmware/go-ipfix v0.0.0-20200808032647-11daf237d1dc h1:lytkY3WfWgOyyaOlgj/3Y5Fkwc9ENff2qg6Ul4FYriE= -github.com/vmware/go-ipfix v0.0.0-20200808032647-11daf237d1dc/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU= +github.com/vmware/go-ipfix v0.1.0 h1:qbS1kJcs50vaTmyqN1VIk9I1YVikpuS+Uleze/wfYCA= +github.com/vmware/go-ipfix v0.1.0/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU= github.com/wenyingd/ofnet v0.0.0-20200609044910-a72f3e66744e h1:NM4NTe6Z+mF5IYlYAiEdRlY8XcMY4P6VlYqgsBhpojQ= github.com/wenyingd/ofnet v0.0.0-20200609044910-a72f3e66744e/go.mod h1:+g6SfqhTVqeGEmUJ0l4WtCgsL4dflTUJE4k+TPCKqXo= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= diff --git a/pkg/agent/flowexporter/connections/connections.go b/pkg/agent/flowexporter/connections/connections.go index 340d3750231..169ba5cd49d 100644 --- a/pkg/agent/flowexporter/connections/connections.go +++ b/pkg/agent/flowexporter/connections/connections.go @@ -20,30 +20,36 @@ import ( "sync" "time" + corev1 "k8s.io/api/core/v1" "k8s.io/klog/v2" "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter" "github.com/vmware-tanzu/antrea/pkg/agent/interfacestore" "github.com/vmware-tanzu/antrea/pkg/agent/openflow" "github.com/vmware-tanzu/antrea/pkg/agent/proxy" - "github.com/vmware-tanzu/antrea/pkg/util/ip" ) +var serviceProtocolMap = map[uint8]corev1.Protocol{ + 6: corev1.ProtocolTCP, + 17: corev1.ProtocolUDP, + 132: corev1.ProtocolSCTP, +} + type ConnectionStore struct { - connections map[flowexporter.ConnectionKey]flowexporter.Connection - connDumper ConnTrackDumper - ifaceStore interfacestore.InterfaceStore + connections map[flowexporter.ConnectionKey]flowexporter.Connection + connDumper ConnTrackDumper + ifaceStore interfacestore.InterfaceStore serviceCIDR *net.IPNet antreaProxier proxy.Proxier - pollInterval time.Duration - mutex sync.Mutex + pollInterval time.Duration + mutex sync.Mutex } func NewConnectionStore(connTrackDumper ConnTrackDumper, ifaceStore interfacestore.InterfaceStore, serviceCIDR *net.IPNet, proxier proxy.Proxier, pollInterval time.Duration) *ConnectionStore { return &ConnectionStore{ - connections: make(map[flowexporter.ConnectionKey]flowexporter.Connection), - connDumper: connTrackDumper, - ifaceStore: ifaceStore, + connections: make(map[flowexporter.ConnectionKey]flowexporter.Connection), + connDumper: connTrackDumper, + ifaceStore: ifaceStore, serviceCIDR: serviceCIDR, antreaProxier: proxier, pollInterval: pollInterval, @@ -122,21 +128,21 @@ func (cs *ConnectionStore) addOrUpdateConn(conn *flowexporter.Connection) { conn.DoExport = false } - // Pod-to-Service flows w/ antrea-proxy:Antrea proxy is enabled. + // Process Pod-to-Service flows when Antrea Proxy is enabled. if cs.antreaProxier != nil { if cs.serviceCIDR.Contains(conn.TupleOrig.DestinationAddress) { clusterIP := conn.TupleOrig.DestinationAddress.String() svcPort := conn.TupleOrig.DestinationPort - protocol, err := ip.LookupServiceProtocol(conn.TupleOrig.Protocol) + protocol, err := lookupServiceProtocol(conn.TupleOrig.Protocol) if err != nil { - klog.Warningf("Could not retrieve service protocol: %v", err) + klog.Warningf("Could not retrieve Service protocol: %v", err) } else { serviceStr := fmt.Sprintf("%s:%d/%s", clusterIP, svcPort, protocol) servicePortName, exists := cs.antreaProxier.GetServiceByIP(serviceStr) if !exists { - klog.Warningf("Could not retrieve service info from antrea-agent-proxier for serviceStr: %s", serviceStr) + klog.Warningf("Could not retrieve the Service info from antrea-agent-proxier for the serviceStr: %s", serviceStr) } else { - conn.DestinationServiceName = servicePortName.String() + conn.DestinationServicePortName = servicePortName.String() } } } @@ -212,3 +218,12 @@ func (cs *ConnectionStore) DeleteConnectionByKey(connKey flowexporter.Connection return nil } + +// LookupServiceProtocol returns the corresponding Service protocol string for a given protocol identifier +func lookupServiceProtocol(protoID uint8) (corev1.Protocol, error) { + serviceProto, found := serviceProtocolMap[protoID] + if !found { + return "", fmt.Errorf("unknown protocol identifier: %d", protoID) + } + return serviceProto, nil +} diff --git a/pkg/agent/flowexporter/connections/connections_test.go b/pkg/agent/flowexporter/connections/connections_test.go index 55168e64a43..7fcc2533fae 100644 --- a/pkg/agent/flowexporter/connections/connections_test.go +++ b/pkg/agent/flowexporter/connections/connections_test.go @@ -30,21 +30,20 @@ import ( "github.com/vmware-tanzu/antrea/pkg/agent/interfacestore" interfacestoretest "github.com/vmware-tanzu/antrea/pkg/agent/interfacestore/testing" proxytest "github.com/vmware-tanzu/antrea/pkg/agent/proxy/testing" - "github.com/vmware-tanzu/antrea/pkg/util/ip" k8sproxy "github.com/vmware-tanzu/antrea/third_party/proxy" ) const testPollInterval = 0 // Not used in these tests, hence 0. -func makeTuple(srcIP *net.IP, dstIP *net.IP, protoID uint8, srcPort uint16, dstPort uint16) (*flowexporter.Tuple, *flowexporter.Tuple) { - tuple := &flowexporter.Tuple{ +func makeTuple(srcIP *net.IP, dstIP *net.IP, protoID uint8, srcPort uint16, dstPort uint16) (flowexporter.Tuple, flowexporter.Tuple) { + tuple := flowexporter.Tuple{ SourceAddress: *srcIP, DestinationAddress: *dstIP, Protocol: protoID, SourcePort: srcPort, DestinationPort: dstPort, } - revTuple := &flowexporter.Tuple{ + revTuple := flowexporter.Tuple{ SourceAddress: *dstIP, DestinationAddress: *srcIP, Protocol: protoID, @@ -68,8 +67,8 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) { OriginalBytes: 0xbaaaaa0000000000, ReversePackets: 0xff, ReverseBytes: 0xbaaa, - TupleOrig: *tuple1, - TupleReply: *revTuple1, + TupleOrig: tuple1, + TupleReply: revTuple1, IsActive: true, } // Flow-2, which is not in ConnectionStore @@ -81,14 +80,14 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) { OriginalBytes: 0xcbbb, ReversePackets: 0xbbbb, ReverseBytes: 0xcbbbb0000000000, - TupleOrig: *tuple2, - TupleReply: *revTuple2, + TupleOrig: tuple2, + TupleReply: revTuple2, IsActive: true, } tuple3, revTuple3 := makeTuple(&net.IP{10, 10, 10, 10}, &net.IP{20, 20, 20, 20}, 6, 5000, 80) testFlow3 := flowexporter.Connection{ - TupleOrig: *tuple3, - TupleReply: *revTuple3, + TupleOrig: tuple3, + TupleReply: revTuple3, IsActive: true, } // Create copy of old conntrack flow for testing purposes. @@ -100,8 +99,8 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) { OriginalBytes: 0xbaaaaa00000000, ReversePackets: 0xf, ReverseBytes: 0xba, - TupleOrig: *tuple1, - TupleReply: *revTuple1, + TupleOrig: tuple1, + TupleReply: revTuple1, SourcePodNamespace: "ns1", SourcePodName: "pod1", DestinationPodNamespace: "", @@ -162,10 +161,10 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) { mockIfaceStore.EXPECT().GetInterfaceByIP(expConn.TupleOrig.SourceAddress.String()).Return(nil, false) mockIfaceStore.EXPECT().GetInterfaceByIP(expConn.TupleReply.SourceAddress.String()).Return(nil, false) - protocol, _ := ip.LookupServiceProtocol(expConn.TupleOrig.Protocol) + protocol, _ := lookupServiceProtocol(expConn.TupleOrig.Protocol) serviceStr := fmt.Sprintf("%s:%d/%s", expConn.TupleOrig.DestinationAddress.String(), expConn.TupleOrig.DestinationPort, protocol) mockProxier.EXPECT().GetServiceByIP(serviceStr).Return(servicePortName, true) - expConn.DestinationServiceName = servicePortName.String() + expConn.DestinationServicePortName = servicePortName.String() } connStore.addOrUpdateConn(&test.flow) actualConn, ok := connStore.GetConnByKey(flowTuple) @@ -190,8 +189,8 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) { OriginalBytes: 0xbaaaaa0000000000, ReversePackets: 0xff, ReverseBytes: 0xbaaa, - TupleOrig: *tuple1, - TupleReply: *revTuple1, + TupleOrig: tuple1, + TupleReply: revTuple1, IsActive: true, } // Flow-2, which is not in ConnectionStore @@ -203,8 +202,8 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) { OriginalBytes: 0xcbbb, ReversePackets: 0xbbbb, ReverseBytes: 0xcbbbb0000000000, - TupleOrig: *tuple2, - TupleReply: *revTuple2, + TupleOrig: tuple2, + TupleReply: revTuple2, IsActive: true, } for i, flow := range testFlows { @@ -252,8 +251,8 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) { OriginalBytes: 0xbaaaaa0000000000, ReversePackets: 0xff, ReverseBytes: 0xbaaa, - TupleOrig: *tuple1, - TupleReply: *revTuple1, + TupleOrig: tuple1, + TupleReply: revTuple1, IsActive: true, } // Flow-2, which is not in ConnectionStore @@ -265,8 +264,8 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) { OriginalBytes: 0xcbbb, ReversePackets: 0xbbbb, ReverseBytes: 0xcbbbb0000000000, - TupleOrig: *tuple2, - TupleReply: *revTuple2, + TupleOrig: tuple2, + TupleReply: revTuple2, IsActive: true, } for i, flow := range testFlows { diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go index aad4ef75b32..7c06d582c9f 100644 --- a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -59,7 +59,7 @@ var ( "destinationPodNamespace", "destinationNodeName", "destinationClusterIP", - "destinationServiceName", + "destinationServicePortName", } ) @@ -325,7 +325,7 @@ func (exp *flowExporter) sendDataRecord(dataRec ipfix.IPFIXRecord, record flowex _, err = dataRec.AddInfoElement(ie, "") } case "destinationClusterIP": - if record.Conn.DestinationServiceName != "" { + if record.Conn.DestinationServicePortName != "" { _, err = dataRec.AddInfoElement(ie, record.Conn.TupleOrig.DestinationAddress) } else { // Sending dummy IP as IPFIX collector expects constant length of data for IP field. @@ -333,9 +333,9 @@ func (exp *flowExporter) sendDataRecord(dataRec ipfix.IPFIXRecord, record flowex // this dummy IP address. _, err = dataRec.AddInfoElement(ie, net.IP{0, 0, 0, 0}) } - case "destinationServiceName": - if record.Conn.DestinationServiceName != "" { - _, err = dataRec.AddInfoElement(ie, record.Conn.DestinationServiceName) + case "destinationServicePortName": + if record.Conn.DestinationServicePortName != "" { + _, err = dataRec.AddInfoElement(ie, record.Conn.DestinationServicePortName) } else { _, err = dataRec.AddInfoElement(ie, "") } diff --git a/pkg/agent/flowexporter/exporter/exporter_test.go b/pkg/agent/flowexporter/exporter/exporter_test.go index caa11586d95..b44067d10b9 100644 --- a/pkg/agent/flowexporter/exporter/exporter_test.go +++ b/pkg/agent/flowexporter/exporter/exporter_test.go @@ -147,7 +147,7 @@ func TestFlowExporter_sendDataRecord(t *testing.T) { for i, ie := range AntreaInfoElements { elemList[i+len(IANAInfoElements)+len(IANAReverseInfoElements)] = ipfixentities.NewInfoElement(ie, 0, 0, 0, 0) } - // Define mocks and flowExporter + mockIPFIXExpProc := ipfixtest.NewMockIPFIXExportingProcess(ctrl) mockDataRec := ipfixtest.NewMockIPFIXRecord(ctrl) flowExp := &flowExporter{ @@ -175,7 +175,7 @@ func TestFlowExporter_sendDataRecord(t *testing.T) { mockDataRec.EXPECT().AddInfoElement(ie, uint8(0)).Return(tempBytes, nil) case "packetTotalCount", "octetTotalCount", "packetDeltaCount", "octetDeltaCount", "reverse_PacketTotalCount", "reverse_OctetTotalCount", "reverse_PacketDeltaCount", "reverse_OctetDeltaCount": mockDataRec.EXPECT().AddInfoElement(ie, uint64(0)).Return(tempBytes, nil) - case "sourcePodName", "sourcePodNamespace", "sourceNodeName", "destinationPodName", "destinationPodNamespace", "destinationNodeName", "destinationServiceName": + case "sourcePodName", "sourcePodNamespace", "sourceNodeName", "destinationPodName", "destinationPodNamespace", "destinationNodeName", "destinationServicePortName": mockDataRec.EXPECT().AddInfoElement(ie, "").Return(tempBytes, nil) } } diff --git a/pkg/agent/flowexporter/types.go b/pkg/agent/flowexporter/types.go index 49ccb5ed6c5..a27a806aaf8 100644 --- a/pkg/agent/flowexporter/types.go +++ b/pkg/agent/flowexporter/types.go @@ -51,11 +51,11 @@ type Connection struct { OriginalPackets, OriginalBytes uint64 ReversePackets, ReverseBytes uint64 // Fields specific to Antrea - SourcePodNamespace string - SourcePodName string - DestinationPodNamespace string - DestinationPodName string - DestinationServiceName string + SourcePodNamespace string + SourcePodName string + DestinationPodNamespace string + DestinationPodName string + DestinationServicePortName string } type FlowRecord struct { diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 03022fa1274..4466ad7c71b 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -282,7 +282,7 @@ func (p *proxier) GetServiceByIP(serviceStr string) (k8sproxy.ServicePortName, b func (p *proxier) addServiceByIP(serviceStr string, servicePortName k8sproxy.ServicePortName) { p.serviceStringMapMutex.Lock() defer p.serviceStringMapMutex.Unlock() - klog.V(2).Infof("Added service with key: %v value: %v", serviceStr, servicePortName) + p.serviceStringMap[serviceStr] = servicePortName } diff --git a/pkg/util/ip/ip.go b/pkg/util/ip/ip.go index d573cd7f52d..e8e658d5be0 100644 --- a/pkg/util/ip/ip.go +++ b/pkg/util/ip/ip.go @@ -20,8 +20,6 @@ import ( "net" "sort" - corev1 "k8s.io/api/core/v1" - "github.com/vmware-tanzu/antrea/pkg/apis/networking/v1beta1" ) @@ -30,12 +28,6 @@ const ( v6BitLen = 8 * net.IPv6len ) -var serviceProtocolMap = map[uint8]corev1.Protocol{ - 6: corev1.ProtocolTCP, - 17: corev1.ProtocolUDP, - 132: corev1.ProtocolSCTP, -} - // This function takes in one allow CIDR and multiple except CIDRs and gives diff CIDRs // in allowCIDR eliminating except CIDRs. It currently supports only IPv4. except CIDR input // can be changed. @@ -154,12 +146,3 @@ func NetIPNetToIPNet(ipNet *net.IPNet) *v1beta1.IPNet { prefix, _ := ipNet.Mask.Size() return &v1beta1.IPNet{IP: v1beta1.IPAddress(ipNet.IP), PrefixLength: int32(prefix)} } - -// LookupServiceProtocol return service protocol string given protocol identifier -func LookupServiceProtocol(protoID uint8) (corev1.Protocol, error) { - serviceProto, found := serviceProtocolMap[protoID] - if !found { - return "", fmt.Errorf("unknown protocol identifier: %d", protoID) - } - return serviceProto, nil -} diff --git a/plugins/octant/go.sum b/plugins/octant/go.sum index 8eb824573cb..fa5aeb10ce5 100644 --- a/plugins/octant/go.sum +++ b/plugins/octant/go.sum @@ -469,7 +469,7 @@ github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmF github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= github.com/vmware-tanzu/octant v0.13.1 h1:hz4JDnAA7xDkFjF4VEbt5SrSRrG26FCxKXXBGapf6Nc= github.com/vmware-tanzu/octant v0.13.1/go.mod h1:4q+wrV4tmUwAdMjvYOujSTtZbE4+zm0n5mb7FjvN0I0= -github.com/vmware/go-ipfix v0.0.0-20200808032647-11daf237d1dc/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU= +github.com/vmware/go-ipfix v0.1.0/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU= github.com/wenyingd/ofnet v0.0.0-20200601065543-2c7a62482f16/go.mod h1:+g6SfqhTVqeGEmUJ0l4WtCgsL4dflTUJE4k+TPCKqXo= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= diff --git a/test/e2e/flowexporter_test.go b/test/e2e/flowexporter_test.go index 96fbd822cc9..0b76d245ff3 100644 --- a/test/e2e/flowexporter_test.go +++ b/test/e2e/flowexporter_test.go @@ -142,7 +142,7 @@ func TestFlowExporter(t *testing.T) { t.Fatalf("Error in converting octetDeltaCount to int type") } // compute the bandwidth using 5s as interval - recBandwidth := (deltaBytes * 8.0) / float64((int64(5.0))*time.Second.Nanoseconds()) + recBandwidth := (deltaBytes * 8.0) / float64(5*time.Second.Nanoseconds()) // bandwidth from iperf output bwSlice := strings.Split(bandwidth, " ") iperfBandwidth, err := strconv.ParseFloat(bwSlice[0], 64) diff --git a/test/integration/agent/flowexporter_test.go b/test/integration/agent/flowexporter_test.go index d7fbe4a258e..f6927b1a3ec 100644 --- a/test/integration/agent/flowexporter_test.go +++ b/test/integration/agent/flowexporter_test.go @@ -125,7 +125,8 @@ func TestConnectionStoreAndFlowRecords(t *testing.T) { // Create ConnectionStore, FlowRecords and associated mocks connDumperMock := connectionstest.NewMockConnTrackDumper(ctrl) ifStoreMock := interfacestoretest.NewMockInterfaceStore(ctrl) - connStore := connections.NewConnectionStore(connDumperMock, ifStoreMock, testPollInterval) + // TODO: Enhance the integration test by testing service. + connStore := connections.NewConnectionStore(connDumperMock, ifStoreMock, nil, nil, testPollInterval) // Expect calls for connStore.poll and other callees connDumperMock.EXPECT().DumpFlows(uint16(openflow.CtZone)).Return(testConns, nil) for i, testConn := range testConns {