Skip to content

Commit

Permalink
Ensure full functionality of AntreaProxy with proxyAll enabled when k…
Browse files Browse the repository at this point in the history
…ube-proxy presents

Signed-off-by: Hongliang Liu <lhongliang@vmware.com>
  • Loading branch information
hongliangl committed May 8, 2024
1 parent 5251502 commit 90b6c82
Show file tree
Hide file tree
Showing 17 changed files with 770 additions and 223 deletions.
1 change: 1 addition & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func run(o *Options) error {
connectUplinkToBridge,
nodeNetworkPolicyEnabled,
multicastEnabled,
o.config.KubeAPIServerOverride != "",
serviceCIDRProvider)

Check failure on line 236 in cmd/antrea-agent/agent.go

View workflow job for this annotation

GitHub Actions / Build Antrea Windows binaries

too many arguments in call to route.NewClient

Check failure on line 236 in cmd/antrea-agent/agent.go

View workflow job for this annotation

GitHub Actions / golicense

too many arguments in call to route.NewClient

Check failure on line 236 in cmd/antrea-agent/agent.go

View workflow job for this annotation

GitHub Actions / Unit test (windows-2022)

too many arguments in call to route.NewClient

Check failure on line 236 in cmd/antrea-agent/agent.go

View workflow job for this annotation

GitHub Actions / Analyze on Windows (go)

too many arguments in call to route.NewClient
if err != nil {
return fmt.Errorf("error creating route client: %v", err)
Expand Down
13 changes: 7 additions & 6 deletions pkg/agent/controller/networkpolicy/node_reconciler_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/route"
"antrea.io/antrea/pkg/agent/types"
ipsetutil "antrea.io/antrea/pkg/agent/util/ipset"
"antrea.io/antrea/pkg/agent/util/iptables"
"antrea.io/antrea/pkg/apis/controlplane/v1beta2"
secv1beta1 "antrea.io/antrea/pkg/apis/crd/v1beta1"
Expand Down Expand Up @@ -622,7 +623,7 @@ func buildCoreIPTRule(ipProtocol iptables.Protocol,
builder := iptables.NewRuleBuilder(iptChain)
if isIngress {
if ipset != "" {
builder = builder.MatchIPSetSrc(ipset)
builder = builder.MatchIPSetSrc(ipset, ipsetutil.HashIP)
} else if ipnet != "" {
builder = builder.MatchCIDRSrc(ipnet)
} else {
Expand All @@ -631,7 +632,7 @@ func buildCoreIPTRule(ipProtocol iptables.Protocol,
}
} else {
if ipset != "" {
builder = builder.MatchIPSetDst(ipset)
builder = builder.MatchIPSetDst(ipset, ipsetutil.HashIP)
} else if ipnet != "" {
builder = builder.MatchCIDRDst(ipnet)
} else {
Expand All @@ -648,8 +649,8 @@ func buildCoreIPTRule(ipProtocol iptables.Protocol,
fallthrough
case "sctp":
builder = builder.MatchTransProtocol(transProtocol).
MatchSrcPort(service.SrcPort, service.SrcEndPort).
MatchDstPort(service.Port, service.EndPort)
MatchPortSrc(service.SrcPort, service.SrcEndPort).
MatchPortDst(service.Port, service.EndPort)
case "icmp":
builder = builder.MatchICMP(service.ICMPType, service.ICMPCode, ipProtocol)
}
Expand All @@ -673,8 +674,8 @@ func buildServiceIPTRules(ipProtocol iptables.Protocol, services []v1beta2.Servi
fallthrough
case "sctp":
copiedBuilder = copiedBuilder.MatchTransProtocol(transProtocol).
MatchSrcPort(svc.SrcPort, svc.SrcEndPort).
MatchDstPort(svc.Port, svc.EndPort)
MatchPortSrc(svc.SrcPort, svc.SrcEndPort).
MatchPortDst(svc.Port, svc.EndPort)
case "icmp":
copiedBuilder = copiedBuilder.MatchICMP(svc.ICMPType, svc.ICMPCode, ipProtocol)
}
Expand Down
62 changes: 52 additions & 10 deletions pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ func (p *proxier) removeServiceFlows(svcInfo *types.ServiceInfo) bool {
svcInfoStr := svcInfo.String()
svcPort := uint16(svcInfo.Port())
svcProto := svcInfo.OFProtocol
loadBalancerMode := p.getLoadBalancerMode(svcInfo)
// Remove ClusterIP flows.
if err := p.ofClient.UninstallServiceFlows(svcInfo.ClusterIP(), svcPort, svcProto); err != nil {
klog.ErrorS(err, "Error when uninstalling ClusterIP flows for Service", "ServiceInfo", svcInfoStr)
Expand All @@ -242,14 +243,14 @@ func (p *proxier) removeServiceFlows(svcInfo *types.ServiceInfo) bool {
return false
}
// Remove ExternalIP flows and configurations.
if err := p.uninstallExternalIPService(svcInfoStr, svcInfo.ExternalIPStrings(), svcPort, svcProto); err != nil {
if err := p.uninstallExternalIPService(svcInfoStr, svcInfo.ExternalIPStrings(), svcPort, svcProto, loadBalancerMode); err != nil {
klog.ErrorS(err, "Error when uninstalling ExternalIP flows and configurations for Service", "ServiceInfo", svcInfoStr)
return false
}
}
// Remove LoadBalancer flows and configurations.
if p.proxyLoadBalancerIPs {
if err := p.uninstallLoadBalancerService(svcInfoStr, svcInfo.LoadBalancerIPStrings(), svcPort, svcProto); err != nil {
if err := p.uninstallLoadBalancerService(svcInfoStr, svcInfo.LoadBalancerIPStrings(), svcPort, svcProto, loadBalancerMode); err != nil {
klog.ErrorS(err, "Error when uninstalling LoadBalancer flows and configurations for Service", "ServiceInfo", svcInfoStr)
return false
}
Expand Down Expand Up @@ -554,7 +555,7 @@ func (p *proxier) installNodePortService(localGroupID, clusterGroupID binding.Gr
}); err != nil {
return fmt.Errorf("failed to install NodePort load balancing flows: %w", err)
}
if err := p.routeClient.AddNodePort(p.nodePortAddresses, svcPort, protocol); err != nil {
if err := p.routeClient.AddNodePortConf(p.nodePortAddresses, svcPort, protocol); err != nil {
return fmt.Errorf("failed to install NodePort traffic redirecting rules: %w", err)
}
return nil
Expand All @@ -571,7 +572,7 @@ func (p *proxier) uninstallNodePortService(svcPort uint16, protocol binding.Prot
if err := p.ofClient.UninstallServiceFlows(svcIP, svcPort, protocol); err != nil {
return fmt.Errorf("failed to remove NodePort load balancing flows: %w", err)
}
if err := p.routeClient.DeleteNodePort(p.nodePortAddresses, svcPort, protocol); err != nil {
if err := p.routeClient.DeleteNodePortConf(p.nodePortAddresses, svcPort, protocol); err != nil {
return fmt.Errorf("failed to remove NodePort traffic redirecting rules: %w", err)
}
return nil
Expand All @@ -586,6 +587,8 @@ func (p *proxier) installExternalIPService(svcInfoStr string,
trafficPolicyLocal bool,
affinityTimeout uint16,
loadBalancerMode agentconfig.LoadBalancerMode) error {
externalIPs := make([]net.IP, 0, len(externalIPStrings))
isDSR := features.DefaultFeatureGate.Enabled(features.LoadBalancerModeDSR) && loadBalancerMode == agentconfig.LoadBalancerModeDSR
for _, externalIP := range externalIPStrings {
ip := net.ParseIP(externalIP)
if err := p.ofClient.InstallServiceFlows(&agenttypes.ServiceConfig{
Expand All @@ -599,18 +602,29 @@ func (p *proxier) installExternalIPService(svcInfoStr string,
IsExternal: true,
IsNodePort: false,
IsNested: false, // Unsupported for ExternalIP
IsDSR: features.DefaultFeatureGate.Enabled(features.LoadBalancerModeDSR) && loadBalancerMode == agentconfig.LoadBalancerModeDSR,
IsDSR: isDSR,
}); err != nil {
return fmt.Errorf("failed to install ExternalIP load balancing flows: %w", err)
}
if err := p.addRouteForServiceIP(svcInfoStr, ip, p.routeClient.AddExternalIPRoute); err != nil {
return fmt.Errorf("failed to install ExternalIP traffic redirecting routes: %w", err)
}
externalIPs = append(externalIPs, ip)
}
if len(externalIPs) != 0 {
if err := p.routeClient.AddExternalIPConf(externalIPs, svcPort, protocol, isDSR); err != nil {
return err
}
}
return nil
}

func (p *proxier) uninstallExternalIPService(svcInfoStr string, externalIPStrings []string, svcPort uint16, protocol binding.Protocol) error {
func (p *proxier) uninstallExternalIPService(svcInfoStr string,
externalIPStrings []string,
svcPort uint16,
protocol binding.Protocol,
loadBalancerMode agentconfig.LoadBalancerMode) error {
externalIPs := make([]net.IP, 0, len(externalIPStrings))
for _, externalIP := range externalIPStrings {
ip := net.ParseIP(externalIP)
if err := p.ofClient.UninstallServiceFlows(ip, svcPort, protocol); err != nil {
Expand All @@ -619,6 +633,13 @@ func (p *proxier) uninstallExternalIPService(svcInfoStr string, externalIPString
if err := p.deleteRouteForServiceIP(svcInfoStr, ip, p.routeClient.DeleteExternalIPRoute); err != nil {
return fmt.Errorf("failed to remove ExternalIP traffic redirecting routes: %w", err)
}
externalIPs = append(externalIPs, ip)
}
isDSR := features.DefaultFeatureGate.Enabled(features.LoadBalancerModeDSR) && loadBalancerMode == agentconfig.LoadBalancerModeDSR
if len(externalIPs) != 0 {
if err := p.routeClient.DeleteExternalIPConf(externalIPs, svcPort, protocol, isDSR); err != nil {
return err
}
}
return nil
}
Expand All @@ -632,6 +653,8 @@ func (p *proxier) installLoadBalancerService(svcInfoStr string,
trafficPolicyLocal bool,
affinityTimeout uint16,
loadBalancerMode agentconfig.LoadBalancerMode) error {
loadBalancerIPs := make([]net.IP, 0, len(loadBalancerIPStrings))
isDSR := features.DefaultFeatureGate.Enabled(features.LoadBalancerModeDSR) && loadBalancerMode == agentconfig.LoadBalancerModeDSR
for _, ingress := range loadBalancerIPStrings {
if ingress != "" {
ip := net.ParseIP(ingress)
Expand All @@ -646,17 +669,24 @@ func (p *proxier) installLoadBalancerService(svcInfoStr string,
IsExternal: true,
IsNodePort: false,
IsNested: false, // Unsupported for LoadBalancerIP
IsDSR: features.DefaultFeatureGate.Enabled(features.LoadBalancerModeDSR) && loadBalancerMode == agentconfig.LoadBalancerModeDSR,
IsDSR: isDSR,
}); err != nil {
return fmt.Errorf("failed to install LoadBalancer load balancing flows: %w", err)
}
if p.proxyAll {
if err := p.addRouteForServiceIP(svcInfoStr, ip, p.routeClient.AddExternalIPRoute); err != nil {
return fmt.Errorf("failed to install LoadBalancer traffic redirecting routes: %w", err)
}
loadBalancerIPs = append(loadBalancerIPs, ip)
}
}
}
if p.proxyAll && len(loadBalancerIPs) != 0 {
if err := p.routeClient.AddLoadBalancerConf(loadBalancerIPs, svcPort, protocol, isDSR); err != nil {
return err
}
}

return nil
}

Expand All @@ -677,7 +707,12 @@ func (p *proxier) addRouteForServiceIP(svcInfoStr string, ip net.IP, addRouteFn
return nil
}

func (p *proxier) uninstallLoadBalancerService(svcInfoStr string, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol) error {
func (p *proxier) uninstallLoadBalancerService(svcInfoStr string,
loadBalancerIPStrings []string,
svcPort uint16,
protocol binding.Protocol,
loadBalancerMode agentconfig.LoadBalancerMode) error {
loadBalancerIPs := make([]net.IP, 0, len(loadBalancerIPStrings))
for _, ingress := range loadBalancerIPStrings {
if ingress != "" {
ip := net.ParseIP(ingress)
Expand All @@ -688,9 +723,16 @@ func (p *proxier) uninstallLoadBalancerService(svcInfoStr string, loadBalancerIP
if err := p.deleteRouteForServiceIP(svcInfoStr, ip, p.routeClient.DeleteExternalIPRoute); err != nil {
return fmt.Errorf("failed to remove LoadBalancer traffic redirecting routes: %w", err)
}
loadBalancerIPs = append(loadBalancerIPs, ip)
}
}
}
if p.proxyAll && len(loadBalancerIPs) != 0 {
isDSR := features.DefaultFeatureGate.Enabled(features.LoadBalancerModeDSR) && loadBalancerMode == agentconfig.LoadBalancerModeDSR
if err := p.routeClient.DeleteLoadBalancerConf(loadBalancerIPs, svcPort, protocol, isDSR); err != nil {
return err
}
}
return nil
}

Expand Down Expand Up @@ -937,7 +979,7 @@ func (p *proxier) updateServiceExternalAddresses(pSvcInfo, svcInfo *types.Servic
}
deletedExternalIPs := smallSliceDifference(pSvcInfo.ExternalIPStrings(), svcInfo.ExternalIPStrings())
addedExternalIPs := smallSliceDifference(svcInfo.ExternalIPStrings(), pSvcInfo.ExternalIPStrings())
if err := p.uninstallExternalIPService(pSvcInfoStr, deletedExternalIPs, pSvcPort, pSvcProto); err != nil {
if err := p.uninstallExternalIPService(pSvcInfoStr, deletedExternalIPs, pSvcPort, pSvcProto, loadBalancerMode); err != nil {
klog.ErrorS(err, "Error when uninstalling ExternalIP flows and configurations for Service", "ServiceInfo", pSvcInfoStr)
return false
}
Expand All @@ -949,7 +991,7 @@ func (p *proxier) updateServiceExternalAddresses(pSvcInfo, svcInfo *types.Servic
if p.proxyLoadBalancerIPs {
deletedLoadBalancerIPs := smallSliceDifference(pSvcInfo.LoadBalancerIPStrings(), svcInfo.LoadBalancerIPStrings())
addedLoadBalancerIPs := smallSliceDifference(svcInfo.LoadBalancerIPStrings(), pSvcInfo.LoadBalancerIPStrings())
if err := p.uninstallLoadBalancerService(pSvcInfoStr, deletedLoadBalancerIPs, pSvcPort, pSvcProto); err != nil {
if err := p.uninstallLoadBalancerService(pSvcInfoStr, deletedLoadBalancerIPs, pSvcPort, pSvcProto, loadBalancerMode); err != nil {
klog.ErrorS(err, "Error when uninstalling LoadBalancer flows and configurations for Service", "ServiceInfo", pSvcInfoStr)
return false
}
Expand Down
Loading

0 comments on commit 90b6c82

Please sign in to comment.