Skip to content

Commit

Permalink
Add support for ExternalIP in AntreaProxy
Browse files Browse the repository at this point in the history
Signed-off-by: Hongliang Liu <lhongliang@vmware.com>
  • Loading branch information
hongliangl committed Apr 20, 2023
1 parent 0bf9be6 commit 18bc9a6
Show file tree
Hide file tree
Showing 8 changed files with 495 additions and 203 deletions.
21 changes: 21 additions & 0 deletions pkg/agent/config/service_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2022 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package config

import v1 "k8s.io/api/core/v1"

const (
ServiceTypeExternalIP v1.ServiceType = "ExternalIP"
)
10 changes: 5 additions & 5 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -2334,9 +2334,9 @@ func (f *featureService) serviceLearnFlow(groupID binding.GroupIDType,
case binding.ProtocolSCTPv6:
learnFlowBuilderLearnAction = learnFlowBuilderLearnAction.MatchLearnedSCTPv6DstPort()
}
// If externalTrafficPolicy of NodePort/LoadBalancer is Cluster, the learned flow which
// is used to match the first packet of NodePort/LoadBalancer also requires SNAT.
if (svcType == v1.ServiceTypeNodePort || svcType == v1.ServiceTypeLoadBalancer) && !nodeLocalExternal {
// If externalTrafficPolicy of NodePort/LoadBalancer/ExternalIP is Cluster, the learned flow which
// is used to match the first packet of NodePort/LoadBalancer/ExternalIP also requires SNAT.
if (svcType == v1.ServiceTypeNodePort || svcType == v1.ServiceTypeLoadBalancer || svcType == config.ServiceTypeExternalIP) && !nodeLocalExternal {
learnFlowBuilderLearnAction = learnFlowBuilderLearnAction.LoadRegMark(ToClusterServiceRegMark)
}

Expand Down Expand Up @@ -2400,10 +2400,10 @@ func (f *featureService) serviceLBFlow(groupID binding.GroupIDType,
MatchDstPort(svcPort, nil).
Action().LoadRegMark(regMarksToLoad...)
} else {
// If externalTrafficPolicy of a LoadBalancer Service is Cluster (nodeLocalExternal=false), it loads
// If externalTrafficPolicy of a LoadBalancer/ExternalIP Service is Cluster (nodeLocalExternal=false), it loads
// ToClusterServiceRegMark to indicate it. The mark will be checked later when determining if SNAT is required
// or not.
if serviceType == v1.ServiceTypeLoadBalancer && !nodeLocalExternal {
if (serviceType == v1.ServiceTypeLoadBalancer || serviceType == config.ServiceTypeExternalIP) && !nodeLocalExternal {
regMarksToLoad = append(regMarksToLoad, ToClusterServiceRegMark)
}
flowBuilder = ServiceLBTable.ofTable.BuildFlow(priorityNormal).
Expand Down
62 changes: 58 additions & 4 deletions pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,18 @@ func (p *proxier) removeServiceFlows(svcInfo *types.ServiceInfo) bool {
klog.ErrorS(err, "Error when uninstalling ClusterIP flows for Service", "ServiceInfo", svcInfoStr)
return false
}
// Remove NodePort flows and configurations.

if p.proxyAll {
// Remove NodePort flows and configurations.
if err := p.uninstallNodePortService(uint16(svcInfo.NodePort()), svcInfo.OFProtocol); err != nil {
klog.ErrorS(err, "Error when uninstalling NodePort flows and configurations for Service", "ServiceInfo", svcInfoStr)
return false
}
// Remove ExternalIP flows and configurations.
if err := p.uninstallExternalIPService(svcInfoStr, svcInfo.ExternalIPStrings(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil {
klog.ErrorS(err, "Error when uninstalling ExternalIP flows and configurations for Service", "ServiceInfo", svcInfoStr)
return false
}
}
// Remove LoadBalancer flows and configurations.
if p.proxyLoadBalancerIPs {
Expand Down Expand Up @@ -325,7 +331,9 @@ func serviceIdentityChanged(svcInfo, pSvcInfo *types.ServiceInfo) bool {
}

func serviceExternalAddressesChanged(svcInfo, pSvcInfo *types.ServiceInfo) bool {
return svcInfo.NodePort() != pSvcInfo.NodePort() || !slices.Equal(svcInfo.LoadBalancerIPStrings(), pSvcInfo.LoadBalancerIPStrings())
return svcInfo.NodePort() != pSvcInfo.NodePort() ||
!slices.Equal(svcInfo.LoadBalancerIPStrings(), pSvcInfo.LoadBalancerIPStrings()) ||
!slices.Equal(svcInfo.ExternalIPStrings(), pSvcInfo.ExternalIPStrings())
}

// smallSliceDifference builds a slice which includes all the strings from s1
Expand Down Expand Up @@ -383,6 +391,36 @@ func (p *proxier) uninstallNodePortService(svcPort uint16, protocol binding.Prot
return nil
}

func (p *proxier) installExternalIPService(svcInfoStr string, groupID binding.GroupIDType, externalIPStrings []string, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool) error {
for _, externalIP := range externalIPStrings {
if externalIP != "" {
ip := net.ParseIP(externalIP)
if err := p.ofClient.InstallServiceFlows(groupID, ip, svcPort, protocol, affinityTimeout, nodeLocalExternal, agentconfig.ServiceTypeExternalIP, false); err != nil {
return fmt.Errorf("failed to install ExternalIP load balancing flows: %w", err)
}
if err := p.addRouteForServiceIP(svcInfoStr, ip, p.routeClient.AddExternalIP); err != nil {
return fmt.Errorf("failed to install ExternalIP traffic redirecting routes: %w", err)
}
}
}
return nil
}

func (p *proxier) uninstallExternalIPService(svcInfoStr string, externalIPStrings []string, svcPort uint16, protocol binding.Protocol) error {
for _, externalIP := range externalIPStrings {
if externalIP != "" {
ip := net.ParseIP(externalIP)
if err := p.ofClient.UninstallServiceFlows(ip, svcPort, protocol); err != nil {
return fmt.Errorf("failed to remove ExternalIP load balancing flows: %w", err)
}
if err := p.deleteRouteForServiceIP(svcInfoStr, ip, p.routeClient.DeleteExternalIP); err != nil {
return fmt.Errorf("failed to remove ExternalIP traffic redirecting routes: %w", err)
}
}
}
return nil
}

func (p *proxier) installLoadBalancerService(svcInfoStr string, groupID binding.GroupIDType, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool) error {
for _, ingress := range loadBalancerIPStrings {
if ingress != "" {
Expand Down Expand Up @@ -588,8 +626,14 @@ func (p *proxier) installServiceFlows(svcInfo *types.ServiceInfo, internalGroupI
klog.ErrorS(err, "Error when installing ClusterIP flows for Service", "ServiceInfo", svcInfoStr)
return false
}
// Install NodePort flows and configurations.

if p.proxyAll {
// Install ExternalIP flows and configurations.
if err := p.installExternalIPService(svcInfo.String(), externalGroupID, svcInfo.ExternalIPStrings(), uint16(svcInfo.Port()), svcInfo.OFProtocol, affinityTimeout, svcInfo.ExternalPolicyLocal()); err != nil {
klog.ErrorS(err, "Error when installing ExternalIP flows and configurations for Service", "ServiceInfo", svcInfoStr)
return false
}
// Install NodePort flows and configurations.
if err := p.installNodePortService(externalGroupID, uint16(svcInfo.NodePort()), svcInfo.OFProtocol, affinityTimeout, svcInfo.ExternalPolicyLocal()); err != nil {
klog.ErrorS(err, "Error when installing NodePort flows and configurations for Service", "ServiceInfo", svcInfoStr)
return false
Expand Down Expand Up @@ -621,6 +665,16 @@ func (p *proxier) updateServiceExternalAddresses(pSvcInfo, svcInfo *types.Servic
return false
}
}
deletedExternalIPs := smallSliceDifference(pSvcInfo.ExternalIPStrings(), svcInfo.ExternalIPStrings())
addedExternalIPs := smallSliceDifference(svcInfo.ExternalIPStrings(), pSvcInfo.ExternalIPStrings())
if err := p.uninstallExternalIPService(pSvcInfoStr, deletedExternalIPs, uint16(pSvcInfo.Port()), pSvcInfo.OFProtocol); err != nil {
klog.ErrorS(err, "Error when uninstalling ExternalIP flows and configurations for Service", "ServiceInfo", pSvcInfoStr)
return false
}
if err := p.installExternalIPService(svcInfoStr, externalGroupID, addedExternalIPs, uint16(svcInfo.Port()), svcInfo.OFProtocol, affinityTimeout, svcInfo.ExternalPolicyLocal()); err != nil {
klog.ErrorS(err, "Error when installing ExternalIP flows and configurations for Service", "ServiceInfo", svcInfoStr)
return false
}
}
if p.proxyLoadBalancerIPs {
deletedLoadBalancerIPs := smallSliceDifference(pSvcInfo.LoadBalancerIPStrings(), svcInfo.LoadBalancerIPStrings())
Expand Down Expand Up @@ -666,7 +720,7 @@ func compareEndpoints(endpointsCached map[string]k8sproxy.Endpoint, endpointsIns
// endpoints or services resources are not synced. syncProxyRules is only called
// through the Run method of the runner object, and all calls are serialized.
// This method is the only one that changes internal state, but
// GetServiceFlowKeys(), which is called by the the "/ovsflows" API handler,
// GetServiceFlowKeys(), which is called by the "/ovsflows" API handler,
// also reads service and endpoints maps, so serviceEndpointsMapsMutex is used
// to protect these two maps.
func (p *proxier) syncProxyRules() {
Expand Down
Loading

0 comments on commit 18bc9a6

Please sign in to comment.