diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 451a05d9776..ec426961d67 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -160,7 +160,7 @@ func run(o *Options) error { if networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() { isChaining = true } - var proxier *proxy.Proxier + var proxier proxy.Proxier if features.DefaultFeatureGate.Enabled(features.AntreaProxy) { proxier = proxy.New(nodeConfig.Name, informerFactory, ofClient) } @@ -245,6 +245,7 @@ func run(o *Options) error { connStore := connections.NewConnectionStore( connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, agentQuerier.GetOVSCtlClient(), o.config.OVSDatapathType), ifaceStore, + proxier, o.pollInterval) pollDone := make(chan struct{}) go connStore.Run(stopCh, pollDone) diff --git a/hack/update-codegen-dockerized.sh b/hack/update-codegen-dockerized.sh index 1395dacf192..47908cb10e8 100755 --- a/hack/update-codegen-dockerized.sh +++ b/hack/update-codegen-dockerized.sh @@ -94,6 +94,7 @@ MOCKGEN_TARGETS=( "pkg/querier AgentNetworkPolicyInfoQuerier" "pkg/agent/flowexporter/connections ConnTrackDumper,NetFilterConnTrack" "pkg/agent/flowexporter/ipfix IPFIXExportingProcess,IPFIXRecord" + "pkg/agent/proxy Proxier" ) # Command mockgen does not automatically replace variable YEAR with current year diff --git a/pkg/agent/flowexporter/connections/connections.go b/pkg/agent/flowexporter/connections/connections.go index f09c325cd22..340d3750231 100644 --- a/pkg/agent/flowexporter/connections/connections.go +++ b/pkg/agent/flowexporter/connections/connections.go @@ -16,6 +16,7 @@ package connections import ( "fmt" + "net" "sync" "time" @@ -24,22 +25,28 @@ import ( "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter" "github.com/vmware-tanzu/antrea/pkg/agent/interfacestore" "github.com/vmware-tanzu/antrea/pkg/agent/openflow" + "github.com/vmware-tanzu/antrea/pkg/agent/proxy" + "github.com/vmware-tanzu/antrea/pkg/util/ip" ) type ConnectionStore struct { connections map[flowexporter.ConnectionKey]flowexporter.Connection connDumper ConnTrackDumper ifaceStore interfacestore.InterfaceStore + serviceCIDR *net.IPNet + antreaProxier proxy.Proxier pollInterval time.Duration mutex sync.Mutex } -func NewConnectionStore(connTrackDumper ConnTrackDumper, ifaceStore interfacestore.InterfaceStore, pollInterval time.Duration) *ConnectionStore { +func NewConnectionStore(connTrackDumper ConnTrackDumper, ifaceStore interfacestore.InterfaceStore, serviceCIDR *net.IPNet, proxier proxy.Proxier, pollInterval time.Duration) *ConnectionStore { return &ConnectionStore{ connections: make(map[flowexporter.ConnectionKey]flowexporter.Connection), connDumper: connTrackDumper, ifaceStore: ifaceStore, - pollInterval: pollInterval, + serviceCIDR: serviceCIDR, + antreaProxier: proxier, + pollInterval: pollInterval, } } @@ -91,6 +98,7 @@ func (cs *ConnectionStore) addOrUpdateConn(conn *flowexporter.Connection) { cs.connections[connKey] = *existingConn klog.V(4).Infof("Antrea flow updated: %v", existingConn) } else { + // sourceIP/destinationIP are mapped only to local pods and not remote pods. var srcFound, dstFound bool sIface, srcFound := cs.ifaceStore.GetInterfaceByIP(conn.TupleOrig.SourceAddress.String()) dIface, dstFound := cs.ifaceStore.GetInterfaceByIP(conn.TupleReply.SourceAddress.String()) @@ -113,6 +121,26 @@ func (cs *ConnectionStore) addOrUpdateConn(conn *flowexporter.Connection) { if !srcFound && dstFound { conn.DoExport = false } + + // Pod-to-Service flows w/ antrea-proxy:Antrea proxy is enabled. + if cs.antreaProxier != nil { + if cs.serviceCIDR.Contains(conn.TupleOrig.DestinationAddress) { + clusterIP := conn.TupleOrig.DestinationAddress.String() + svcPort := conn.TupleOrig.DestinationPort + protocol, err := ip.LookupServiceProtocol(conn.TupleOrig.Protocol) + if err != nil { + klog.Warningf("Could not retrieve service protocol: %v", err) + } else { + serviceStr := fmt.Sprintf("%s:%d/%s", clusterIP, svcPort, protocol) + servicePortName, exists := cs.antreaProxier.GetServiceByIP(serviceStr) + if !exists { + klog.Warningf("Could not retrieve service info from antrea-agent-proxier for serviceStr: %s", serviceStr) + } else { + conn.DestinationServiceName = servicePortName.String() + } + } + } + } klog.V(4).Infof("New Antrea flow added: %v", conn) // Add new antrea connection to connection store cs.connections[connKey] = *conn diff --git a/pkg/agent/flowexporter/connections/connections_test.go b/pkg/agent/flowexporter/connections/connections_test.go index b4233b1b82e..55168e64a43 100644 --- a/pkg/agent/flowexporter/connections/connections_test.go +++ b/pkg/agent/flowexporter/connections/connections_test.go @@ -15,17 +15,23 @@ package connections import ( + "fmt" "net" "testing" "time" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter" connectionstest "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/connections/testing" "github.com/vmware-tanzu/antrea/pkg/agent/interfacestore" interfacestoretest "github.com/vmware-tanzu/antrea/pkg/agent/interfacestore/testing" + proxytest "github.com/vmware-tanzu/antrea/pkg/agent/proxy/testing" + "github.com/vmware-tanzu/antrea/pkg/util/ip" + k8sproxy "github.com/vmware-tanzu/antrea/third_party/proxy" ) const testPollInterval = 0 // Not used in these tests, hence 0. @@ -79,6 +85,12 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) { TupleReply: *revTuple2, IsActive: true, } + tuple3, revTuple3 := makeTuple(&net.IP{10, 10, 10, 10}, &net.IP{20, 20, 20, 20}, 6, 5000, 80) + testFlow3 := flowexporter.Connection{ + TupleOrig: *tuple3, + TupleReply: *revTuple3, + IsActive: true, + } // Create copy of old conntrack flow for testing purposes. // This flow is already in connection store. oldTestFlow1 := flowexporter.Connection{ @@ -106,9 +118,24 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) { IP: net.IP{8, 7, 6, 5}, ContainerInterfaceConfig: podConfigFlow2, } + serviceCIDR := &net.IPNet{ + IP: net.IP{20, 20, 20, 0}, + Mask: net.IPMask(net.ParseIP("255.255.255.0").To4()), + } + servicePortName := k8sproxy.ServicePortName{ + NamespacedName: types.NamespacedName{ + "serviceNS1", + "service1", + }, + Port: "255", + Protocol: v1.ProtocolTCP, + } + // Mock interface store with one of the couple of IPs correspond to Pods mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl) mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl) - connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, testPollInterval) + mockProxier := proxytest.NewMockProxier(ctrl) + connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, serviceCIDR, mockProxier, testPollInterval) + // Add flow1conn to the Connection map testFlow1Tuple := flowexporter.NewConnectionKey(&testFlow1) connStore.connections[testFlow1Tuple] = oldTestFlow1 @@ -118,20 +145,27 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) { }{ {testFlow1}, // To test update part of function {testFlow2}, // To test add part of function + {testFlow3}, // To test service name realization } for i, test := range addOrUpdateConnTests { flowTuple := flowexporter.NewConnectionKey(&test.flow) - var expConn flowexporter.Connection + expConn := test.flow if i == 0 { - expConn = test.flow expConn.SourcePodNamespace = "ns1" expConn.SourcePodName = "pod1" - } else { - expConn = test.flow + } else if i == 1 { expConn.DestinationPodNamespace = "ns2" expConn.DestinationPodName = "pod2" mockIfaceStore.EXPECT().GetInterfaceByIP(test.flow.TupleOrig.SourceAddress.String()).Return(nil, false) mockIfaceStore.EXPECT().GetInterfaceByIP(test.flow.TupleReply.SourceAddress.String()).Return(interfaceFlow2, true) + } else { + mockIfaceStore.EXPECT().GetInterfaceByIP(expConn.TupleOrig.SourceAddress.String()).Return(nil, false) + mockIfaceStore.EXPECT().GetInterfaceByIP(expConn.TupleReply.SourceAddress.String()).Return(nil, false) + + protocol, _ := ip.LookupServiceProtocol(expConn.TupleOrig.Protocol) + serviceStr := fmt.Sprintf("%s:%d/%s", expConn.TupleOrig.DestinationAddress.String(), expConn.TupleOrig.DestinationPort, protocol) + mockProxier.EXPECT().GetServiceByIP(serviceStr).Return(servicePortName, true) + expConn.DestinationServiceName = servicePortName.String() } connStore.addOrUpdateConn(&test.flow) actualConn, ok := connStore.GetConnByKey(flowTuple) @@ -180,7 +214,7 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) { // Create ConnectionStore mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl) mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl) - connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, testPollInterval) + connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, nil, nil, testPollInterval) // Add flows to the Connection store for i, flow := range testFlows { connStore.connections[*testFlowKeys[i]] = *flow @@ -242,7 +276,7 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) { // Create ConnectionStore mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl) mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl) - connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, testPollInterval) + connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, nil, nil, testPollInterval) // Add flows to the connection store. for i, flow := range testFlows { connStore.connections[*testFlowKeys[i]] = *flow diff --git a/pkg/agent/flowexporter/connections/conntrack_linux.go b/pkg/agent/flowexporter/connections/conntrack_linux.go index f3e238bf5b4..9655ded20bb 100644 --- a/pkg/agent/flowexporter/connections/conntrack_linux.go +++ b/pkg/agent/flowexporter/connections/conntrack_linux.go @@ -146,6 +146,7 @@ func netlinkFlowToAntreaConnection(conn *conntrack.Flow) *flowexporter.Connectio "", "", "", + "", } return &newConn diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go index d986f24321f..aad4ef75b32 100644 --- a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -58,6 +58,8 @@ var ( "destinationPodName", "destinationPodNamespace", "destinationNodeName", + "destinationClusterIP", + "destinationServiceName", } ) @@ -322,6 +324,21 @@ func (exp *flowExporter) sendDataRecord(dataRec ipfix.IPFIXRecord, record flowex } else { _, err = dataRec.AddInfoElement(ie, "") } + case "destinationClusterIP": + if record.Conn.DestinationServiceName != "" { + _, err = dataRec.AddInfoElement(ie, record.Conn.TupleOrig.DestinationAddress) + } else { + // Sending dummy IP as IPFIX collector expects constant length of data for IP field. + // We should probably think of better approach as this involves customization of IPFIX collector to ignore + // this dummy IP address. + _, err = dataRec.AddInfoElement(ie, net.IP{0, 0, 0, 0}) + } + case "destinationServiceName": + if record.Conn.DestinationServiceName != "" { + _, err = dataRec.AddInfoElement(ie, record.Conn.DestinationServiceName) + } else { + _, err = dataRec.AddInfoElement(ie, "") + } } if err != nil { return fmt.Errorf("error while adding info element: %s to data record: %v", ie.Name, err) diff --git a/pkg/agent/flowexporter/exporter/exporter_test.go b/pkg/agent/flowexporter/exporter/exporter_test.go index 92ae0283d30..caa11586d95 100644 --- a/pkg/agent/flowexporter/exporter/exporter_test.go +++ b/pkg/agent/flowexporter/exporter/exporter_test.go @@ -15,6 +15,7 @@ package exporter import ( + "net" "strings" "testing" "time" @@ -146,6 +147,7 @@ func TestFlowExporter_sendDataRecord(t *testing.T) { for i, ie := range AntreaInfoElements { elemList[i+len(IANAInfoElements)+len(IANAReverseInfoElements)] = ipfixentities.NewInfoElement(ie, 0, 0, 0, 0) } + // Define mocks and flowExporter mockIPFIXExpProc := ipfixtest.NewMockIPFIXExportingProcess(ctrl) mockDataRec := ipfixtest.NewMockIPFIXRecord(ctrl) flowExp := &flowExporter{ @@ -165,13 +167,15 @@ func TestFlowExporter_sendDataRecord(t *testing.T) { mockDataRec.EXPECT().AddInfoElement(ie, time.Time{}.Unix()).Return(tempBytes, nil) case "sourceIPv4Address", "destinationIPv4Address": mockDataRec.EXPECT().AddInfoElement(ie, nil).Return(tempBytes, nil) + case "destinationClusterIP": + mockDataRec.EXPECT().AddInfoElement(ie, net.IP{0, 0, 0, 0}).Return(tempBytes, nil) case "sourceTransportPort", "destinationTransportPort": mockDataRec.EXPECT().AddInfoElement(ie, uint16(0)).Return(tempBytes, nil) case "protocolIdentifier": mockDataRec.EXPECT().AddInfoElement(ie, uint8(0)).Return(tempBytes, nil) case "packetTotalCount", "octetTotalCount", "packetDeltaCount", "octetDeltaCount", "reverse_PacketTotalCount", "reverse_OctetTotalCount", "reverse_PacketDeltaCount", "reverse_OctetDeltaCount": mockDataRec.EXPECT().AddInfoElement(ie, uint64(0)).Return(tempBytes, nil) - case "sourcePodName", "sourcePodNamespace", "sourceNodeName", "destinationPodName", "destinationPodNamespace", "destinationNodeName": + case "sourcePodName", "sourcePodNamespace", "sourceNodeName", "destinationPodName", "destinationPodNamespace", "destinationNodeName", "destinationServiceName": mockDataRec.EXPECT().AddInfoElement(ie, "").Return(tempBytes, nil) } } diff --git a/pkg/agent/flowexporter/types.go b/pkg/agent/flowexporter/types.go index 319e59c8f92..49ccb5ed6c5 100644 --- a/pkg/agent/flowexporter/types.go +++ b/pkg/agent/flowexporter/types.go @@ -55,6 +55,7 @@ type Connection struct { SourcePodName string DestinationPodNamespace string DestinationPodName string + DestinationServiceName string } type FlowRecord struct { diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 90d5805c668..03022fa1274 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -38,8 +38,15 @@ const ( componentName = "antrea-agent-proxy" ) +var _ Proxier = new(proxier) + +type Proxier interface { + Run(stopCh <-chan struct{}) + GetServiceByIP(serviceStr string) (k8sproxy.ServicePortName, bool) +} + // TODO: Add metrics -type Proxier struct { +type proxier struct { once sync.Once endpointsConfig *config.EndpointsConfig serviceConfig *config.ServiceConfig @@ -60,6 +67,10 @@ type Proxier struct { // endpointInstalledMap stores endpoints we actually installed. endpointInstalledMap map[k8sproxy.ServicePortName]map[string]struct{} groupCounter types.GroupCounter + // serviceStringMap provides map from serviceString(ClusterIP:Port/Proto) to ServicePortName. + serviceStringMap map[string]k8sproxy.ServicePortName + // serviceStringMapMutex protects serviceStringMap object. + serviceStringMapMutex sync.Mutex runner *k8sproxy.BoundedFrequencyRunner stopChan <-chan struct{} @@ -67,11 +78,11 @@ type Proxier struct { ofClient openflow.Client } -func (p *Proxier) isInitialized() bool { +func (p *proxier) isInitialized() bool { return p.endpointsChanges.Synced() && p.serviceChanges.Synced() } -func (p *Proxier) removeStaleServices() { +func (p *proxier) removeStaleServices() { for svcPortName, svcPort := range p.serviceInstalledMap { if _, ok := p.serviceMap[svcPortName]; ok { continue @@ -101,11 +112,12 @@ func (p *Proxier) removeStaleServices() { continue } delete(p.serviceInstalledMap, svcPortName) + p.deleteServiceByIP(svcInfo.String()) p.groupCounter.Recycle(svcPortName) } } -func (p *Proxier) removeStaleEndpoints(staleEndpoints map[k8sproxy.ServicePortName]map[string]k8sproxy.Endpoint) { +func (p *proxier) removeStaleEndpoints(staleEndpoints map[k8sproxy.ServicePortName]map[string]k8sproxy.Endpoint) { for svcPortName, endpoints := range staleEndpoints { bindingProtocol := binding.ProtocolTCP if svcPortName.Protocol == corev1.ProtocolUDP { @@ -128,7 +140,7 @@ func (p *Proxier) removeStaleEndpoints(staleEndpoints map[k8sproxy.ServicePortNa } } -func (p *Proxier) installServices() { +func (p *proxier) installServices() { for svcPortName, svcPort := range p.serviceMap { svcInfo := svcPort.(*types.ServiceInfo) groupID, _ := p.groupCounter.Get(svcPortName) @@ -185,13 +197,14 @@ func (p *Proxier) installServices() { } } p.serviceInstalledMap[svcPortName] = svcPort + p.addServiceByIP(svcInfo.String(), svcPortName) } } // syncProxyRulesMutex applies current changes in change trackers and then updates // flows for services and endpoints. It will abort if either endpoints or services // resources is not synced. -func (p *Proxier) syncProxyRules() { +func (p *proxier) syncProxyRules() { p.syncProxyRulesMutex.Lock() defer p.syncProxyRulesMutex.Unlock() @@ -212,53 +225,75 @@ func (p *Proxier) syncProxyRules() { p.installServices() } -func (p *Proxier) SyncLoop() { +func (p *proxier) SyncLoop() { p.runner.Loop(p.stopChan) } -func (p *Proxier) OnEndpointsAdd(endpoints *corev1.Endpoints) { +func (p *proxier) OnEndpointsAdd(endpoints *corev1.Endpoints) { p.OnEndpointsUpdate(nil, endpoints) } -func (p *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *corev1.Endpoints) { +func (p *proxier) OnEndpointsUpdate(oldEndpoints, endpoints *corev1.Endpoints) { if p.endpointsChanges.OnEndpointUpdate(oldEndpoints, endpoints) && p.isInitialized() { p.runner.Run() } } -func (p *Proxier) OnEndpointsDelete(endpoints *corev1.Endpoints) { +func (p *proxier) OnEndpointsDelete(endpoints *corev1.Endpoints) { p.OnEndpointsUpdate(endpoints, nil) } -func (p *Proxier) OnEndpointsSynced() { +func (p *proxier) OnEndpointsSynced() { p.endpointsChanges.OnEndpointsSynced() if p.isInitialized() { p.runner.Run() } } -func (p *Proxier) OnServiceAdd(service *corev1.Service) { +func (p *proxier) OnServiceAdd(service *corev1.Service) { p.OnServiceUpdate(nil, service) } -func (p *Proxier) OnServiceUpdate(oldService, service *corev1.Service) { +func (p *proxier) OnServiceUpdate(oldService, service *corev1.Service) { if p.serviceChanges.OnServiceUpdate(oldService, service) && p.isInitialized() { p.runner.Run() } } -func (p *Proxier) OnServiceDelete(service *corev1.Service) { +func (p *proxier) OnServiceDelete(service *corev1.Service) { p.OnServiceUpdate(service, nil) } -func (p *Proxier) OnServiceSynced() { +func (p *proxier) OnServiceSynced() { p.serviceChanges.OnServiceSynced() if p.isInitialized() { p.runner.Run() } } -func (p *Proxier) Run(stopCh <-chan struct{}) { +func (p *proxier) GetServiceByIP(serviceStr string) (k8sproxy.ServicePortName, bool) { + p.serviceStringMapMutex.Lock() + defer p.serviceStringMapMutex.Unlock() + + serviceInfo, exists := p.serviceStringMap[serviceStr] + return serviceInfo, exists +} + +func (p *proxier) addServiceByIP(serviceStr string, servicePortName k8sproxy.ServicePortName) { + p.serviceStringMapMutex.Lock() + defer p.serviceStringMapMutex.Unlock() + klog.V(2).Infof("Added service with key: %v value: %v", serviceStr, servicePortName) + p.serviceStringMap[serviceStr] = servicePortName +} + +func (p *proxier) deleteServiceByIP(serviceStr string) { + p.serviceStringMapMutex.Lock() + defer p.serviceStringMapMutex.Unlock() + + delete(p.serviceStringMap, serviceStr) +} + +func (p *proxier) Run(stopCh <-chan struct{}) { p.once.Do(func() { go p.serviceConfig.Run(stopCh) go p.endpointsConfig.Run(stopCh) @@ -267,12 +302,12 @@ func (p *Proxier) Run(stopCh <-chan struct{}) { }) } -func New(hostname string, informerFactory informers.SharedInformerFactory, ofClient openflow.Client) *Proxier { +func New(hostname string, informerFactory informers.SharedInformerFactory, ofClient openflow.Client) *proxier { recorder := record.NewBroadcaster().NewRecorder( runtime.NewScheme(), corev1.EventSource{Component: componentName, Host: hostname}, ) - p := &Proxier{ + p := &proxier{ endpointsConfig: config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), resyncPeriod), serviceConfig: config.NewServiceConfig(informerFactory.Core().V1().Services(), resyncPeriod), endpointsChanges: newEndpointsChangesTracker(hostname), @@ -281,6 +316,7 @@ func New(hostname string, informerFactory informers.SharedInformerFactory, ofCli serviceInstalledMap: k8sproxy.ServiceMap{}, endpointInstalledMap: map[k8sproxy.ServicePortName]map[string]struct{}{}, endpointsMap: types.EndpointsMap{}, + serviceStringMap: map[string]k8sproxy.ServicePortName{}, groupCounter: types.NewGroupCounter(), ofClient: ofClient, } diff --git a/pkg/agent/proxy/proxier_linux.go b/pkg/agent/proxy/proxier_linux.go index 057f9135d85..729fd4353de 100644 --- a/pkg/agent/proxy/proxier_linux.go +++ b/pkg/agent/proxy/proxier_linux.go @@ -26,7 +26,7 @@ import ( // installLoadBalancerServiceFlows install OpenFlow entries for LoadBalancer Service. // The rules for traffic from local Pod to LoadBalancer Service are same with rules for Cluster Service. // For the LoadBalancer Service traffic from outside, kube-proxy will handle it. -func (p *Proxier) installLoadBalancerServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16) error { +func (p *proxier) installLoadBalancerServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16) error { if err := p.ofClient.InstallServiceFlows(groupID, svcIP, svcPort, protocol, affinityTimeout); err != nil { klog.Errorf("Error when installing LoadBalancer Service flows: %v", err) return err diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 7f5787e01af..ee40c25afcb 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -37,7 +37,7 @@ func makeNamespaceName(namespace, name string) apimachinerytypes.NamespacedName return apimachinerytypes.NamespacedName{Namespace: namespace, Name: name} } -func makeServiceMap(proxier *Proxier, allServices ...*corev1.Service) { +func makeServiceMap(proxier *proxier, allServices ...*corev1.Service) { for i := range allServices { proxier.serviceChanges.OnServiceUpdate(nil, allServices[i]) } @@ -58,7 +58,7 @@ func makeTestService(namespace, name string, svcFunc func(*corev1.Service)) *cor return svc } -func makeEndpointsMap(proxier *Proxier, allEndpoints ...*corev1.Endpoints) { +func makeEndpointsMap(proxier *proxier, allEndpoints ...*corev1.Endpoints) { for i := range allEndpoints { proxier.endpointsChanges.OnEndpointUpdate(nil, allEndpoints[i]) } @@ -76,14 +76,14 @@ func makeTestEndpoints(namespace, name string, eptFunc func(*corev1.Endpoints)) return ept } -func NewFakeProxier(ofClient openflow.Client) *Proxier { +func NewFakeProxier(ofClient openflow.Client) *proxier { hostname := "localhost" eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder( runtime.NewScheme(), corev1.EventSource{Component: componentName, Host: hostname}, ) - p := &Proxier{ + p := &proxier{ endpointsChanges: newEndpointsChangesTracker(hostname), serviceChanges: newServiceChangesTracker(recorder), serviceMap: k8sproxy.ServiceMap{}, @@ -92,6 +92,7 @@ func NewFakeProxier(ofClient openflow.Client) *Proxier { endpointsMap: types.EndpointsMap{}, groupCounter: types.NewGroupCounter(), ofClient: ofClient, + serviceStringMap: map[string]k8sproxy.ServicePortName{}, } return p } diff --git a/pkg/agent/proxy/proxier_windows.go b/pkg/agent/proxy/proxier_windows.go index 4c4414aa28a..9549e66443a 100644 --- a/pkg/agent/proxy/proxier_windows.go +++ b/pkg/agent/proxy/proxier_windows.go @@ -27,7 +27,7 @@ import ( // The rules for traffic from local Pod to LoadBalancer Service are the same with rules for Cluster Service. // For the LoadBalancer Service traffic from outside, specific rules are install to forward the packets // to the host network to let kube-proxy handle the traffic. -func (p *Proxier) installLoadBalancerServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16) error { +func (p *proxier) installLoadBalancerServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16) error { if err := p.ofClient.InstallServiceFlows(groupID, svcIP, svcPort, protocol, affinityTimeout); err != nil { klog.Errorf("Error when installing LoadBalancer Service flows: %v", err) return err diff --git a/pkg/agent/proxy/testing/mock_proxy.go b/pkg/agent/proxy/testing/mock_proxy.go new file mode 100644 index 00000000000..6c4a347e2be --- /dev/null +++ b/pkg/agent/proxy/testing/mock_proxy.go @@ -0,0 +1,76 @@ +// Copyright 2020 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/vmware-tanzu/antrea/pkg/agent/proxy (interfaces: Proxier) + +// Package testing is a generated GoMock package. +package testing + +import ( + gomock "github.com/golang/mock/gomock" + proxy "github.com/vmware-tanzu/antrea/third_party/proxy" + reflect "reflect" +) + +// MockProxier is a mock of Proxier interface +type MockProxier struct { + ctrl *gomock.Controller + recorder *MockProxierMockRecorder +} + +// MockProxierMockRecorder is the mock recorder for MockProxier +type MockProxierMockRecorder struct { + mock *MockProxier +} + +// NewMockProxier creates a new mock instance +func NewMockProxier(ctrl *gomock.Controller) *MockProxier { + mock := &MockProxier{ctrl: ctrl} + mock.recorder = &MockProxierMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockProxier) EXPECT() *MockProxierMockRecorder { + return m.recorder +} + +// GetServiceByIP mocks base method +func (m *MockProxier) GetServiceByIP(arg0 string) (proxy.ServicePortName, bool) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetServiceByIP", arg0) + ret0, _ := ret[0].(proxy.ServicePortName) + ret1, _ := ret[1].(bool) + return ret0, ret1 +} + +// GetServiceByIP indicates an expected call of GetServiceByIP +func (mr *MockProxierMockRecorder) GetServiceByIP(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetServiceByIP", reflect.TypeOf((*MockProxier)(nil).GetServiceByIP), arg0) +} + +// Run mocks base method +func (m *MockProxier) Run(arg0 <-chan struct{}) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Run", arg0) +} + +// Run indicates an expected call of Run +func (mr *MockProxierMockRecorder) Run(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockProxier)(nil).Run), arg0) +} diff --git a/pkg/util/ip/ip.go b/pkg/util/ip/ip.go index e8e658d5be0..d573cd7f52d 100644 --- a/pkg/util/ip/ip.go +++ b/pkg/util/ip/ip.go @@ -20,6 +20,8 @@ import ( "net" "sort" + corev1 "k8s.io/api/core/v1" + "github.com/vmware-tanzu/antrea/pkg/apis/networking/v1beta1" ) @@ -28,6 +30,12 @@ const ( v6BitLen = 8 * net.IPv6len ) +var serviceProtocolMap = map[uint8]corev1.Protocol{ + 6: corev1.ProtocolTCP, + 17: corev1.ProtocolUDP, + 132: corev1.ProtocolSCTP, +} + // This function takes in one allow CIDR and multiple except CIDRs and gives diff CIDRs // in allowCIDR eliminating except CIDRs. It currently supports only IPv4. except CIDR input // can be changed. @@ -146,3 +154,12 @@ func NetIPNetToIPNet(ipNet *net.IPNet) *v1beta1.IPNet { prefix, _ := ipNet.Mask.Size() return &v1beta1.IPNet{IP: v1beta1.IPAddress(ipNet.IP), PrefixLength: int32(prefix)} } + +// LookupServiceProtocol return service protocol string given protocol identifier +func LookupServiceProtocol(protoID uint8) (corev1.Protocol, error) { + serviceProto, found := serviceProtocolMap[protoID] + if !found { + return "", fmt.Errorf("unknown protocol identifier: %d", protoID) + } + return serviceProto, nil +}