Skip to content

Commit

Permalink
Refactor Windows SNAT flows for SNAT policy implementation (#1892)
Browse files Browse the repository at this point in the history
Separate common flows and Windows only flows for SNAT.
Add a new snatTable(71) between l3ForwardingTable and l3DecTTLTable(72)
for looking up SNAT IPs of the external traffic.
  • Loading branch information
jianjuns authored Mar 17, 2021
1 parent 8467011 commit e984a6e
Show file tree
Hide file tree
Showing 12 changed files with 491 additions and 317 deletions.
3 changes: 2 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ func run(o *Options) error {
ovsBridgeMgmtAddr := ofconfig.GetMgmtAddress(o.config.OVSRunDir, o.config.OVSBridge)
ofClient := openflow.NewClient(o.config.OVSBridge, ovsBridgeMgmtAddr, ovsDatapathType,
features.DefaultFeatureGate.Enabled(features.AntreaProxy),
features.DefaultFeatureGate.Enabled(features.AntreaPolicy))
features.DefaultFeatureGate.Enabled(features.AntreaPolicy),
false)

_, serviceCIDRNet, _ := net.ParseCIDR(o.config.ServiceCIDR)
var serviceCIDRNetv6 *net.IPNet
Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,9 @@ func (i *Initializer) initOpenFlowPipeline() error {
return err
}

// On Windows platform, extra flows are needed to perform SNAT for the
// traffic to external network.
if err := i.initExternalConnectivityFlows(); err != nil {
// Install OpenFlow entries to enable Pod traffic to external IP
// addresses.
if err := i.ofClient.InstallExternalFlows(); err != nil {
klog.Errorf("Failed to install openflow entries for external connectivity: %v", err)
return err
}
Expand Down
5 changes: 0 additions & 5 deletions pkg/agent/agent_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,6 @@ func (i *Initializer) initHostNetworkFlows() error {
return nil
}

// initExternalConnectivityFlows returns immediately on Linux. The corresponding functions are provided in routeClient.
func (i *Initializer) initExternalConnectivityFlows() error {
return nil
}

// getTunnelLocalIP returns local_ip of tunnel port.
// On linux platform, local_ip option is not needed.
func (i *Initializer) getTunnelPortLocalIP() net.IP {
Expand Down
13 changes: 0 additions & 13 deletions pkg/agent/agent_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,19 +188,6 @@ func (i *Initializer) initHostNetworkFlows() error {
return nil
}

// initExternalConnectivityFlows installs OpenFlow entries to SNAT Pod traffic
// using Node IP, and then Pod could communicate to the external IP addresses.
func (i *Initializer) initExternalConnectivityFlows() error {
if i.nodeConfig.PodIPv4CIDR == nil {
return fmt.Errorf("Failed to find valid IPv4 PodCIDR")
}
// Install OpenFlow entries on the OVS to enable Pod traffic to communicate to external IP addresses.
if err := i.ofClient.InstallExternalFlows(); err != nil {
return err
}
return nil
}

// getTunnelLocalIP returns local_ip of tunnel port
func (i *Initializer) getTunnelPortLocalIP() net.IP {
return i.nodeConfig.NodeIPAddr.IP
Expand Down
48 changes: 15 additions & 33 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,11 @@ type Client interface {
// This function is only used for Windows platform.
InstallBridgeUplinkFlows() error

// InstallExternalFlows sets up flows to enable Pods to communicate to the external IP addresses. The corresponding
// OpenFlow entries include: 1) identify the packets from local Pods to the external IP address, 2) mark the traffic
// in the connection tracking context, and 3) SNAT the packets with Node IP.
// This function is only used for Windows platform.
// InstallExternalFlows sets up flows to enable Pods to communicate to
// the external IP addresses. The flows identify the packets from local
// Pods to the external IP address, and mark the packets to be SNAT'd
// with the configured SNAT IPs. On Windows Node, the flows also perform
// SNAT with the Openflow NAT action.
InstallExternalFlows() error

// Disconnect disconnects the connection between client and OFSwitch.
Expand Down Expand Up @@ -476,22 +477,6 @@ func (c *client) UninstallServiceFlows(svcIP net.IP, svcPort uint16, protocol bi
return c.deleteFlows(c.serviceFlowCache, cacheKey)
}

func (c *client) InstallLoadBalancerServiceFromOutsideFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
var flows []binding.Flow
flows = append(flows, c.loadBalancerServiceFromOutsideFlow(svcIP, svcPort, protocol))
cacheKey := fmt.Sprintf("LoadBalancerService_%s_%d_%s", svcIP, svcPort, protocol)
return c.addFlows(c.serviceFlowCache, cacheKey, flows)
}

func (c *client) UninstallLoadBalancerServiceFromOutsideFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
cacheKey := fmt.Sprintf("LoadBalancerService_%s_%d_%s", svcIP, svcPort, protocol)
return c.deleteFlows(c.serviceFlowCache, cacheKey)
}

func (c *client) GetServiceFlowKeys(svcIP net.IP, svcPort uint16, protocol binding.Protocol, endpoints []proxy.Endpoint) []string {
cacheKey := generateServicePortFlowCacheKey(svcIP, svcPort, protocol)
flowKeys := c.getFlowKeysFromCache(c.serviceFlowCache, cacheKey)
Expand Down Expand Up @@ -579,16 +564,6 @@ func (c *client) InstallDefaultTunnelFlows() error {
return nil
}

func (c *client) InstallBridgeUplinkFlows() error {
flows := c.hostBridgeUplinkFlows(*c.nodeConfig.PodIPv4CIDR, cookie.Default)
c.hostNetworkingFlows = flows
if err := c.ofEntryOperations.AddAll(flows); err != nil {
return err
}
c.hostNetworkingFlows = flows
return nil
}

func (c *client) initialize() error {
if err := c.ofEntryOperations.AddAll(c.defaultFlows()); err != nil {
return fmt.Errorf("failed to install default flows: %v", err)
Expand Down Expand Up @@ -655,9 +630,16 @@ func (c *client) Initialize(roundInfo types.RoundInfo, nodeConfig *config.NodeCo

func (c *client) InstallExternalFlows() error {
nodeIP := c.nodeConfig.NodeIPAddr.IP
podSubnet := c.nodeConfig.PodIPv4CIDR
flows := c.uplinkSNATFlows(cookie.SNAT)
flows = append(flows, c.snatFlows(nodeIP, *podSubnet, cookie.SNAT)...)
localGatewayMAC := c.nodeConfig.GatewayConfig.MAC

var flows []binding.Flow
if c.nodeConfig.PodIPv4CIDR != nil {
flows = c.externalFlows(nodeIP, *c.nodeConfig.PodIPv4CIDR, localGatewayMAC)
}
if c.nodeConfig.PodIPv6CIDR != nil {
flows = append(flows, c.externalFlows(nodeIP, *c.nodeConfig.PodIPv6CIDR, localGatewayMAC)...)
}

if err := c.ofEntryOperations.AddAll(flows); err != nil {
return fmt.Errorf("failed to install flows for external communication: %v", err)
}
Expand Down
36 changes: 36 additions & 0 deletions pkg/agent/openflow/client_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// +build !windows
// package openflow is needed by antctl which is compiled for macOS too.

// Copyright 2021 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package openflow

import (
"net"

binding "github.com/vmware-tanzu/antrea/pkg/ovs/openflow"
)

func (c *client) InstallBridgeUplinkFlows() error {
return nil
}

func (c *client) InstallLoadBalancerServiceFromOutsideFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error {
return nil
}

func (c *client) UninstallLoadBalancerServiceFromOutsideFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error {
return nil
}
12 changes: 6 additions & 6 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestIdempotentFlowInstallation(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
m := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false)
client := ofClient.(*client)
client.cookieAllocator = cookie.NewAllocator(0)
client.ofEntryOperations = m
Expand Down Expand Up @@ -129,7 +129,7 @@ func TestIdempotentFlowInstallation(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
m := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false)
client := ofClient.(*client)
client.cookieAllocator = cookie.NewAllocator(0)
client.ofEntryOperations = m
Expand Down Expand Up @@ -170,7 +170,7 @@ func TestFlowInstallationFailed(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
m := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false)
client := ofClient.(*client)
client.cookieAllocator = cookie.NewAllocator(0)
client.ofEntryOperations = m
Expand Down Expand Up @@ -204,7 +204,7 @@ func TestConcurrentFlowInstallation(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
m := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false)
client := ofClient.(*client)
client.cookieAllocator = cookie.NewAllocator(0)
client.ofEntryOperations = m
Expand Down Expand Up @@ -467,7 +467,7 @@ func Test_client_SendTraceflowPacket(t *testing.T) {
}

func prepareTraceflowFlow(ctrl *gomock.Controller) *client {
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, true)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, true, false)
c := ofClient.(*client)
c.cookieAllocator = cookie.NewAllocator(0)
c.nodeConfig = nodeConfig
Expand All @@ -485,7 +485,7 @@ func prepareTraceflowFlow(ctrl *gomock.Controller) *client {
}

func prepareSendTraceflowPacket(ctrl *gomock.Controller, success bool) *client {
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, true)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, true, false)
c := ofClient.(*client)
c.nodeConfig = nodeConfig
m := ovsoftest.NewMockBridge(ctrl)
Expand Down
51 changes: 51 additions & 0 deletions pkg/agent/openflow/client_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// +build windows
// package openflow is needed by antctl which is compiled for macOS too.

// Copyright 2021 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package openflow

import (
"fmt"
"net"

"github.com/vmware-tanzu/antrea/pkg/agent/openflow/cookie"
binding "github.com/vmware-tanzu/antrea/pkg/ovs/openflow"
)

func (c *client) InstallBridgeUplinkFlows() error {
flows := c.hostBridgeUplinkFlows(*c.nodeConfig.PodIPv4CIDR, cookie.Default)
if err := c.ofEntryOperations.AddAll(flows); err != nil {
return err
}
c.hostNetworkingFlows = flows
return nil
}

func (c *client) InstallLoadBalancerServiceFromOutsideFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
var flows []binding.Flow
flows = append(flows, c.loadBalancerServiceFromOutsideFlow(svcIP, svcPort, protocol))
cacheKey := fmt.Sprintf("L%s%s%x", svcIP, protocol, svcPort)
return c.addFlows(c.serviceFlowCache, cacheKey, flows)
}

func (c *client) UninstallLoadBalancerServiceFromOutsideFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
cacheKey := fmt.Sprintf("L%s%s%x", svcIP, protocol, svcPort)
return c.deleteFlows(c.serviceFlowCache, cacheKey)
}
Loading

0 comments on commit e984a6e

Please sign in to comment.