Skip to content

Commit

Permalink
Support ProxyTerminatingEndpoints in AntreaProxy (#4607)
Browse files Browse the repository at this point in the history
Signed-off-by: Hongliang Liu <lhongliang@vmware.com>
  • Loading branch information
hongliangl authored Mar 2, 2023
1 parent bf45314 commit 6339388
Show file tree
Hide file tree
Showing 7 changed files with 697 additions and 144 deletions.
6 changes: 3 additions & 3 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1080,7 +1080,7 @@ func Test_client_InstallServiceFlows(t *testing.T) {
},
},
{
name: "Service ClusterIP,NodeLocalExternal true,SessionAffinity",
name: "Service ClusterIP,ExternalPolicyLocal true,SessionAffinity",
protocol: binding.ProtocolTCPv6,
svcIP: svcIPv6,
affinityTimeout: uint16(100),
Expand All @@ -1102,7 +1102,7 @@ func Test_client_InstallServiceFlows(t *testing.T) {
},
},
{
name: "Service NodePort,NodeLocalExternal true,SessionAffinity",
name: "Service NodePort,ExternalPolicyLocal true,SessionAffinity",
protocol: binding.ProtocolUDPv6,
svcIP: svcIPv6,
affinityTimeout: uint16(100),
Expand All @@ -1124,7 +1124,7 @@ func Test_client_InstallServiceFlows(t *testing.T) {
},
},
{
name: "Service LoadBalancer,NodeLocalExternal true,SessionAffinity",
name: "Service LoadBalancer,ExternalPolicyLocal true,SessionAffinity",
protocol: binding.ProtocolSCTPv6,
svcIP: svcIPv6,
affinityTimeout: uint16(100),
Expand Down
104 changes: 44 additions & 60 deletions pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (p *proxier) removeStaleServices() {
}
}
// Remove Service group whose Endpoints are local.
if svcInfo.NodeLocalExternal() {
if svcInfo.ExternalPolicyLocal() {
if groupIDLocal, exist := p.groupCounter.Get(svcPortName, true); exist {
if err := p.ofClient.UninstallServiceGroup(groupIDLocal); err != nil {
klog.ErrorS(err, "Failed to remove Group of local Endpoints for Service", "Service", svcPortName)
Expand Down Expand Up @@ -259,7 +259,7 @@ func serviceIdentityChanged(svcInfo, pSvcInfo *types.ServiceInfo) bool {
svcInfo.Port() != pSvcInfo.Port() ||
svcInfo.OFProtocol != pSvcInfo.OFProtocol ||
svcInfo.NodePort() != pSvcInfo.NodePort() ||
svcInfo.NodeLocalExternal() != pSvcInfo.NodeLocalExternal()
svcInfo.ExternalPolicyLocal() != pSvcInfo.ExternalPolicyLocal()
}

// smallSliceDifference builds a slice which includes all the strings from s1
Expand Down Expand Up @@ -353,12 +353,9 @@ func (p *proxier) installServices() {
endpointsInstalled = map[string]k8sproxy.Endpoint{}
p.endpointsInstalledMap[svcPortName] = endpointsInstalled
}
endpoints := p.endpointsMap[svcPortName]
if p.topologyAwareHintsEnabled {
endpoints = filterEndpoints(endpoints, svcInfo, p.nodeLabels)
}
endpointsToInstall := p.endpointsMap[svcPortName]
// If both expected Endpoints number and installed Endpoints number are 0, we don't need to take care of this Service.
if len(endpoints) == 0 && len(endpointsInstalled) == 0 {
if len(endpointsToInstall) == 0 && len(endpointsInstalled) == 0 {
continue
}

Expand All @@ -370,8 +367,8 @@ func (p *proxier) installServices() {
needRemoval = serviceIdentityChanged(svcInfo, pSvcInfo) || (svcInfo.SessionAffinityType() != pSvcInfo.SessionAffinityType())
needUpdateService = needRemoval || (svcInfo.StickyMaxAgeSeconds() != pSvcInfo.StickyMaxAgeSeconds())
needUpdateEndpoints = pSvcInfo.SessionAffinityType() != svcInfo.SessionAffinityType() ||
pSvcInfo.NodeLocalExternal() != svcInfo.NodeLocalExternal() ||
pSvcInfo.NodeLocalInternal() != svcInfo.NodeLocalInternal()
pSvcInfo.ExternalPolicyLocal() != svcInfo.ExternalPolicyLocal() ||
pSvcInfo.InternalPolicyLocal() != svcInfo.InternalPolicyLocal()
} else { // Need to install.
needUpdateService = true
}
Expand All @@ -398,33 +395,24 @@ func (p *proxier) installServices() {
}
}

var internalNodeLocal, externalNodeLocal bool
if svcInfo.NodeLocalInternal() {
internalNodeLocal = true
var internalPolicyLocal, externalPolicyLocal bool
if svcInfo.InternalPolicyLocal() {
internalPolicyLocal = true
}
if p.proxyAll && svcInfo.NodeLocalExternal() {
externalNodeLocal = true
if p.proxyAll && svcInfo.ExternalPolicyLocal() {
externalPolicyLocal = true
}

var allEndpointUpdateList, localEndpointUpdateList []k8sproxy.Endpoint
// Check if there is any installed Endpoint which is not expected anymore. If internalTrafficPolicy and externalTrafficPolicy
// are both Local, only local Endpoints should be installed and checked; if internalTrafficPolicy or externalTrafficPolicy
// is Cluster, all Endpoints should be installed and checked.
for _, endpoint := range endpoints {
if internalNodeLocal && externalNodeLocal && endpoint.GetIsLocal() || !internalNodeLocal || !externalNodeLocal {
if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed.
needUpdateEndpoints = true
}
}
allEndpointUpdateList = append(allEndpointUpdateList, endpoint)
if endpoint.GetIsLocal() {
localEndpointUpdateList = append(localEndpointUpdateList, endpoint)
clusterEndpoints, localEndpoints, allReachableEndpoints := p.categorizeEndpoints(endpointsToInstall, svcInfo)
// If there are new Endpoints, Endpoints installed should be updated.
for _, endpoint := range allReachableEndpoints {
if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed.
needUpdateEndpoints = true
break
}
}

// If there are expired Endpoints, Endpoints installed should be updated.
if internalNodeLocal && externalNodeLocal && len(localEndpointUpdateList) < len(endpointsInstalled) ||
!(internalNodeLocal && externalNodeLocal) && len(allEndpointUpdateList) < len(endpointsInstalled) {
if len(allReachableEndpoints) < len(endpointsInstalled) {
klog.V(2).Infof("Some Endpoints of Service %s removed, updating Endpoints", svcInfo.String())
needUpdateEndpoints = true
}
Expand Down Expand Up @@ -455,40 +443,31 @@ func (p *proxier) installServices() {
}

if needUpdateEndpoints {
var endpointUpdateList []k8sproxy.Endpoint
// If the type of the Service is NodePort or LoadBalancer and both internalTrafficPolicy and externalTrafficPolicy
// are Local, or the type of the Service is ClusterIP and internalTrafficPolicy is Local, then only local
// Endpoints should be installed, otherwise all Endpoints should be installed.
if internalNodeLocal && (externalNodeLocal || svcInfo.NodePort() == 0) {
endpointUpdateList = localEndpointUpdateList
} else {
endpointUpdateList = allEndpointUpdateList
}
// Install Endpoints.
err := p.ofClient.InstallEndpointFlows(svcInfo.OFProtocol, endpointUpdateList)
err := p.ofClient.InstallEndpointFlows(svcInfo.OFProtocol, allReachableEndpoints)
if err != nil {
klog.ErrorS(err, "Error when installing Endpoints flows")
continue
}
if internalNodeLocal != externalNodeLocal {
if svcInfo.NodePort() > 0 {
if internalPolicyLocal != externalPolicyLocal {
if svcInfo.ExternallyAccessible() {
// If the type of the Service is NodePort or LoadBalancer, when internalTrafficPolicy and externalTrafficPolicy
// of the Service are different, install two groups. One group has all Endpoints, the other has only
// of the Service are different, install two groups. One group has cluster Endpoints, the other has
// local Endpoints.
groupID := p.groupCounter.AllocateIfNotExist(svcPortName, true)
if err = p.ofClient.InstallServiceGroup(groupID, affinityTimeout != 0, localEndpointUpdateList); err != nil {
if err = p.ofClient.InstallServiceGroup(groupID, affinityTimeout != 0, localEndpoints); err != nil {
klog.ErrorS(err, "Error when installing Group of local Endpoints for Service", "Service", svcPortName)
continue
}
groupID = p.groupCounter.AllocateIfNotExist(svcPortName, false)
if err = p.ofClient.InstallServiceGroup(groupID, affinityTimeout != 0, allEndpointUpdateList); err != nil {
if err = p.ofClient.InstallServiceGroup(groupID, affinityTimeout != 0, clusterEndpoints); err != nil {
klog.ErrorS(err, "Error when installing Group of all Endpoints for Service", "Service", svcPortName)
continue
}
} else {
// If the type of the Service is ClusterIP, install a group according to internalTrafficPolicy.
groupID := p.groupCounter.AllocateIfNotExist(svcPortName, internalNodeLocal)
if err = p.ofClient.InstallServiceGroup(groupID, affinityTimeout != 0, endpointUpdateList); err != nil {
groupID := p.groupCounter.AllocateIfNotExist(svcPortName, internalPolicyLocal)
if err = p.ofClient.InstallServiceGroup(groupID, affinityTimeout != 0, allReachableEndpoints); err != nil {
klog.ErrorS(err, "Error when installing Group of Endpoints for Service", "Service", svcPortName)
continue
}
Expand All @@ -500,22 +479,24 @@ func (p *proxier) installServices() {
// uninstall the group which has all Endpoints; if both internalTrafficPolicy and externalTrafficPolicy are
// Cluster, install the group which has all Endpoints and unconditionally uninstall the group which has
// only local Endpoints. Note that, if a group doesn't exist on OVS, then the return value will be nil.
nodeLocalVal := internalNodeLocal && externalNodeLocal
groupID := p.groupCounter.AllocateIfNotExist(svcPortName, nodeLocalVal)
if err = p.ofClient.InstallServiceGroup(groupID, affinityTimeout != 0, endpointUpdateList); err != nil {
// Note that, since internalTrafficPolicy and externalTrafficPolicy are the same, bothPolicyLocal just equals
// internalPolicyLocal.
bothPolicyLocal := internalPolicyLocal
groupID := p.groupCounter.AllocateIfNotExist(svcPortName, bothPolicyLocal)
if err = p.ofClient.InstallServiceGroup(groupID, affinityTimeout != 0, allReachableEndpoints); err != nil {
klog.ErrorS(err, "Error when installing Group of local Endpoints for Service", "Service", svcPortName)
continue
}
if groupID, exist := p.groupCounter.Get(svcPortName, !nodeLocalVal); exist {
if groupID, exist := p.groupCounter.Get(svcPortName, !bothPolicyLocal); exist {
if err := p.ofClient.UninstallServiceGroup(groupID); err != nil {
klog.ErrorS(err, "Failed to uninstall Group of all Endpoints for Service", "Service", svcPortName)
continue
}
p.groupCounter.Recycle(svcPortName, !nodeLocalVal)
p.groupCounter.Recycle(svcPortName, !bothPolicyLocal)
}
}

for _, e := range endpointUpdateList {
for _, e := range allReachableEndpoints {
// If the Endpoint is newly installed, add a reference.
if _, ok := endpointsInstalled[e.String()]; !ok {
key := endpointKey(e, svcInfo.OFProtocol)
Expand Down Expand Up @@ -553,14 +534,14 @@ func (p *proxier) installServices() {
}

// Install ClusterIP flows for the Service.
groupID := p.groupCounter.AllocateIfNotExist(svcPortName, internalNodeLocal)
if err := p.ofClient.InstallServiceFlows(groupID, svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(affinityTimeout), externalNodeLocal, corev1.ServiceTypeClusterIP); err != nil {
groupID := p.groupCounter.AllocateIfNotExist(svcPortName, internalPolicyLocal)
if err := p.ofClient.InstallServiceFlows(groupID, svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(affinityTimeout), externalPolicyLocal, corev1.ServiceTypeClusterIP); err != nil {
klog.Errorf("Error when installing Service flows: %v", err)
continue
}

if p.proxyAll {
nGroupID := p.groupCounter.AllocateIfNotExist(svcPortName, externalNodeLocal)
nGroupID := p.groupCounter.AllocateIfNotExist(svcPortName, externalPolicyLocal)
// Install ClusterIP route on Node so that ClusterIP can be accessed on Node. Every time a new ClusterIP
// is created, the routing target IP block will be recalculated for expansion to be able to route the new
// created ClusterIP. Deleting a ClusterIP will not shrink the target routing IP block. The Service CIDR
Expand All @@ -573,15 +554,15 @@ func (p *proxier) installServices() {
// If previous Service is nil or NodePort flows and configurations of previous Service have been removed,
// install NodePort flows and configurations for current Service.
if svcInfo.NodePort() > 0 && (pSvcInfo == nil || needRemoval) {
if err := p.installNodePortService(nGroupID, uint16(svcInfo.NodePort()), svcInfo.OFProtocol, uint16(affinityTimeout), svcInfo.NodeLocalExternal()); err != nil {
if err := p.installNodePortService(nGroupID, uint16(svcInfo.NodePort()), svcInfo.OFProtocol, uint16(affinityTimeout), svcInfo.ExternalPolicyLocal()); err != nil {
klog.ErrorS(err, "Failed to install NodePort flows and configurations of Service", "Service", svcPortName)
continue
}
}
}

if p.proxyLoadBalancerIPs {
nGroupID := p.groupCounter.AllocateIfNotExist(svcPortName, externalNodeLocal)
nGroupID := p.groupCounter.AllocateIfNotExist(svcPortName, externalPolicyLocal)
// Service LoadBalancer flows can be partially updated.
var toDelete, toAdd []string
if needRemoval {
Expand All @@ -600,7 +581,7 @@ func (p *proxier) installServices() {
}
// Install LoadBalancer flows and configurations.
if len(toAdd) > 0 {
if err := p.installLoadBalancerService(nGroupID, toAdd, uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(affinityTimeout), svcInfo.NodeLocalExternal()); err != nil {
if err := p.installLoadBalancerService(nGroupID, toAdd, uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(affinityTimeout), svcInfo.ExternalPolicyLocal()); err != nil {
klog.ErrorS(err, "Failed to install LoadBalancer flows and configurations of Service", "Service", svcPortName)
continue
}
Expand Down Expand Up @@ -847,6 +828,9 @@ func (p *proxier) Run(stopCh <-chan struct{}) {
go p.serviceConfig.Run(stopCh)
if p.endpointSliceEnabled {
go p.endpointSliceConfig.Run(stopCh)
if p.topologyAwareHintsEnabled {
go p.nodeConfig.Run(stopCh)
}
} else {
go p.endpointsConfig.Run(stopCh)
}
Expand Down Expand Up @@ -923,7 +907,7 @@ func NewProxier(
klog.V(2).Infof("Creating proxier with IPv6 enabled=%t", isIPv6)

endpointSliceEnabled := features.DefaultFeatureGate.Enabled(features.EndpointSlice)
topologyAwareHintsEnabled := features.DefaultFeatureGate.Enabled(features.TopologyAwareHints)
topologyAwareHintsEnabled := endpointSliceEnabled && features.DefaultFeatureGate.Enabled(features.TopologyAwareHints)
ipFamily := corev1.IPv4Protocol
if isIPv6 {
ipFamily = corev1.IPv6Protocol
Expand Down
Loading

0 comments on commit 6339388

Please sign in to comment.