diff --git a/pkg/proxy/winkernel/hns.go b/pkg/proxy/winkernel/hns.go
index 9d3889d60ccb6..2045f616d4b15 100644
--- a/pkg/proxy/winkernel/hns.go
+++ b/pkg/proxy/winkernel/hns.go
@@ -242,6 +242,20 @@ func (hns hns) deleteEndpoint(hnsID string) error {
 	return err
 }
 
+// findLoadBalancerID constructs an identifier from the provided load balancer fields
+func findLoadBalancerID(endpoints []endpointsInfo, vip string, protocol, internalPort, externalPort uint16) (loadBalancerIdentifier, error) {
+	// Compute hash from backends (endpoint IDs)
+	hash, err := hashEndpoints(endpoints)
+	if err != nil {
+		klog.V(2).ErrorS(err, "Error hashing endpoints", "endpoints", endpoints)
+		return loadBalancerIdentifier{}, err
+	}
+	if len(vip) > 0 {
+		return loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: vip, endpointsHash: hash}, nil
+	}
+	return loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, endpointsHash: hash}, nil
+}
+
 func (hns hns) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerInfo, error) {
 	lbs, err := hcn.ListLoadBalancers()
 	var id loadBalancerIdentifier
@@ -391,7 +405,7 @@ func hashEndpoints[T string | endpointsInfo](endpoints []T) (hash [20]byte, err
 	for _, ep := range endpoints {
 		switch x := any(ep).(type) {
 		case endpointsInfo:
-			id = x.hnsID
+			id = strings.ToUpper(x.hnsID)
 		case string:
 			id = x
 		}
diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go
index 887c9adece308..75500b138f30c 100644
--- a/pkg/proxy/winkernel/proxier.go
+++ b/pkg/proxy/winkernel/proxier.go
@@ -127,6 +127,7 @@ type serviceInfo struct {
 	hns                  HostNetworkService
 	preserveDIP          bool
 	localTrafficDSR      bool
+	winProxyOptimization bool
 }
 
 type hnsNetworkInfo struct {
@@ -143,7 +144,12 @@ type remoteSubnetInfo struct {
 	drMacAddress string
 }
 
-const NETWORK_TYPE_OVERLAY = "overlay"
+const (
+	NETWORK_TYPE_OVERLAY = "overlay"
+	// MAX_COUNT_STALE_LOADBALANCERS is the maximum number of stale load balancers cleaned up in a single syncProxyRules run.
+	// If more stale load balancers remain, they are cleaned up in the next syncProxyRules iteration.
+	MAX_COUNT_STALE_LOADBALANCERS = 20
+)
 
 func newHostNetworkService() (HostNetworkService, hcn.SupportedFeatures) {
 	var h HostNetworkService
@@ -156,6 +162,44 @@ func newHostNetworkService() (HostNetworkService, hcn.SupportedFeatures) {
 	return h, supportedFeatures
}
 
+// logFormattedEndpoints logs all endpoints, and their states, taking part in an endpoints map change.
+// This is mostly for debugging purposes; verbosity is set to 5.
+func logFormattedEndpoints(logMsg string, logLevel klog.Level, svcPortName proxy.ServicePortName, eps []proxy.Endpoint) {
+	if klog.V(logLevel).Enabled() {
+		var epInfo string
+		for _, v := range eps {
+			epInfo = epInfo + fmt.Sprintf("\n %s={Ready:%v,Serving:%v,Terminating:%v,IsRemote:%v}", v.String(), v.IsReady(), v.IsServing(), v.IsTerminating(), !v.GetIsLocal())
+		}
+		klog.V(logLevel).InfoS(logMsg, "svcPortName", svcPortName, "endpoints", epInfo)
+	}
+}
+
+// cleanupStaleLoadbalancers deletes the stale load balancers which were left
+// pending delete in the last iteration; it acts as self-healing for stale
+// load balancer entries.
+func (proxier *Proxier) cleanupStaleLoadbalancers() {
+	i := 0
+	countStaleLB := len(proxier.mapStaleLoadbalancers)
+	if countStaleLB == 0 {
+		return
+	}
+	klog.V(3).InfoS("Cleanup of stale loadbalancers triggered", "LBCount", countStaleLB)
+	for lbID := range proxier.mapStaleLoadbalancers {
+		i++
+		if err := proxier.hns.deleteLoadBalancer(lbID); err == nil {
+			delete(proxier.mapStaleLoadbalancers, lbID)
+		}
+		if i == MAX_COUNT_STALE_LOADBALANCERS {
+			// The remaining stale load balancers will be cleaned up in the next iteration
+			break
+		}
+	}
+	countStaleLB = len(proxier.mapStaleLoadbalancers)
+	if countStaleLB > 0 {
+		klog.V(3).InfoS("Stale loadbalancers still remaining", "LBCount", countStaleLB, "stale_lb_ids", proxier.mapStaleLoadbalancers)
+	}
+}
+
 func getNetworkName(hnsNetworkName string) (string, error) {
 	if len(hnsNetworkName) == 0 {
 		klog.V(3).InfoS("Flag --network-name not set, checking environment variable")
@@ -312,16 +356,24 @@ func conjureMac(macPrefix string, ip net.IP) string {
 }
 
 func (proxier *Proxier) endpointsMapChange(oldEndpointsMap, newEndpointsMap proxy.EndpointsMap) {
-	for svcPortName := range oldEndpointsMap {
-		proxier.onEndpointsMapChange(&svcPortName)
+	// This will optimize remote endpoint and load balancer deletion based on the winProxyOptimization annotation
+	var svcPortMap = make(map[proxy.ServicePortName]bool)
+	var logLevel klog.Level = 5
+	for svcPortName, eps := range oldEndpointsMap {
+		logFormattedEndpoints("endpointsMapChange oldEndpointsMap", logLevel, svcPortName, eps)
+		svcPortMap[svcPortName] = true
+		proxier.onEndpointsMapChange(&svcPortName, false)
 	}
 
-	for svcPortName := range newEndpointsMap {
-		proxier.onEndpointsMapChange(&svcPortName)
+	for svcPortName, eps := range newEndpointsMap {
+		logFormattedEndpoints("endpointsMapChange newEndpointsMap", logLevel, svcPortName, eps)
+		// redundantCleanup true means cleanup is being called a second time on the same svcPort
+		redundantCleanup := svcPortMap[svcPortName]
+		proxier.onEndpointsMapChange(&svcPortName, redundantCleanup)
 	}
 }
 
-func (proxier *Proxier) onEndpointsMapChange(svcPortName *proxy.ServicePortName) {
+func (proxier *Proxier) onEndpointsMapChange(svcPortName *proxy.ServicePortName, redundantCleanup bool) {
 
 	svc, exists := proxier.svcPortMap[*svcPortName]
 
@@ -333,8 +385,15 @@ func (proxier *Proxier) onEndpointsMapChange(svcPortName *proxy.ServicePortName)
 			return
 		}
 
+		if svcInfo.winProxyOptimization && redundantCleanup {
+			// This is a second cleanup call.
+			// A second cleanup on the same svcPort will be ignored if
+			// winProxyOptimization is enabled
+			return
+		}
+
Service is stale", "servicePortName", svcPortName) - svcInfo.cleanupAllPolicies(proxier.endpointsMap[*svcPortName]) + svcInfo.cleanupAllPolicies(proxier.endpointsMap[*svcPortName], proxier.mapStaleLoadbalancers, true) } else { // If no service exists, just cleanup the remote endpoints klog.V(3).InfoS("Endpoints are orphaned, cleaning up") @@ -381,7 +440,7 @@ func (proxier *Proxier) onServiceMapChange(svcPortName *proxy.ServicePortName) { } klog.V(3).InfoS("Updating existing service port", "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP(), "port", svcInfo.Port(), "protocol", svcInfo.Protocol()) - svcInfo.cleanupAllPolicies(proxier.endpointsMap[*svcPortName]) + svcInfo.cleanupAllPolicies(proxier.endpointsMap[*svcPortName], proxier.mapStaleLoadbalancers, false) } } @@ -426,6 +485,13 @@ func newSourceVIP(hns HostNetworkService, network string, ip string, mac string, return ep, err } +func (ep *endpointsInfo) DecrementRefCount() { + klog.V(3).InfoS("Decrementing Endpoint RefCount", "endpointsInfo", ep) + if !ep.GetIsLocal() && ep.refCount != nil && *ep.refCount > 0 { + *ep.refCount-- + } +} + func (ep *endpointsInfo) Cleanup() { klog.V(3).InfoS("Endpoint cleanup", "endpointsInfo", ep) if !ep.GetIsLocal() && ep.refCount != nil { @@ -461,6 +527,8 @@ func (refCountMap endPointsReferenceCountMap) getRefCount(hnsID string) *uint16 func (proxier *Proxier) newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *proxy.BaseServicePortInfo) proxy.ServicePort { info := &serviceInfo{BaseServicePortInfo: bsvcPortInfo} preserveDIP := service.Annotations["preserve-destination"] == "true" + // Annotation introduced to enable optimized loadbalancing + winProxyOptimization := !(strings.ToUpper(service.Annotations["winProxyOptimization"]) == "DISABLED") localTrafficDSR := service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal err := hcn.DSRSupported() if err != nil { @@ -478,6 +546,9 @@ func (proxier *Proxier) newServiceInfo(port *v1.ServicePort, service *v1.Service info.targetPort = targetPort info.hns = proxier.hns info.localTrafficDSR = localTrafficDSR + info.winProxyOptimization = winProxyOptimization + + klog.V(3).InfoS("Flags enabled for service", "service", service.Name, "localTrafficDSR", localTrafficDSR, "preserveDIP", preserveDIP, "winProxyOptimization", winProxyOptimization) for _, eip := range service.Spec.ExternalIPs { info.externalIPs = append(info.externalIPs, &externalIPInfo{ip: eip}) @@ -561,6 +632,7 @@ type Proxier struct { forwardHealthCheckVip bool rootHnsEndpointName string + mapStaleLoadbalancers map[string]bool // This maintains entries of stale load balancers which are pending delete in last iteration } type localPort struct { @@ -719,6 +791,7 @@ func NewProxier( healthzPort: healthzPort, rootHnsEndpointName: config.RootHnsEndpointName, forwardHealthCheckVip: config.ForwardHealthCheckVip, + mapStaleLoadbalancers: make(map[string]bool), } ipFamily := v1.IPv4Protocol @@ -780,15 +853,25 @@ func CleanupLeftovers() (encounteredError bool) { return encounteredError } -func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []proxy.Endpoint) { +func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []proxy.Endpoint, mapStaleLoadbalancers map[string]bool, isEndpointChange bool) { klog.V(3).InfoS("Service cleanup", "serviceInfo", svcInfo) - // Skip the svcInfo.policyApplied check to remove all the policies - svcInfo.deleteLoadBalancerPolicy() + // if it's an endpoint change and winProxyOptimization annotation enable, skip lb deletion and 
+	winProxyOptimization := isEndpointChange && svcInfo.winProxyOptimization
+	if winProxyOptimization {
+		klog.V(3).InfoS("Skipped loadbalancer deletion.", "hnsID", svcInfo.hnsID, "nodePorthnsID", svcInfo.nodePorthnsID, "winProxyOptimization", svcInfo.winProxyOptimization, "isEndpointChange", isEndpointChange)
+	} else {
+		// Skip the svcInfo.policyApplied check to remove all the policies
+		svcInfo.deleteLoadBalancerPolicy(mapStaleLoadbalancers)
+	}
 	// Cleanup Endpoints references
 	for _, ep := range endpoints {
 		epInfo, ok := ep.(*endpointsInfo)
 		if ok {
-			epInfo.Cleanup()
+			if winProxyOptimization {
+				epInfo.DecrementRefCount()
+			} else {
+				epInfo.Cleanup()
+			}
 		}
 	}
 	if svcInfo.remoteEndpoint != nil {
@@ -798,10 +881,11 @@ func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []proxy.Endpoint) {
 	svcInfo.policyApplied = false
 }
 
-func (svcInfo *serviceInfo) deleteLoadBalancerPolicy() {
+func (svcInfo *serviceInfo) deleteLoadBalancerPolicy(mapStaleLoadbalancer map[string]bool) {
 	// Remove the Hns Policy corresponding to this service
 	hns := svcInfo.hns
 	if err := hns.deleteLoadBalancer(svcInfo.hnsID); err != nil {
+		mapStaleLoadbalancer[svcInfo.hnsID] = true
 		klog.V(1).ErrorS(err, "Error deleting Hns loadbalancer policy resource.", "hnsID", svcInfo.hnsID, "ClusterIP", svcInfo.ClusterIP())
 	} else {
 		// On successful delete, remove hnsId
@@ -809,6 +893,7 @@
 	}
 
 	if err := hns.deleteLoadBalancer(svcInfo.nodePorthnsID); err != nil {
+		mapStaleLoadbalancer[svcInfo.nodePorthnsID] = true
 		klog.V(1).ErrorS(err, "Error deleting Hns NodePort policy resource.", "hnsID", svcInfo.nodePorthnsID, "NodePort", svcInfo.NodePort())
 	} else {
 		// On successful delete, remove hnsId
@@ -816,6 +901,7 @@
 	}
 
 	for _, externalIP := range svcInfo.externalIPs {
 		if err := hns.deleteLoadBalancer(externalIP.hnsID); err != nil {
+			mapStaleLoadbalancer[externalIP.hnsID] = true
 			klog.V(1).ErrorS(err, "Error deleting Hns ExternalIP policy resource.", "hnsID", externalIP.hnsID, "IP", externalIP.ip)
 		} else {
 			// On successful delete, remove hnsId
@@ -824,7 +910,9 @@
 		}
 	}
 	for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
+		klog.V(3).InfoS("Hns LoadBalancer delete triggered for loadBalancer Ingress resources in cleanup", "lbIngressIP", lbIngressIP)
 		if err := hns.deleteLoadBalancer(lbIngressIP.hnsID); err != nil {
+			mapStaleLoadbalancer[lbIngressIP.hnsID] = true
 			klog.V(1).ErrorS(err, "Error deleting Hns IngressIP policy resource.", "hnsID", lbIngressIP.hnsID, "IP", lbIngressIP.ip)
 		} else {
 			// On successful delete, remove hnsId
@@ -833,6 +921,7 @@
 
 		if lbIngressIP.healthCheckHnsID != "" {
 			if err := hns.deleteLoadBalancer(lbIngressIP.healthCheckHnsID); err != nil {
+				mapStaleLoadbalancer[lbIngressIP.healthCheckHnsID] = true
 				klog.V(1).ErrorS(err, "Error deleting Hns IngressIP HealthCheck policy resource.", "hnsID", lbIngressIP.healthCheckHnsID, "IP", lbIngressIP.ip)
 			} else {
 				// On successful delete, remove hnsId
@@ -992,7 +1081,7 @@ func (proxier *Proxier) cleanupAllPolicies() {
 			klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
 			continue
 		}
-		svcInfo.cleanupAllPolicies(proxier.endpointsMap[svcName])
+		svcInfo.cleanupAllPolicies(proxier.endpointsMap[svcName], proxier.mapStaleLoadbalancers, false)
 	}
 }
 
@@ -1009,6 +1098,56 @@ func isNetworkNotFoundError(err error) bool {
 	return false
 }
 
+// isAllEndpointsTerminating returns true if all the endpoints are terminating.
+// If at least one endpoint is not terminating, it returns false.
+func (proxier *Proxier) isAllEndpointsTerminating(svcName proxy.ServicePortName, isLocalTrafficDSR bool) bool {
+	for _, epInfo := range proxier.endpointsMap[svcName] {
+		ep, ok := epInfo.(*endpointsInfo)
+		if !ok {
+			continue
+		}
+		if isLocalTrafficDSR && !ep.GetIsLocal() {
+			// KEP-1669: Ignore remote endpoints when the ExternalTrafficPolicy is Local (DSR Mode)
+			continue
+		}
+		// If the readiness probe fails and the pod is not under deletion, then
+		// the state of the endpoint will be Ready:False, Serving:False, Terminating:False
+		if !ep.IsReady() && !ep.IsTerminating() {
+			// Ready:false, Terminating:false; ignore
+			continue
+		}
+		if !ep.IsTerminating() {
+			return false
+		}
+	}
+	return true
+}
+
+// isAllEndpointsNonServing returns true if all the endpoints are non-serving.
+// If at least one endpoint is serving, it returns false.
+func (proxier *Proxier) isAllEndpointsNonServing(svcName proxy.ServicePortName, isLocalTrafficDSR bool) bool {
+	for _, epInfo := range proxier.endpointsMap[svcName] {
+		ep, ok := epInfo.(*endpointsInfo)
+		if !ok {
+			continue
+		}
+		if isLocalTrafficDSR && !ep.GetIsLocal() {
+			continue
+		}
+		if ep.IsServing() {
+			return false
+		}
+	}
+	return true
+}
+
+// updateQueriedEndpoints updates the queriedEndpoints map with newly created endpoint details
+func updateQueriedEndpoints(newHnsEndpoint *endpointsInfo, queriedEndpoints map[string]*endpointsInfo) {
+	// store newly created endpoints in queriedEndpoints
+	queriedEndpoints[newHnsEndpoint.hnsID] = newHnsEndpoint
+	queriedEndpoints[newHnsEndpoint.ip] = newHnsEndpoint
+}
+
 // This is where all of the hns save/restore calls happen.
 // assumes proxier.mu is held
 func (proxier *Proxier) syncProxyRules() {
@@ -1125,9 +1264,7 @@
 				newHnsEndpoint.refCount = proxier.endPointsRefCount.getRefCount(newHnsEndpoint.hnsID)
 				*newHnsEndpoint.refCount++
 				svcInfo.remoteEndpoint = newHnsEndpoint
-				// store newly created endpoints in queriedEndpoints
-				queriedEndpoints[newHnsEndpoint.hnsID] = newHnsEndpoint
-				queriedEndpoints[newHnsEndpoint.ip] = newHnsEndpoint
+				updateQueriedEndpoints(newHnsEndpoint, queriedEndpoints)
 			}
 		}
 
@@ -1137,6 +1274,19 @@
 		// Create Remote endpoints for every endpoint, corresponding to the service
 		containsPublicIP := false
 		containsNodeIP := false
+		var allEndpointsTerminating, allEndpointsNonServing bool
+		someEndpointsServing := true
+
+		if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.ProxyTerminatingEndpoints) && len(svcInfo.loadBalancerIngressIPs) > 0 {
+			// The check is performed only when the ProxyTerminatingEndpoints feature gate
+			// is enabled and the service is of type LoadBalancer (it has ingress IPs).
+			allEndpointsTerminating = proxier.isAllEndpointsTerminating(svcName, svcInfo.localTrafficDSR)
+			allEndpointsNonServing = proxier.isAllEndpointsNonServing(svcName, svcInfo.localTrafficDSR)
+			someEndpointsServing = !allEndpointsNonServing
+			klog.V(4).InfoS("Terminating status checked for all endpoints", "svcClusterIP", svcInfo.ClusterIP(), "allEndpointsTerminating", allEndpointsTerminating, "allEndpointsNonServing", allEndpointsNonServing, "localTrafficDSR", svcInfo.localTrafficDSR)
+		} else {
+			klog.V(4).InfoS("Skipped terminating status check for all endpoints", "svcClusterIP", svcInfo.ClusterIP(), "proxyEndpointsFeatureGateEnabled", utilfeature.DefaultFeatureGate.Enabled(kubefeatures.ProxyTerminatingEndpoints), "ingressLBCount", len(svcInfo.loadBalancerIngressIPs))
+		}
 
 		for _, epInfo := range proxier.endpointsMap[svcName] {
 			ep, ok := epInfo.(*endpointsInfo)
 
-			if !ep.IsReady() {
-				continue
+			if someEndpointsServing {
+
+				if !allEndpointsTerminating && !ep.IsReady() {
+					klog.V(3).InfoS("Skipping the endpoint for LB creation. Endpoint is not ready and not all endpoints are terminating", "EpIP", ep.ip, "EpPort", ep.port, "allEndpointsTerminating", allEndpointsTerminating, "IsEpReady", ep.IsReady())
+					continue
+				}
+				if !ep.IsServing() {
+					klog.V(3).InfoS("Skipping the endpoint for LB creation. Endpoint is not serving", "EpIP", ep.ip, "EpPort", ep.port, "IsEpServing", ep.IsServing())
+					continue
+				}
+
 			}
+
 			var newHnsEndpoint *endpointsInfo
 			hnsNetworkName := proxier.network.name
 			var err error
@@ -1205,6 +1365,7 @@
 					klog.ErrorS(err, "Remote endpoint creation failed", "endpointsInfo", hnsEndpoint)
 					continue
 				}
+				updateQueriedEndpoints(newHnsEndpoint, queriedEndpoints)
 			} else {
 
 				hnsEndpoint := &endpointsInfo{
@@ -1218,6 +1379,7 @@
 					klog.ErrorS(err, "Remote endpoint creation failed")
 					continue
 				}
+				updateQueriedEndpoints(newHnsEndpoint, queriedEndpoints)
 			}
 		}
 		// For Overlay networks 'SourceVIP' on an Load balancer Policy can either be chosen as
@@ -1266,7 +1428,14 @@
 			klog.InfoS("Load Balancer already exists -- Debug ", "hnsID", svcInfo.hnsID)
 		}
 
+		// With ExternalTrafficPolicy: Cluster, if all endpoints are terminating,
+		// the set holds the serving-and-terminating endpoints; otherwise only the ready-and-serving ones
 		if len(hnsEndpoints) == 0 {
+			if svcInfo.winProxyOptimization {
+				// Deleting load balancers when there are no endpoints to serve
+ klog.V(3).InfoS("Cleanup existing ", "endpointsInfo", hnsEndpoints, "serviceName", svcName) + svcInfo.deleteLoadBalancerPolicy(proxier.mapStaleLoadbalancers) + } klog.ErrorS(nil, "Endpoint information not available for service, not applying any policy", "serviceName", svcName) continue } @@ -1283,23 +1452,33 @@ func (proxier *Proxier) syncProxyRules() { klog.InfoS("Session Affinity is not supported on this version of Windows") } - hnsLoadBalancer, err := hns.getLoadBalancer( - hnsEndpoints, - loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.isIPv6Mode, sessionAffinity: sessionAffinityClientIP}, - sourceVip, - svcInfo.ClusterIP().String(), - Enum(svcInfo.Protocol()), - uint16(svcInfo.targetPort), - uint16(svcInfo.Port()), - queriedLoadBalancers, - ) - if err != nil { - klog.ErrorS(err, "Policy creation failed") - continue - } + endpointsAvailableForLB := !allEndpointsTerminating && !allEndpointsNonServing + proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), hnsEndpoints, queriedLoadBalancers) + + if endpointsAvailableForLB { + // If all endpoints are terminating, then no need to create Cluster IP LoadBalancer + // Cluster IP LoadBalancer creation + hnsLoadBalancer, err := hns.getLoadBalancer( + hnsEndpoints, + loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.isIPv6Mode, sessionAffinity: sessionAffinityClientIP}, + sourceVip, + svcInfo.ClusterIP().String(), + Enum(svcInfo.Protocol()), + uint16(svcInfo.targetPort), + uint16(svcInfo.Port()), + queriedLoadBalancers, + ) + if err != nil { + klog.ErrorS(err, "Policy creation failed") + continue + } + + svcInfo.hnsID = hnsLoadBalancer.hnsID + klog.V(3).InfoS("Hns LoadBalancer resource created for cluster ip resources", "clusterIP", svcInfo.ClusterIP(), "hnsID", hnsLoadBalancer.hnsID) - svcInfo.hnsID = hnsLoadBalancer.hnsID - klog.V(3).InfoS("Hns LoadBalancer resource created for cluster ip resources", "clusterIP", svcInfo.ClusterIP(), "hnsID", hnsLoadBalancer.hnsID) + } else { + klog.V(3).InfoS("Skipped creating Hns LoadBalancer for cluster ip resources. 
+			klog.V(3).InfoS("Skipped creating Hns LoadBalancer for cluster ip resources. Reason: all endpoints are terminating", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", allEndpointsTerminating)
+		}
 
 		// If nodePort is specified, user should be able to use nodeIP:nodePort to reach the backend endpoints
 		if svcInfo.NodePort() > 0 {
@@ -1310,7 +1489,10 @@
 				nodePortEndpoints = hnsLocalEndpoints
 			}
 
-			if len(nodePortEndpoints) > 0 {
+			proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.nodePorthnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), nodePortEndpoints, queriedLoadBalancers)
+
+			if len(nodePortEndpoints) > 0 && endpointsAvailableForLB {
+				// If all endpoints are terminating, there is no need to create the NodePort LoadBalancer
 				hnsLoadBalancer, err := hns.getLoadBalancer(
 					nodePortEndpoints,
 					loadBalancerFlags{isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode},
@@ -1329,7 +1511,7 @@
 				svcInfo.nodePorthnsID = hnsLoadBalancer.hnsID
 				klog.V(3).InfoS("Hns LoadBalancer resource created for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "hnsID", hnsLoadBalancer.hnsID)
 			} else {
-				klog.V(3).InfoS("Skipped creating Hns LoadBalancer for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "hnsID", hnsLoadBalancer.hnsID)
+				klog.V(3).InfoS("Skipped creating Hns LoadBalancer for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", allEndpointsTerminating)
 			}
 		}
 
@@ -1341,7 +1523,10 @@
 				externalIPEndpoints = hnsLocalEndpoints
 			}
 
-			if len(externalIPEndpoints) > 0 {
+			proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &externalIP.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), externalIPEndpoints, queriedLoadBalancers)
+
+			if len(externalIPEndpoints) > 0 && endpointsAvailableForLB {
+				// If all endpoints are terminating, there is no need to create the ExternalIP LoadBalancer
 				// Try loading existing policies, if already available
 				hnsLoadBalancer, err = hns.getLoadBalancer(
 					externalIPEndpoints,
@@ -1360,7 +1545,7 @@
 				externalIP.hnsID = hnsLoadBalancer.hnsID
 				klog.V(3).InfoS("Hns LoadBalancer resource created for externalIP resources", "externalIP", externalIP, "hnsID", hnsLoadBalancer.hnsID)
 			} else {
-				klog.V(3).InfoS("Skipped creating Hns LoadBalancer for externalIP resources", "externalIP", externalIP, "hnsID", hnsLoadBalancer.hnsID)
+				klog.V(3).InfoS("Skipped creating Hns LoadBalancer for externalIP resources", "externalIP", externalIP, "allEndpointsTerminating", allEndpointsTerminating)
 			}
 		}
 		// Create a Load Balancer Policy for each loadbalancer ingress
@@ -1371,6 +1556,8 @@
 				lbIngressEndpoints = hnsLocalEndpoints
 			}
 
+			proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), lbIngressEndpoints, queriedLoadBalancers)
+
 			if len(lbIngressEndpoints) > 0 {
 				hnsLoadBalancer, err := hns.getLoadBalancer(
 					lbIngressEndpoints,
@@ -1392,11 +1579,15 @@
 				klog.V(3).InfoS("Skipped creating Hns LoadBalancer for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP)
 			}
 
-			if proxier.forwardHealthCheckVip && gatewayHnsendpoint != nil {
+			if proxier.forwardHealthCheckVip && gatewayHnsendpoint != nil && endpointsAvailableForLB {
+				// Avoid creating the health check LoadBalancer if all the endpoints are terminating
 				nodeport := proxier.healthzPort
 				if svcInfo.HealthCheckNodePort() != 0 {
 					nodeport = svcInfo.HealthCheckNodePort()
 				}
+
+				proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.healthCheckHnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), []endpointsInfo{*gatewayHnsendpoint}, queriedLoadBalancers)
+
 				hnsHealthCheckLoadBalancer, err := hns.getLoadBalancer(
 					[]endpointsInfo{*gatewayHnsendpoint},
 					loadBalancerFlags{isDSR: false, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP},
@@ -1413,6 +1604,8 @@
 				}
 				lbIngressIP.healthCheckHnsID = hnsHealthCheckLoadBalancer.hnsID
 				klog.V(3).InfoS("Hns Health Check LoadBalancer resource created for loadBalancer Ingress resources", "ip", lbIngressIP)
+			} else {
+				klog.V(3).InfoS("Skipped creating Hns Health Check LoadBalancer for loadBalancer Ingress resources", "ip", lbIngressIP, "allEndpointsTerminating", allEndpointsTerminating)
 			}
 		}
 		svcInfo.policyApplied = true
@@ -1444,7 +1637,51 @@
 	// remove stale endpoint refcount entries
 	for hnsID, referenceCount := range proxier.endPointsRefCount {
 		if *referenceCount <= 0 {
+			klog.V(3).InfoS("Deleting unreferenced remote endpoint", "hnsID", hnsID)
+			proxier.hns.deleteEndpoint(hnsID)
 			delete(proxier.endPointsRefCount, hnsID)
 		}
 	}
+	// This will clean up stale load balancers which were pending delete
+	// in the last iteration
+	proxier.cleanupStaleLoadbalancers()
+}
+
+// deleteExistingLoadBalancer checks whether the load balancer needs to be deleted.
+// If so, it deletes the existing load balancer and returns true; otherwise it returns false.
+func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winProxyOptimization bool, lbHnsID *string, sourceVip string, protocol, intPort, extPort uint16, endpoints []endpointsInfo, queriedLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) bool {
+
+	if !winProxyOptimization || *lbHnsID == "" {
+		// Load balancer delete not needed
+		return false
+	}
+
+	lbID, lbIdErr := findLoadBalancerID(
+		endpoints,
+		sourceVip,
+		protocol,
+		intPort,
+		extPort,
+	)
+
+	if lbIdErr != nil {
+		return proxier.deleteLoadBalancer(hns, lbHnsID)
+	}
+
+	if _, ok := queriedLoadBalancers[lbID]; ok {
+		// The existing load balancer in the system is the same as the one we would delete and recreate, so skip the deletion.
+		return false
+	}
+
+	return proxier.deleteLoadBalancer(hns, lbHnsID)
+}
+
+func (proxier *Proxier) deleteLoadBalancer(hns HostNetworkService, lbHnsID *string) bool {
+	klog.V(3).InfoS("Hns LoadBalancer delete triggered for loadBalancer resources", "lbHnsID", *lbHnsID)
+	if err := hns.deleteLoadBalancer(*lbHnsID); err != nil {
+		// This will be cleaned up by the cleanupStaleLoadbalancers function.
+		proxier.mapStaleLoadbalancers[*lbHnsID] = true
+	}
+	*lbHnsID = ""
+	return true
+}
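
A few standalone Go sketches of the behaviors this patch introduces follow; they are illustrative only and are not part of the diff. First, how `newServiceInfo` interprets the `winProxyOptimization` annotation: the optimization defaults to on, and only the case-insensitive value `disabled` turns it off.

```go
// Hypothetical standalone sketch of the annotation parsing in newServiceInfo:
// the optimization is enabled by default; only the literal value "disabled"
// (any casing) turns it off.
package main

import (
	"fmt"
	"strings"
)

func winProxyOptimizationEnabled(annotations map[string]string) bool {
	return strings.ToUpper(annotations["winProxyOptimization"]) != "DISABLED"
}

func main() {
	fmt.Println(winProxyOptimizationEnabled(map[string]string{}))                                   // true: absent means enabled
	fmt.Println(winProxyOptimizationEnabled(map[string]string{"winProxyOptimization": "Disabled"})) // false
	fmt.Println(winProxyOptimizationEnabled(map[string]string{"winProxyOptimization": "other"}))    // true
}
```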
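Second, the `strings.ToUpper` change in `hashEndpoints` matters because HNS may report the same endpoint GUID with differing case, and `findLoadBalancerID` folds the endpoint hash into the `loadBalancerIdentifier` that `deleteExistingLoadBalancer` looks up in `queriedLoadBalancers`. Below is a minimal sketch of one way such a case- and order-insensitive hash can be built; the XOR-of-SHA-1-digests combination is an assumption for illustration, not a claim about the exact upstream scheme.

```go
// Illustrative sketch (not part of the patch): a case- and order-insensitive
// hash over endpoint IDs. Upper-casing mirrors the hashEndpoints change above;
// the XOR combination of per-ID digests is an assumed scheme shown only to
// demonstrate order independence.
package main

import (
	"crypto/sha1"
	"fmt"
	"strings"
)

func hashIDs(ids []string) (hash [sha1.Size]byte) {
	for _, id := range ids {
		sum := sha1.Sum([]byte(strings.ToUpper(id))) // normalize GUID casing
		for i := range hash {
			hash[i] ^= sum[i] // XOR keeps the result independent of iteration order
		}
	}
	return
}

func main() {
	a := hashIDs([]string{"ep-1", "EP-2"})
	b := hashIDs([]string{"ep-2", "EP-1"})
	fmt.Println(a == b) // true: neither case nor order changes the hash
}
```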
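Finally, `mapStaleLoadbalancers` gives the proxier bounded, self-healing retries: a failed HNS delete parks the load balancer ID in the map, and each `syncProxyRules` pass retries at most `MAX_COUNT_STALE_LOADBALANCERS` of them. A stripped-down sketch of the pattern, with hypothetical names:

```go
// Illustrative sketch (not part of the patch): bounded self-healing cleanup in
// the style of cleanupStaleLoadbalancers. Failed deletes stay in the map and
// are retried on the next sync; at most maxPerSync are attempted per pass.
package main

import "fmt"

const maxPerSync = 20 // mirrors MAX_COUNT_STALE_LOADBALANCERS

func cleanupStale(stale map[string]bool, deleteLB func(string) error) {
	i := 0
	for lbID := range stale {
		i++
		if err := deleteLB(lbID); err == nil {
			delete(stale, lbID) // deleting while ranging over a map is safe in Go
		}
		if i == maxPerSync {
			break // the remainder is handled in the next iteration
		}
	}
}

func main() {
	stale := map[string]bool{"lb-a": true, "lb-b": true}
	cleanupStale(stale, func(id string) error {
		fmt.Println("deleting", id)
		return nil // pretend the HNS delete succeeded
	})
	fmt.Println("remaining:", len(stale)) // 0
}
```

Capping the per-sync work keeps one batch of bad HNS state from stalling the rest of the sync loop.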