Skip to content

Commit

Permalink
Use ModifyAll() to modify existing flows in a bundle
Browse files Browse the repository at this point in the history
  • Loading branch information
lzhecheng committed May 14, 2021
1 parent 625f943 commit 5e6f48d
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 25 deletions.
58 changes: 34 additions & 24 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
binding "antrea.io/antrea/pkg/ovs/openflow"
"antrea.io/antrea/antrea/pkg/util/runtime"
"antrea.io/antrea/pkg/util/runtime"
"antrea.io/antrea/third_party/proxy"
)

Expand Down Expand Up @@ -294,20 +294,13 @@ func (c *client) IsConnected() bool {
// it will return immediately, otherwise it will use Bundle to add all flows, and then add them into the flow cache.
// If it fails to add the flows with Bundle, it will return the error and no flow cache is created. If the force parameter
// is true, flows will be added regardless of flow cache.
func (c *client) addFlows(cache *flowCategoryCache, flowCacheKey string, flows []binding.Flow, force bool) error {
if !force {
_, ok := cache.Load(flowCacheKey)
// If a flow cache entry already exists for the key, return immediately. Otherwise, add the flows to the switch
// and populate the cache with them.
if ok {
klog.Infof("Flows with cache key %s are already installed", flowCacheKey)
return nil
}
} else {
err := c.ofEntryOperations.DeleteAll(flows)
if err != nil {
return err
}
func (c *client) addFlows(cache *flowCategoryCache, flowCacheKey string, flows []binding.Flow) error {
_, ok := cache.Load(flowCacheKey)
// If a flow cache entry already exists for the key, return immediately. Otherwise, add the flows to the switch
// and populate the cache with them.
if ok {
klog.V(2).Infof("Flows with cache key %s are already installed", flowCacheKey)
return nil
}
err := c.ofEntryOperations.AddAll(flows)
if err != nil {
Expand All @@ -322,6 +315,20 @@ func (c *client) addFlows(cache *flowCategoryCache, flowCacheKey string, flows [
return nil
}

func (c *client) modifyFlows(cache *flowCategoryCache, flowCacheKey string, flows []binding.Flow) error {
err := c.ofEntryOperations.ModifyAll(flows)
if err != nil {
return err
}
fCache := flowCache{}
// Modify the flows in the flow cache.
for _, flow := range flows {
fCache[flow.MatchString()] = flow
}
cache.Store(flowCacheKey, fCache)
return nil
}

// deleteFlows deletes all the flows in the flow cache indexed by the provided flowCacheKey.
func (c *client) deleteFlows(cache *flowCategoryCache, flowCacheKey string) error {
fCacheI, ok := cache.Load(flowCacheKey)
Expand Down Expand Up @@ -353,7 +360,7 @@ func (c *client) InstallNodeFlows(hostname string,
var flows []binding.Flow
localGatewayMAC := c.nodeConfig.GatewayConfig.MAC

forceAdd := false
modify := false
for peerPodCIDR, peerGatewayIP := range peerConfigs {
if peerGatewayIP.To4() != nil {
// Since broadcast is not supported in IPv6, ARP should happen only with IPv4 address, and ARP responder flows
Expand All @@ -366,7 +373,7 @@ func (c *client) InstallNodeFlows(hostname string,
flows = append(flows, c.l3FwdFlowToRemote(localGatewayMAC, *peerPodCIDR, tunnelPeerIP, cookie.Node))
} else if runtime.IsWindowsPlatform() && !c.encapMode.NeedsRoutingToPeer(tunnelPeerIP, c.nodeConfig.NodeIPAddr) && remoteGatewayMAC != nil {
flows = append(flows, c.l3FwdFlowToRemoteViaRouting(remoteGatewayMAC, *peerPodCIDR, cookie.Node)...)
forceAdd = true
modify = true
} else {
flows = append(flows, c.l3FwdFlowToRemoteViaGW(localGatewayMAC, *peerPodCIDR, cookie.Node))
}
Expand All @@ -379,7 +386,10 @@ func (c *client) InstallNodeFlows(hostname string,
flows = append(flows, c.tunnelClassifierFlow(ipsecTunOFPort, cookie.Node))
}

return c.addFlows(c.nodeFlowCache, hostname, flows, forceAdd)
if modify {
return c.modifyFlows(c.nodeFlowCache, hostname, flows)
}
return c.addFlows(c.nodeFlowCache, hostname, flows)
}

func (c *client) UninstallNodeFlows(hostname string) error {
Expand Down Expand Up @@ -414,7 +424,7 @@ func (c *client) InstallPodFlows(interfaceName string, podInterfaceIPs []net.IP,
c.l3FwdFlowRouteToPod(podInterfaceIPs, podInterfaceMAC, cookie.Pod)...,
)
}
return c.addFlows(c.podFlowCache, interfaceName, flows, false)
return c.addFlows(c.podFlowCache, interfaceName, flows)
}

func (c *client) UninstallPodFlows(interfaceName string) error {
Expand Down Expand Up @@ -490,7 +500,7 @@ func (c *client) InstallEndpointFlows(protocol binding.Protocol, endpoints []pro
if endpoint.GetIsLocal() {
flows = append(flows, c.hairpinSNATFlow(endpointIP))
}
if err := c.addFlows(c.serviceFlowCache, cacheKey, flows, false); err != nil {
if err := c.addFlows(c.serviceFlowCache, cacheKey, flows); err != nil {
return err
}
}
Expand Down Expand Up @@ -518,7 +528,7 @@ func (c *client) InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP,
flows = append(flows, c.serviceLearnFlow(groupID, svcIP, svcPort, protocol, affinityTimeout))
}
cacheKey := generateServicePortFlowCacheKey(svcIP, svcPort, protocol)
return c.addFlows(c.serviceFlowCache, cacheKey, flows, false)
return c.addFlows(c.serviceFlowCache, cacheKey, flows)
}

func (c *client) UninstallServiceFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error {
Expand Down Expand Up @@ -711,7 +721,7 @@ func (c *client) InstallSNATMarkFlows(snatIP net.IP, mark uint32) error {
cacheKey := fmt.Sprintf("s%x", mark)
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
return c.addFlows(c.snatFlowCache, cacheKey, flows, false)
return c.addFlows(c.snatFlowCache, cacheKey, flows)
}

func (c *client) UninstallSNATMarkFlows(mark uint32) error {
Expand All @@ -726,7 +736,7 @@ func (c *client) InstallPodSNATFlows(ofPort uint32, snatIP net.IP, snatMark uint
cacheKey := fmt.Sprintf("p%x", ofPort)
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
return c.addFlows(c.snatFlowCache, cacheKey, flows, false)
return c.addFlows(c.snatFlowCache, cacheKey, flows)
}

func (c *client) UninstallPodSNATFlows(ofPort uint32) error {
Expand Down Expand Up @@ -892,7 +902,7 @@ func (c *client) InstallTraceflowFlows(dataplaneTag uint8, liveTraffic, droppedO
flows = append(flows, c.traceflowConnectionTrackFlows(dataplaneTag, receiverOnly, packet, ofPort, timeoutSeconds, cookie.Default)...)
flows = append(flows, c.traceflowL2ForwardOutputFlows(dataplaneTag, liveTraffic, droppedOnly, timeoutSeconds, cookie.Default)...)
flows = append(flows, c.traceflowNetworkPolicyFlows(dataplaneTag, timeoutSeconds, cookie.Default)...)
return c.addFlows(c.tfFlowCache, cacheKey, flows, false)
return c.addFlows(c.tfFlowCache, cacheKey, flows)
}

func (c *client) UninstallTraceflowFlows(dataplaneTag uint8) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/openflow/client_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (c *client) InstallLoadBalancerServiceFromOutsideFlows(svcIP net.IP, svcPor
var flows []binding.Flow
flows = append(flows, c.loadBalancerServiceFromOutsideFlow(svcIP, svcPort, protocol))
cacheKey := fmt.Sprintf("L%s%s%x", svcIP, protocol, svcPort)
return c.addFlows(c.serviceFlowCache, cacheKey, flows, false)
return c.addFlows(c.serviceFlowCache, cacheKey, flows)
}

func (c *client) UninstallLoadBalancerServiceFromOutsideFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error {
Expand Down
19 changes: 19 additions & 0 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package openflow
import (
"encoding/binary"
"fmt"
"k8s.io/klog"
"math"
"net"
"strings"
Expand Down Expand Up @@ -332,6 +333,7 @@ type OFEntryOperations interface {
Modify(flow binding.Flow) error
Delete(flow binding.Flow) error
AddAll(flows []binding.Flow) error
ModifyAll(flows []binding.Flow) error
DeleteAll(flows []binding.Flow) error
AddOFEntries(ofEntries []binding.OFEntry) error
DeleteOFEntries(ofEntries []binding.OFEntry) error
Expand Down Expand Up @@ -456,6 +458,23 @@ func (c *client) AddAll(flows []binding.Flow) error {
return nil
}

func (c *client) ModifyAll(flows []binding.Flow) error {
if len(flows) == 0 {
return nil
}
startTime := time.Now()
defer func() {
d := time.Since(startTime)
metrics.OVSFlowOpsLatency.WithLabelValues("modify").Observe(float64(d.Milliseconds()))
}()
if err := c.bridge.AddFlowsInBundle(nil, flows, nil); err != nil {
metrics.OVSFlowOpsErrorCount.WithLabelValues("modify").Inc()
return err
}
metrics.OVSFlowOpsCount.WithLabelValues("modify").Inc()
return nil
}

func (c *client) DeleteAll(flows []binding.Flow) error {
startTime := time.Now()
defer func() {
Expand Down

0 comments on commit 5e6f48d

Please sign in to comment.