diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 3f6f8178be1..3735ce0ad59 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -96,7 +96,7 @@ type Client interface { InstallEndpointFlows(protocol binding.Protocol, endpoints []proxy.Endpoint) error // UninstallEndpointFlows removes flows of the Endpoint installed by // InstallEndpointFlows. - UninstallEndpointFlows(protocol binding.Protocol, endpoint proxy.Endpoint) error + UninstallEndpointFlows(protocol binding.Protocol, endpoints []proxy.Endpoint) error // InstallServiceFlows installs flows for accessing Service NodePort, LoadBalancer and ClusterIP. It installs the // flow that uses the group/bucket to do service LB. If the affinityTimeout is not zero, it also installs the flow @@ -452,21 +452,36 @@ func (c *client) modifyFlows(cache *flowCategoryCache, flowCacheKey string, flow // deleteFlows deletes all the flows in the flow cache indexed by the provided flowCacheKey. func (c *client) deleteFlows(cache *flowCategoryCache, flowCacheKey string) error { - fCacheI, ok := cache.Load(flowCacheKey) - if !ok { - // no matching flows found in the cache - return nil + return c.deleteFlowsWithMultipleKeys(cache, []string{flowCacheKey}) +} + +// deleteFlowsWithMultipleKeys uninstalls the flows with different flowCache keys and remove them from the cache on success. +// It will skip the keys which are not in the cache. All flows will be uninstalled via a bundle. +func (c *client) deleteFlowsWithMultipleKeys(cache *flowCategoryCache, keys []string) error { + // allFlows keeps the flows we will delete via a bundle. + var allFlows []binding.Flow + for _, key := range keys { + flows, ok := cache.Load(key) + // If a flow cache entry of the key does not exist, skip it. + if !ok { + klog.V(2).InfoS("Cached flow with provided key was found", "key", key) + continue + } + for _, flow := range flows.(flowCache) { + allFlows = append(allFlows, flow) + } } - fCache := fCacheI.(flowCache) - // Delete flows from OVS. - delFlows := make([]binding.Flow, 0, len(fCache)) - for _, flow := range fCache { - delFlows = append(delFlows, flow) + if len(allFlows) == 0 { + return nil } - if err := c.ofEntryOperations.DeleteAll(delFlows); err != nil { + err := c.ofEntryOperations.DeleteAll(allFlows) + if err != nil { return err } - cache.Delete(flowCacheKey) + // Delete the keys and corresponding flows from the flow cache. + for _, key := range keys { + cache.Delete(key) + } return nil } @@ -681,16 +696,22 @@ func (c *client) InstallEndpointFlows(protocol binding.Protocol, endpoints []pro return c.addFlowsWithMultipleKeys(c.featureService.cachedFlows, keyToFlows) } -func (c *client) UninstallEndpointFlows(protocol binding.Protocol, endpoint proxy.Endpoint) error { +func (c *client) UninstallEndpointFlows(protocol binding.Protocol, endpoints []proxy.Endpoint) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() - port, err := endpoint.Port() - if err != nil { - return fmt.Errorf("error when getting port: %w", err) + // keyToFlows is a map from the flows' cache key to the flows. + flowCacheKeys := make([]string, 0, len(endpoints)) + + for _, endpoint := range endpoints { + port, err := endpoint.Port() + if err != nil { + return fmt.Errorf("error when getting port: %w", err) + } + flowCacheKeys = append(flowCacheKeys, generateEndpointFlowCacheKey(endpoint.IP(), port, protocol)) } - cacheKey := generateEndpointFlowCacheKey(endpoint.IP(), port, protocol) - return c.deleteFlows(c.featureService.cachedFlows, cacheKey) + + return c.deleteFlowsWithMultipleKeys(c.featureService.cachedFlows, flowCacheKeys) } func (c *client) InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool, svcType v1.ServiceType) error { diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index d0ca9c89199..b3875db8877 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -1035,7 +1035,7 @@ func Test_client_InstallEndpointFlows(t *testing.T) { defer resetPipelines() m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1) - m.EXPECT().DeleteAll(gomock.Any()).Return(nil).Times(len(tc.endpoints)) + m.EXPECT().DeleteAll(gomock.Any()).Return(nil).Times(1) assert.NoError(t, fc.InstallEndpointFlows(tc.protocol, tc.endpoints)) var flows []string @@ -1048,8 +1048,8 @@ func Test_client_InstallEndpointFlows(t *testing.T) { } assert.ElementsMatch(t, tc.expectedFlows, flows) + assert.NoError(t, fc.UninstallEndpointFlows(tc.protocol, tc.endpoints)) for _, ep := range tc.endpoints { - assert.NoError(t, fc.UninstallEndpointFlows(tc.protocol, ep)) endpointPort, _ := ep.Port() cacheKey := generateEndpointFlowCacheKey(ep.IP(), endpointPort, tc.protocol) _, ok := fc.featureService.cachedFlows.Load(cacheKey) diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index fbf9b8d468f..5cd67aaf0ef 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -783,7 +783,7 @@ func (mr *MockClientMockRecorder) SubscribePacketIn(arg0, arg1 interface{}) *gom } // UninstallEndpointFlows mocks base method -func (m *MockClient) UninstallEndpointFlows(arg0 openflow.Protocol, arg1 proxy.Endpoint) error { +func (m *MockClient) UninstallEndpointFlows(arg0 openflow.Protocol, arg1 []proxy.Endpoint) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "UninstallEndpointFlows", arg0, arg1) ret0, _ := ret[0].(error) diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 2a106f8d8e9..4abb61a1dd5 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -141,65 +141,65 @@ func (p *proxier) isInitialized() bool { return p.endpointsChanges.Synced() && p.serviceChanges.Synced() } -// removeStaleServices removes all expired Services. Once a Service is deleted, all -// its Endpoints will be expired, and the removeStaleEndpoints method takes -// responsibility for cleaning up, thus we don't need to call removeEndpoint in this -// function. +// removeStaleServices removes all the configurations of expired Services and their associated Endpoints. func (p *proxier) removeStaleServices() { for svcPortName, svcPort := range p.serviceInstalledMap { if _, ok := p.serviceMap[svcPortName]; ok { continue } svcInfo := svcPort.(*types.ServiceInfo) - klog.V(2).Infof("Removing stale Service: %s %s", svcPortName.Name, svcInfo.String()) + svcInfoStr := svcInfo.String() + klog.V(2).InfoS("Removing stale Service", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr) if err := p.ofClient.UninstallServiceFlows(svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil { - klog.ErrorS(err, "Failed to remove flows of Service", "Service", svcPortName) + klog.ErrorS(err, "Error when uninstalling ClusterIP flows for Service", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr) continue } - - if p.proxyAll { - // Remove NodePort flows and configurations. - if svcInfo.NodePort() > 0 { - if err := p.uninstallNodePortService(uint16(svcInfo.NodePort()), svcInfo.OFProtocol); err != nil { - klog.ErrorS(err, "Failed to remove flows and configurations of Service", "Service", svcPortName) - continue - } + // Remove associated Endpoints flows. + if endpoints, ok := p.endpointsInstalledMap[svcPortName]; ok { + if err := p.removeStaleEndpoints(endpoints, svcInfo.Protocol()); err != nil { + klog.ErrorS(err, "Error when removing Endpoints flows for Service", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr) + continue + } + } + // Remove NodePort flows and configurations. + if p.proxyAll && svcInfo.NodePort() > 0 { + if err := p.uninstallNodePortService(uint16(svcInfo.NodePort()), svcInfo.OFProtocol); err != nil { + klog.ErrorS(err, "Error when uninstalling NodePort flows and configurations for Service", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr) + continue } } // Remove LoadBalancer flows and configurations. if p.proxyLoadBalancerIPs && len(svcInfo.LoadBalancerIPStrings()) > 0 { if err := p.uninstallLoadBalancerService(svcInfo.LoadBalancerIPStrings(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil { - klog.ErrorS(err, "Failed to remove flows and configurations of Service", "Service", svcPortName) + klog.ErrorS(err, "Error when uninstalling LoadBalancer flows and configurations for Service", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr) continue } } - // Remove Service group whose Endpoints are local. - if svcInfo.ExternalPolicyLocal() { - if groupIDLocal, exist := p.groupCounter.Get(svcPortName, true); exist { - if err := p.ofClient.UninstallServiceGroup(groupIDLocal); err != nil { - klog.ErrorS(err, "Failed to remove Group of local Endpoints for Service", "Service", svcPortName) - continue - } - p.groupCounter.Recycle(svcPortName, true) + // Remove Service group which has only local Endpoints. + if groupID, exist := p.groupCounter.Get(svcPortName, true); exist { + if err := p.ofClient.UninstallServiceGroup(groupID); err != nil { + klog.ErrorS(err, "Error when uninstalling group of local Endpoints for Service", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr) + continue } + p.groupCounter.Recycle(svcPortName, true) } // Remove Service group which has all Endpoints. if groupID, exist := p.groupCounter.Get(svcPortName, false); exist { if err := p.ofClient.UninstallServiceGroup(groupID); err != nil { - klog.ErrorS(err, "Failed to remove Group of all Endpoints for Service", "Service", svcPortName) + klog.ErrorS(err, "Error when uninstalling group of all Endpoints for Service", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr) continue } p.groupCounter.Recycle(svcPortName, false) } delete(p.serviceInstalledMap, svcPortName) - p.deleteServiceByIP(svcInfo.String()) + p.deleteServiceByIP(svcInfoStr) } } -func getBindingProtoForIPProto(endpointIP string, protocol corev1.Protocol) binding.Protocol { +func getBindingProtoForIPProto(isIPv6 bool, protocol corev1.Protocol) binding.Protocol { var bindingProtocol binding.Protocol - if utilnet.IsIPv6String(endpointIP) { + if isIPv6 { bindingProtocol = binding.ProtocolTCPv6 if protocol == corev1.ProtocolUDP { bindingProtocol = binding.ProtocolUDPv6 @@ -217,47 +217,31 @@ func getBindingProtoForIPProto(endpointIP string, protocol corev1.Protocol) bind return bindingProtocol } -// removeEndpoint removes flows for the given Endpoint from the data path if these flows are no longer +// removeStaleEndpoints removes flows for the given Endpoints from the data path if these flows are no longer // needed by any Service. Endpoints from different Services can have the same characteristics and thus -// can share the same flows. removeEndpoint must be called whenever an Endpoint is no longer used by a -// given Service. If the Endpoint is still referenced by any other Services, no flow will be removed. +// can share the same flows. removeStaleEndpoints must be called whenever Endpoints are no longer used by a +// given Service. If the Endpoints are still referenced by any other Services, no flow will be removed. // The method only returns an error if a data path operation fails. If the flows are successfully -// removed from the data path, the method returns true. Otherwise, if the flows are still needed for -// other Services, it returns false. -func (p *proxier) removeEndpoint(endpoint k8sproxy.Endpoint, protocol binding.Protocol) (bool, error) { - key := endpointKey(endpoint, protocol) - count := p.endpointReferenceCounter[key] - if count == 1 { - if err := p.ofClient.UninstallEndpointFlows(protocol, endpoint); err != nil { - return false, err +// removed from the data path, the method returns nil. +func (p *proxier) removeStaleEndpoints(staleEndpoints map[string]k8sproxy.Endpoint, ipProtocol corev1.Protocol) error { + var endpointsToRemove []k8sproxy.Endpoint + bindingProtocol := getBindingProtoForIPProto(p.isIPv6, ipProtocol) + + for _, endpoint := range staleEndpoints { + key := endpointKey(endpoint, bindingProtocol) + count := p.endpointReferenceCounter[key] + if count == 1 { + endpointsToRemove = append(endpointsToRemove, endpoint) + klog.V(2).InfoS("Endpoint will be removed", "Endpoint", endpoint.String(), "Protocol", bindingProtocol) + } else { + p.endpointReferenceCounter[key] = count - 1 + klog.V(2).InfoS("Stale Endpoint is still referenced by other Services, decrementing reference count by 1", "Endpoint", endpoint.String(), "Protocol", bindingProtocol) } - delete(p.endpointReferenceCounter, key) - klog.V(2).Infof("Endpoint %s/%s removed", endpoint.String(), protocol) - } else if count > 1 { - p.endpointReferenceCounter[key] = count - 1 - klog.V(2).Infof("Stale Endpoint %s/%s is still referenced by other Services, decrementing reference count by 1", endpoint.String(), protocol) - return false, nil } - return true, nil -} - -// removeStaleEndpoints compares Endpoints we installed with Endpoints we expected. All installed but unexpected Endpoints -// will be deleted by using removeEndpoint. -func (p *proxier) removeStaleEndpoints() { - for svcPortName, installedEps := range p.endpointsInstalledMap { - for installedEpName, installedEp := range installedEps { - if _, ok := p.endpointsMap[svcPortName][installedEpName]; !ok { - if _, err := p.removeEndpoint(installedEp, getBindingProtoForIPProto(installedEp.IP(), svcPortName.Protocol)); err != nil { - klog.Errorf("Error when removing Endpoint %v for %v", installedEp, svcPortName) - continue - } - delete(installedEps, installedEpName) - } - } - if len(installedEps) == 0 { - delete(p.endpointsInstalledMap, svcPortName) - } + if len(endpointsToRemove) != 0 { + return p.ofClient.UninstallEndpointFlows(bindingProtocol, endpointsToRemove) } + return nil } func serviceIdentityChanged(svcInfo, pSvcInfo *types.ServiceInfo) bool { @@ -295,10 +279,10 @@ func (p *proxier) installNodePortService(groupID binding.GroupIDType, svcPort ui svcIP = agentconfig.VirtualNodePortDNATIPv6 } if err := p.ofClient.InstallServiceFlows(groupID, svcIP, svcPort, protocol, affinityTimeout, nodeLocalExternal, corev1.ServiceTypeNodePort); err != nil { - return fmt.Errorf("failed to install Service NodePort load balancing flows: %w", err) + return fmt.Errorf("failed to install NodePort load balancing flows: %w", err) } if err := p.routeClient.AddNodePort(p.nodePortAddresses, svcPort, protocol); err != nil { - return fmt.Errorf("failed to install Service NodePort traffic redirecting flows: %w", err) + return fmt.Errorf("failed to install NodePort traffic redirecting rules: %w", err) } return nil } @@ -309,10 +293,10 @@ func (p *proxier) uninstallNodePortService(svcPort uint16, protocol binding.Prot svcIP = agentconfig.VirtualNodePortDNATIPv6 } if err := p.ofClient.UninstallServiceFlows(svcIP, svcPort, protocol); err != nil { - return fmt.Errorf("failed to remove Service NodePort NodePort load balancing flows: %w", err) + return fmt.Errorf("failed to remove NodePort load balancing flows: %w", err) } if err := p.routeClient.DeleteNodePort(p.nodePortAddresses, svcPort, protocol); err != nil { - return fmt.Errorf("failed to remove Service NodePort traffic redirecting flows: %w", err) + return fmt.Errorf("failed to remove NodePort traffic redirecting rules: %w", err) } return nil } @@ -321,13 +305,13 @@ func (p *proxier) installLoadBalancerService(groupID binding.GroupIDType, loadBa for _, ingress := range loadBalancerIPStrings { if ingress != "" { if err := p.ofClient.InstallServiceFlows(groupID, net.ParseIP(ingress), svcPort, protocol, affinityTimeout, nodeLocalExternal, corev1.ServiceTypeLoadBalancer); err != nil { - return fmt.Errorf("failed to install Service LoadBalancer load balancing flows: %w", err) + return fmt.Errorf("failed to install LoadBalancer load balancing flows: %w", err) } } } if p.proxyAll { if err := p.routeClient.AddLoadBalancer(loadBalancerIPStrings); err != nil { - return fmt.Errorf("failed to install Service LoadBalancer traffic redirecting flows: %w", err) + return fmt.Errorf("failed to install LoadBalancer traffic redirecting routes: %w", err) } } @@ -338,13 +322,13 @@ func (p *proxier) uninstallLoadBalancerService(loadBalancerIPStrings []string, s for _, ingress := range loadBalancerIPStrings { if ingress != "" { if err := p.ofClient.UninstallServiceFlows(net.ParseIP(ingress), svcPort, protocol); err != nil { - return fmt.Errorf("failed to remove Service LoadBalancer load balancing flows: %w", err) + return fmt.Errorf("failed to remove LoadBalancer load balancing flows: %w", err) } } } if p.proxyAll { if err := p.routeClient.DeleteLoadBalancer(loadBalancerIPStrings); err != nil { - return fmt.Errorf("failed to remove Service LoadBalancer traffic redirecting flows: %w", err) + return fmt.Errorf("failed to remove LoadBalancer traffic redirecting routes: %w", err) } } @@ -359,6 +343,7 @@ func (p *proxier) installServices() { } for svcPortName, svcPort := range p.serviceMap { svcInfo := svcPort.(*types.ServiceInfo) + svcInfoStr := svcInfo.String() endpointsInstalled, ok := p.endpointsInstalledMap[svcPortName] if !ok { endpointsInstalled = map[string]k8sproxy.Endpoint{} @@ -420,6 +405,7 @@ func (p *proxier) installServices() { for _, endpoint := range allReachableEndpoints { if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed. needUpdateEndpoints = true + klog.V(2).InfoS("At least one Endpoint of Service is not installed, updating Endpoints", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr) break } } @@ -428,7 +414,7 @@ func (p *proxier) installServices() { } // If there are expired Endpoints, Endpoints installed should be updated. if len(allReachableEndpoints) < len(endpointsInstalled) { - klog.V(2).Infof("Some Endpoints of Service %s removed, updating Endpoints", svcInfo.String()) + klog.V(2).InfoS("Some Endpoints of Service was removed, updating Endpoints", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr) needUpdateEndpoints = true } @@ -452,16 +438,16 @@ func (p *proxier) installServices() { } if pSvcInfo != nil { - klog.V(2).Infof("Updating Service %s %s", svcPortName.Name, svcInfo.String()) + klog.V(2).InfoS("Updating Service", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr) } else { - klog.V(2).Infof("Installing Service %s %s", svcPortName.Name, svcInfo.String()) + klog.V(2).InfoS("Installing Service", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr) } if needUpdateEndpoints { // Install Endpoints. err := p.ofClient.InstallEndpointFlows(svcInfo.OFProtocol, allReachableEndpoints) if err != nil { - klog.ErrorS(err, "Error when installing Endpoints flows") + klog.ErrorS(err, "Error when installing Endpoints flows for Service", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr) continue } if internalPolicyLocal != externalPolicyLocal { @@ -473,19 +459,19 @@ func (p *proxier) installServices() { // The Multi-cluster Service supports ClusterIP type of Service only, so always set mcsLocalService to nil when // the type of the Service is not ClusterIP. if err = p.ofClient.InstallServiceGroup(groupID, affinityTimeout != 0, nil, localEndpoints); err != nil { - klog.ErrorS(err, "Error when installing Group of local Endpoints for Service", "Service", svcPortName) + klog.ErrorS(err, "Error when installing group of local Endpoints for Service", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr) continue } groupID = p.groupCounter.AllocateIfNotExist(svcPortName, false) if err = p.ofClient.InstallServiceGroup(groupID, affinityTimeout != 0, nil, clusterEndpoints); err != nil { - klog.ErrorS(err, "Error when installing Group of all Endpoints for Service", "Service", svcPortName) + klog.ErrorS(err, "Error when installing group of all Endpoints for Service", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr) continue } } else { // If the type of the Service is ClusterIP, install a group according to internalTrafficPolicy. groupID := p.groupCounter.AllocateIfNotExist(svcPortName, internalPolicyLocal) if err = p.ofClient.InstallServiceGroup(groupID, affinityTimeout != 0, mcsLocalService, allReachableEndpoints); err != nil { - klog.ErrorS(err, "Error when installing Group of Endpoints for Service", "Service", svcPortName) + klog.ErrorS(err, "Error when installing group of Endpoints for Service", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr) continue } } @@ -501,33 +487,55 @@ func (p *proxier) installServices() { bothPolicyLocal := internalPolicyLocal groupID := p.groupCounter.AllocateIfNotExist(svcPortName, bothPolicyLocal) if err = p.ofClient.InstallServiceGroup(groupID, affinityTimeout != 0, mcsLocalService, allReachableEndpoints); err != nil { - klog.ErrorS(err, "Error when installing Group of local Endpoints for Service", "Service", svcPortName) + klog.ErrorS(err, "Error when installing group of Endpoints for Service", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr, "BothTrafficPolicies", bothPolicyLocal) continue } if groupID, exist := p.groupCounter.Get(svcPortName, !bothPolicyLocal); exist { if err := p.ofClient.UninstallServiceGroup(groupID); err != nil { - klog.ErrorS(err, "Failed to uninstall Group of all Endpoints for Service", "Service", svcPortName) + klog.ErrorS(err, "Error when uninstalling group of Endpoints for Service", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr, "BothTrafficPolicies", !bothPolicyLocal) continue } p.groupCounter.Recycle(svcPortName, !bothPolicyLocal) } } - updateInstalledEndpointsMap := func(endpoint k8sproxy.Endpoint) { - // If the Endpoint is newly installed, add a reference. - if _, ok := endpointsInstalled[endpoint.String()]; !ok { + // If mcsLocalService is not nil, it has a specific Endpoint which is not included in allReachableEndpoints. + // Before cleaning stale Endpoints, append it to allReachableEndpoints. + if mcsLocalService != nil { + allReachableEndpoints = append(allReachableEndpoints, mcsLocalService.Endpoint) + } + + // Map endpointsInstalled may have stale Endpoints, we need to calculate a new map updatedEndpointsInstalled + // to replace it. + updatedEndpointsInstalled := map[string]k8sproxy.Endpoint{} + for _, endpoint := range allReachableEndpoints { + // Add the Endpoint to map updatedEndpointsInstalled since Endpoints in allReachableEndpoints are actually + // installed Endpoints. + updatedEndpointsInstalled[endpoint.String()] = endpoint + + // If the Endpoint is not newly installed for the Service, remove it from map endpointsInstalled. If all + // the Endpoints that are not newly installed are removed from map endpointsInstalled, only stale Endpoints + // are left. + if _, exists := endpointsInstalled[endpoint.String()]; exists { + delete(endpointsInstalled, endpoint.String()) + } else { + // If the Endpoint is newly installed, add a reference. key := endpointKey(endpoint, svcInfo.OFProtocol) p.endpointReferenceCounter[key] = p.endpointReferenceCounter[key] + 1 - endpointsInstalled[endpoint.String()] = endpoint } } - for _, e := range allReachableEndpoints { - updateInstalledEndpointsMap(e) + + // Update the Endpoints installed of the Service in endpointsInstalledMap. + delete(p.endpointsInstalledMap, svcPortName) + if len(updatedEndpointsInstalled) != 0 { + p.endpointsInstalledMap[svcPortName] = updatedEndpointsInstalled } - // When mcsLocalService is not nil, add its ClusterIP info into the endpointsInstalled map - // since the corresponding Endpoint in Multi-cluster Service is not in the endpointUpdateList. - if mcsLocalService != nil { - updateInstalledEndpointsMap(mcsLocalService.Endpoint) + + // After removing all the Endpoints which are not newly installed, there are only stale Endpoints in endpointsInstalled, + // then remove the stale Endpoints. + if err = p.removeStaleEndpoints(endpointsInstalled, svcPortName.Protocol); err != nil { + klog.ErrorS(err, "Error when removing flows of stale Endpoints for Service", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr) + continue } } @@ -536,7 +544,7 @@ func (p *proxier) installServices() { if needRemoval { // If previous Service should be removed, remove ClusterIP flows of previous Service. if err := p.ofClient.UninstallServiceFlows(pSvcInfo.ClusterIP(), uint16(pSvcInfo.Port()), pSvcInfo.OFProtocol); err != nil { - klog.ErrorS(err, "Failed to remove flows of Service", "Service", svcPortName) + klog.ErrorS(err, "Error when uninstalling ClusterIP flows for Service", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr) continue } @@ -544,14 +552,14 @@ func (p *proxier) installServices() { // If previous Service which has NodePort should be removed, remove NodePort flows and configurations of previous Service. if pSvcInfo.NodePort() > 0 { if err := p.uninstallNodePortService(uint16(pSvcInfo.NodePort()), pSvcInfo.OFProtocol); err != nil { - klog.ErrorS(err, "Failed to remove flows and configurations of Service", "Service", svcPortName) + klog.ErrorS(err, "Error when uninstalling NodePort flows and configurations for Service", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr) continue } } // If previous Service which has ClusterIP should be removed, remove ClusterIP routes. if svcInfo.ClusterIP() != nil { if err := p.routeClient.DeleteClusterIPRoute(pSvcInfo.ClusterIP()); err != nil { - klog.ErrorS(err, "Failed to remove ClusterIP Service routes", "Service", svcPortName) + klog.ErrorS(err, "Error when uninstalling ClusterIP route for Service", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr) continue } } @@ -559,35 +567,47 @@ func (p *proxier) installServices() { } // Install ClusterIP flows for the Service. - groupID := p.groupCounter.AllocateIfNotExist(svcPortName, internalPolicyLocal) + groupID, exists := p.groupCounter.Get(svcPortName, internalPolicyLocal) + if !exists { + klog.InfoS("Group for Service internalTrafficPolicy is not installed", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr, "internalTrafficPolicy", internalPolicyLocal) + continue + } if err := p.ofClient.InstallServiceFlows(groupID, svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(affinityTimeout), externalPolicyLocal, corev1.ServiceTypeClusterIP); err != nil { - klog.Errorf("Error when installing Service flows: %v", err) + klog.ErrorS(err, "Error when installing ClusterIP flows for Service", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr) continue } if p.proxyAll { - nGroupID := p.groupCounter.AllocateIfNotExist(svcPortName, externalPolicyLocal) // Install ClusterIP route on Node so that ClusterIP can be accessed on Node. Every time a new ClusterIP // is created, the routing target IP block will be recalculated for expansion to be able to route the new // created ClusterIP. Deleting a ClusterIP will not shrink the target routing IP block. The Service CIDR // can be finally calculated after creating enough ClusterIPs. if err := p.routeClient.AddClusterIPRoute(svcInfo.ClusterIP()); err != nil { - klog.ErrorS(err, "Failed to install ClusterIP route of Service", "Service", svcPortName) + klog.ErrorS(err, "Error when installing ClusterIP route for Service", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr) continue } // If previous Service is nil or NodePort flows and configurations of previous Service have been removed, // install NodePort flows and configurations for current Service. if svcInfo.NodePort() > 0 && (pSvcInfo == nil || needRemoval) { - if err := p.installNodePortService(nGroupID, uint16(svcInfo.NodePort()), svcInfo.OFProtocol, uint16(affinityTimeout), svcInfo.ExternalPolicyLocal()); err != nil { - klog.ErrorS(err, "Failed to install NodePort flows and configurations of Service", "Service", svcPortName) + groupID, exists = p.groupCounter.Get(svcPortName, externalPolicyLocal) + if !exists { + klog.ErrorS(fmt.Errorf("group is not found"), "Group for Service externalTrafficPolicy is not installed", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr, "externalTrafficPolicy", externalPolicyLocal) + continue + } + if err := p.installNodePortService(groupID, uint16(svcInfo.NodePort()), svcInfo.OFProtocol, uint16(affinityTimeout), svcInfo.ExternalPolicyLocal()); err != nil { + klog.ErrorS(err, "Error when installing NodePort flows and configurations of Service", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr) continue } } } if p.proxyLoadBalancerIPs { - nGroupID := p.groupCounter.AllocateIfNotExist(svcPortName, externalPolicyLocal) + groupID, exists = p.groupCounter.Get(svcPortName, externalPolicyLocal) + if !exists { + klog.ErrorS(fmt.Errorf("group is not found"), "Group for Service externalTrafficPolicy is not installed", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr, svcPortName, "externalTrafficPolicy", externalPolicyLocal) + continue + } // Service LoadBalancer flows can be partially updated. var toDelete, toAdd []string if needRemoval { @@ -600,14 +620,14 @@ func (p *proxier) installServices() { // Remove LoadBalancer flows and configurations. if len(toDelete) > 0 { if err := p.uninstallLoadBalancerService(toDelete, uint16(pSvcInfo.Port()), pSvcInfo.OFProtocol); err != nil { - klog.ErrorS(err, "Failed to remove flows and configurations of Service", "Service", svcPortName) + klog.ErrorS(err, "Error when uninstalling LoadBalancer flows and configurations for Service", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr) continue } } // Install LoadBalancer flows and configurations. if len(toAdd) > 0 { - if err := p.installLoadBalancerService(nGroupID, toAdd, uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(affinityTimeout), svcInfo.ExternalPolicyLocal()); err != nil { - klog.ErrorS(err, "Failed to install LoadBalancer flows and configurations of Service", "Service", svcPortName) + if err := p.installLoadBalancerService(groupID, toAdd, uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(affinityTimeout), svcInfo.ExternalPolicyLocal()); err != nil { + klog.ErrorS(err, "Error when installing LoadBalancer flows and configurations for Service", "Service", "ServiceName", svcPortName, "ServiceInfo", svcInfoStr) continue } } @@ -615,7 +635,7 @@ func (p *proxier) installServices() { } p.serviceInstalledMap[svcPortName] = svcPort - p.addServiceByIP(svcInfo.String(), svcPortName) + p.addServiceByIP(svcInfoStr, svcPortName) } } @@ -653,7 +673,6 @@ func (p *proxier) syncProxyRules() { p.removeStaleServices() p.installServices() - p.removeStaleEndpoints() if p.serviceHealthServer != nil { if err := p.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil { diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 1a242a13df2..1efb5ba8afc 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -1013,7 +1013,7 @@ func testNodePortRemove(t *testing.T, nodePortAddresses []net.IP, svcIP net.IP, mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) - mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, expectedEp).Times(1) + mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, expectedAllEps).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2) @@ -1071,7 +1071,7 @@ func testLoadBalancerRemove(t *testing.T, nodePortAddresses []net.IP, svcIP net. mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) - mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, expectedEp).Times(1) + mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, expectedAllEps).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), bindingProtocol).Times(1) @@ -1756,7 +1756,8 @@ func testServiceInternalTrafficPolicyUpdate(t *testing.T, makeEndpointsMap(fp, eps) expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, true, false, false, nil)} - expectedAllEps := append(expectedLocalEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, true, false, false, nil)) + expectedRemoteEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, true, false, false, nil)} + expectedAllEps := append(expectedLocalEps, expectedRemoteEps...) bindingProtocol := binding.ProtocolTCP if isIPv6 { @@ -1773,7 +1774,8 @@ func testServiceInternalTrafficPolicyUpdate(t *testing.T, fp.serviceChanges.OnServiceUpdate(svc, updatedSvc) groupIDLocal := fp.groupCounter.AllocateIfNotExist(svcPortName, true) - mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.InAnyOrder(expectedLocalEps)).Times(1) + mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, expectedLocalEps).Times(1) + mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, expectedRemoteEps).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupIDLocal, false, nil, expectedLocalEps).Times(1) fp.syncProxyRules() } diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index ebc6e4e4d90..0d3b2293e6a 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -129,6 +129,11 @@ type Client struct { // NewClient returns a route client. func NewClient(networkConfig *config.NetworkConfig, noSNAT, proxyAll, connectUplinkToBridge, multicastEnabled bool, serviceCIDRProvider servicecidr.Interface) (*Client, error) { + a := true + aa := &a + if !*aa { + fmt.Printf("x") + } return &Client{ networkConfig: networkConfig, noSNAT: noSNAT, @@ -140,6 +145,7 @@ func NewClient(networkConfig *config.NetworkConfig, noSNAT, proxyAll, connectUpl isCloudEKS: env.IsCloudEKS(), serviceCIDRProvider: serviceCIDRProvider, }, nil + } // Initialize initializes all infrastructures required to route container packets in host network. diff --git a/test/integration/agent/openflow_test.go b/test/integration/agent/openflow_test.go index 3a9833b0987..dd541c090c7 100644 --- a/test/integration/agent/openflow_test.go +++ b/test/integration/agent/openflow_test.go @@ -795,10 +795,7 @@ func uninstallServiceFlowsFunc(t *testing.T, gid uint32, svc svcConfig, endpoint assert.Nil(t, err) err = c.UninstallServiceGroup(groupID) assert.Nil(t, err) - for _, ep := range endpointList { - err := c.UninstallEndpointFlows(svc.protocol, ep) - assert.Nil(t, err) - } + assert.NoError(t, c.UninstallEndpointFlows(svc.protocol, endpointList)) } func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList []k8sproxy.Endpoint, stickyAge uint16, antreaPolicyEnabled bool) (tableFlows []expectTableFlows, groupBuckets []string) {