diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index d65585e68f0..f9f657feae0 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -27,7 +27,7 @@ import ( "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" binding "antrea.io/antrea/pkg/ovs/openflow" - "antrea.io/antrea/antrea/pkg/util/runtime" + "antrea.io/antrea/pkg/util/runtime" "antrea.io/antrea/third_party/proxy" ) @@ -294,20 +294,13 @@ func (c *client) IsConnected() bool { // it will return immediately, otherwise it will use Bundle to add all flows, and then add them into the flow cache. // If it fails to add the flows with Bundle, it will return the error and no flow cache is created. If the force parameter // is true, flows will be added regardless of flow cache. -func (c *client) addFlows(cache *flowCategoryCache, flowCacheKey string, flows []binding.Flow, force bool) error { - if !force { - _, ok := cache.Load(flowCacheKey) - // If a flow cache entry already exists for the key, return immediately. Otherwise, add the flows to the switch - // and populate the cache with them. - if ok { - klog.Infof("Flows with cache key %s are already installed", flowCacheKey) - return nil - } - } else { - err := c.ofEntryOperations.DeleteAll(flows) - if err != nil { - return err - } +func (c *client) addFlows(cache *flowCategoryCache, flowCacheKey string, flows []binding.Flow) error { + _, ok := cache.Load(flowCacheKey) + // If a flow cache entry already exists for the key, return immediately. Otherwise, add the flows to the switch + // and populate the cache with them. + if ok { + klog.V(2).Infof("Flows with cache key %s are already installed", flowCacheKey) + return nil } err := c.ofEntryOperations.AddAll(flows) if err != nil { @@ -322,6 +315,20 @@ func (c *client) addFlows(cache *flowCategoryCache, flowCacheKey string, flows [ return nil } +func (c *client) modifyFlows(cache *flowCategoryCache, flowCacheKey string, flows []binding.Flow) error { + err := c.ofEntryOperations.ModifyAll(flows) + if err != nil { + return err + } + fCache := flowCache{} + // Modify the flows in the flow cache. + for _, flow := range flows { + fCache[flow.MatchString()] = flow + } + cache.Store(flowCacheKey, fCache) + return nil +} + // deleteFlows deletes all the flows in the flow cache indexed by the provided flowCacheKey. func (c *client) deleteFlows(cache *flowCategoryCache, flowCacheKey string) error { fCacheI, ok := cache.Load(flowCacheKey) @@ -353,7 +360,7 @@ func (c *client) InstallNodeFlows(hostname string, var flows []binding.Flow localGatewayMAC := c.nodeConfig.GatewayConfig.MAC - forceAdd := false + modify := false for peerPodCIDR, peerGatewayIP := range peerConfigs { if peerGatewayIP.To4() != nil { // Since broadcast is not supported in IPv6, ARP should happen only with IPv4 address, and ARP responder flows @@ -366,7 +373,7 @@ func (c *client) InstallNodeFlows(hostname string, flows = append(flows, c.l3FwdFlowToRemote(localGatewayMAC, *peerPodCIDR, tunnelPeerIP, cookie.Node)) } else if runtime.IsWindowsPlatform() && !c.encapMode.NeedsRoutingToPeer(tunnelPeerIP, c.nodeConfig.NodeIPAddr) && remoteGatewayMAC != nil { flows = append(flows, c.l3FwdFlowToRemoteViaRouting(remoteGatewayMAC, *peerPodCIDR, cookie.Node)...) - forceAdd = true + modify = true } else { flows = append(flows, c.l3FwdFlowToRemoteViaGW(localGatewayMAC, *peerPodCIDR, cookie.Node)) } @@ -379,7 +386,10 @@ func (c *client) InstallNodeFlows(hostname string, flows = append(flows, c.tunnelClassifierFlow(ipsecTunOFPort, cookie.Node)) } - return c.addFlows(c.nodeFlowCache, hostname, flows, forceAdd) + if modify { + return c.modifyFlows(c.nodeFlowCache, hostname, flows) + } + return c.addFlows(c.nodeFlowCache, hostname, flows) } func (c *client) UninstallNodeFlows(hostname string) error { @@ -414,7 +424,7 @@ func (c *client) InstallPodFlows(interfaceName string, podInterfaceIPs []net.IP, c.l3FwdFlowRouteToPod(podInterfaceIPs, podInterfaceMAC, cookie.Pod)..., ) } - return c.addFlows(c.podFlowCache, interfaceName, flows, false) + return c.addFlows(c.podFlowCache, interfaceName, flows) } func (c *client) UninstallPodFlows(interfaceName string) error { @@ -490,7 +500,7 @@ func (c *client) InstallEndpointFlows(protocol binding.Protocol, endpoints []pro if endpoint.GetIsLocal() { flows = append(flows, c.hairpinSNATFlow(endpointIP)) } - if err := c.addFlows(c.serviceFlowCache, cacheKey, flows, false); err != nil { + if err := c.addFlows(c.serviceFlowCache, cacheKey, flows); err != nil { return err } } @@ -518,7 +528,7 @@ func (c *client) InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, flows = append(flows, c.serviceLearnFlow(groupID, svcIP, svcPort, protocol, affinityTimeout)) } cacheKey := generateServicePortFlowCacheKey(svcIP, svcPort, protocol) - return c.addFlows(c.serviceFlowCache, cacheKey, flows, false) + return c.addFlows(c.serviceFlowCache, cacheKey, flows) } func (c *client) UninstallServiceFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error { @@ -711,7 +721,7 @@ func (c *client) InstallSNATMarkFlows(snatIP net.IP, mark uint32) error { cacheKey := fmt.Sprintf("s%x", mark) c.replayMutex.RLock() defer c.replayMutex.RUnlock() - return c.addFlows(c.snatFlowCache, cacheKey, flows, false) + return c.addFlows(c.snatFlowCache, cacheKey, flows) } func (c *client) UninstallSNATMarkFlows(mark uint32) error { @@ -726,7 +736,7 @@ func (c *client) InstallPodSNATFlows(ofPort uint32, snatIP net.IP, snatMark uint cacheKey := fmt.Sprintf("p%x", ofPort) c.replayMutex.RLock() defer c.replayMutex.RUnlock() - return c.addFlows(c.snatFlowCache, cacheKey, flows, false) + return c.addFlows(c.snatFlowCache, cacheKey, flows) } func (c *client) UninstallPodSNATFlows(ofPort uint32) error { @@ -892,7 +902,7 @@ func (c *client) InstallTraceflowFlows(dataplaneTag uint8, liveTraffic, droppedO flows = append(flows, c.traceflowConnectionTrackFlows(dataplaneTag, receiverOnly, packet, ofPort, timeoutSeconds, cookie.Default)...) flows = append(flows, c.traceflowL2ForwardOutputFlows(dataplaneTag, liveTraffic, droppedOnly, timeoutSeconds, cookie.Default)...) flows = append(flows, c.traceflowNetworkPolicyFlows(dataplaneTag, timeoutSeconds, cookie.Default)...) - return c.addFlows(c.tfFlowCache, cacheKey, flows, false) + return c.addFlows(c.tfFlowCache, cacheKey, flows) } func (c *client) UninstallTraceflowFlows(dataplaneTag uint8) error { diff --git a/pkg/agent/openflow/client_windows.go b/pkg/agent/openflow/client_windows.go index 091c18a0c3d..9e6b76b512f 100644 --- a/pkg/agent/openflow/client_windows.go +++ b/pkg/agent/openflow/client_windows.go @@ -40,7 +40,7 @@ func (c *client) InstallLoadBalancerServiceFromOutsideFlows(svcIP net.IP, svcPor var flows []binding.Flow flows = append(flows, c.loadBalancerServiceFromOutsideFlow(svcIP, svcPort, protocol)) cacheKey := fmt.Sprintf("L%s%s%x", svcIP, protocol, svcPort) - return c.addFlows(c.serviceFlowCache, cacheKey, flows, false) + return c.addFlows(c.serviceFlowCache, cacheKey, flows) } func (c *client) UninstallLoadBalancerServiceFromOutsideFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error { diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index 62198418734..482f97e99bf 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -17,6 +17,7 @@ package openflow import ( "encoding/binary" "fmt" + "k8s.io/klog" "math" "net" "strings" @@ -332,6 +333,7 @@ type OFEntryOperations interface { Modify(flow binding.Flow) error Delete(flow binding.Flow) error AddAll(flows []binding.Flow) error + ModifyAll(flows []binding.Flow) error DeleteAll(flows []binding.Flow) error AddOFEntries(ofEntries []binding.OFEntry) error DeleteOFEntries(ofEntries []binding.OFEntry) error @@ -456,6 +458,23 @@ func (c *client) AddAll(flows []binding.Flow) error { return nil } +func (c *client) ModifyAll(flows []binding.Flow) error { + if len(flows) == 0 { + return nil + } + startTime := time.Now() + defer func() { + d := time.Since(startTime) + metrics.OVSFlowOpsLatency.WithLabelValues("modify").Observe(float64(d.Milliseconds())) + }() + if err := c.bridge.AddFlowsInBundle(nil, flows, nil); err != nil { + metrics.OVSFlowOpsErrorCount.WithLabelValues("modify").Inc() + return err + } + metrics.OVSFlowOpsCount.WithLabelValues("modify").Inc() + return nil +} + func (c *client) DeleteAll(flows []binding.Flow) error { startTime := time.Now() defer func() {