Skip to content

Commit

Permalink
Fix flows not updated bug
Browse files Browse the repository at this point in the history
  • Loading branch information
lzhecheng committed May 11, 2021
1 parent 69d8745 commit 42ed700
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 17 deletions.
2 changes: 1 addition & 1 deletion pkg/agent/controller/noderoute/node_route_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func (c *Controller) addNodeRoute(nodeName string, node *corev1.Node) error {
}

nrInfo, installed, _ := c.installedNodes.GetByKey(nodeName)
if installed && nrInfo.(*nodeRouteInfo).nodeMAC.String() == peerNodeMAC.String() {
if installed && nrInfo != nil && nrInfo.(*nodeRouteInfo).nodeMAC != nil && nrInfo.(*nodeRouteInfo).nodeMAC.String() == peerNodeMAC.String() {
// Route is already added for this Node and Node MAC isn't changed.
return nil
}
Expand Down
40 changes: 25 additions & 15 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,14 +292,22 @@ func (c *client) IsConnected() bool {

// addFlows installs the flows on the OVS bridge and then add them into the flow cache. If the flow cache exists,
// it will return immediately, otherwise it will use Bundle to add all flows, and then add them into the flow cache.
// If it fails to add the flows with Bundle, it will return the error and no flow cache is created.
func (c *client) addFlows(cache *flowCategoryCache, flowCacheKey string, flows []binding.Flow) error {
_, ok := cache.Load(flowCacheKey)
// If a flow cache entry already exists for the key, return immediately. Otherwise, add the flows to the switch
// and populate the cache with them.
if ok {
klog.V(2).Infof("Flows with cache key %s are already installed", flowCacheKey)
return nil
// If it fails to add the flows with Bundle, it will return the error and no flow cache is created. If the force parameter
// is true, flows will be added regardless of flow cache.
func (c *client) addFlows(cache *flowCategoryCache, flowCacheKey string, flows []binding.Flow, force bool) error {
if !force {
_, ok := cache.Load(flowCacheKey)
// If a flow cache entry already exists for the key, return immediately. Otherwise, add the flows to the switch
// and populate the cache with them.
if ok {
klog.Infof("Flows with cache key %s are already installed", flowCacheKey)
return nil
}
} else {
err := c.ofEntryOperations.DeleteAll(flows)
if err != nil {
return err
}
}
err := c.ofEntryOperations.AddAll(flows)
if err != nil {
Expand Down Expand Up @@ -345,6 +353,7 @@ func (c *client) InstallNodeFlows(hostname string,
var flows []binding.Flow
localGatewayMAC := c.nodeConfig.GatewayConfig.MAC

forceAdd := false
for peerPodCIDR, peerGatewayIP := range peerConfigs {
if peerGatewayIP.To4() != nil {
// Since broadcast is not supported in IPv6, ARP should happen only with IPv4 address, and ARP responder flows
Expand All @@ -357,6 +366,7 @@ func (c *client) InstallNodeFlows(hostname string,
flows = append(flows, c.l3FwdFlowToRemote(localGatewayMAC, *peerPodCIDR, tunnelPeerIP, cookie.Node))
} else if runtime.IsWindowsPlatform() && !c.encapMode.NeedsRoutingToPeer(tunnelPeerIP, c.nodeConfig.NodeIPAddr) && remoteGatewayMAC != nil {
flows = append(flows, c.l3FwdFlowToRemoteViaRouting(remoteGatewayMAC, *peerPodCIDR, cookie.Node)...)
forceAdd = true
} else {
flows = append(flows, c.l3FwdFlowToRemoteViaGW(localGatewayMAC, *peerPodCIDR, cookie.Node))
}
Expand All @@ -369,7 +379,7 @@ func (c *client) InstallNodeFlows(hostname string,
flows = append(flows, c.tunnelClassifierFlow(ipsecTunOFPort, cookie.Node))
}

return c.addFlows(c.nodeFlowCache, hostname, flows)
return c.addFlows(c.nodeFlowCache, hostname, flows, forceAdd)
}

func (c *client) UninstallNodeFlows(hostname string) error {
Expand Down Expand Up @@ -404,7 +414,7 @@ func (c *client) InstallPodFlows(interfaceName string, podInterfaceIPs []net.IP,
c.l3FwdFlowRouteToPod(podInterfaceIPs, podInterfaceMAC, cookie.Pod)...,
)
}
return c.addFlows(c.podFlowCache, interfaceName, flows)
return c.addFlows(c.podFlowCache, interfaceName, flows, false)
}

func (c *client) UninstallPodFlows(interfaceName string) error {
Expand Down Expand Up @@ -480,7 +490,7 @@ func (c *client) InstallEndpointFlows(protocol binding.Protocol, endpoints []pro
if endpoint.GetIsLocal() {
flows = append(flows, c.hairpinSNATFlow(endpointIP))
}
if err := c.addFlows(c.serviceFlowCache, cacheKey, flows); err != nil {
if err := c.addFlows(c.serviceFlowCache, cacheKey, flows, false); err != nil {
return err
}
}
Expand Down Expand Up @@ -508,7 +518,7 @@ func (c *client) InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP,
flows = append(flows, c.serviceLearnFlow(groupID, svcIP, svcPort, protocol, affinityTimeout))
}
cacheKey := generateServicePortFlowCacheKey(svcIP, svcPort, protocol)
return c.addFlows(c.serviceFlowCache, cacheKey, flows)
return c.addFlows(c.serviceFlowCache, cacheKey, flows, false)
}

func (c *client) UninstallServiceFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error {
Expand Down Expand Up @@ -701,7 +711,7 @@ func (c *client) InstallSNATMarkFlows(snatIP net.IP, mark uint32) error {
cacheKey := fmt.Sprintf("s%x", mark)
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
return c.addFlows(c.snatFlowCache, cacheKey, flows)
return c.addFlows(c.snatFlowCache, cacheKey, flows, false)
}

func (c *client) UninstallSNATMarkFlows(mark uint32) error {
Expand All @@ -716,7 +726,7 @@ func (c *client) InstallPodSNATFlows(ofPort uint32, snatIP net.IP, snatMark uint
cacheKey := fmt.Sprintf("p%x", ofPort)
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
return c.addFlows(c.snatFlowCache, cacheKey, flows)
return c.addFlows(c.snatFlowCache, cacheKey, flows, false)
}

func (c *client) UninstallPodSNATFlows(ofPort uint32) error {
Expand Down Expand Up @@ -882,7 +892,7 @@ func (c *client) InstallTraceflowFlows(dataplaneTag uint8, liveTraffic, droppedO
flows = append(flows, c.traceflowConnectionTrackFlows(dataplaneTag, receiverOnly, packet, ofPort, timeoutSeconds, cookie.Default)...)
flows = append(flows, c.traceflowL2ForwardOutputFlows(dataplaneTag, liveTraffic, droppedOnly, timeoutSeconds, cookie.Default)...)
flows = append(flows, c.traceflowNetworkPolicyFlows(dataplaneTag, timeoutSeconds, cookie.Default)...)
return c.addFlows(c.tfFlowCache, cacheKey, flows)
return c.addFlows(c.tfFlowCache, cacheKey, flows, false)
}

func (c *client) UninstallTraceflowFlows(dataplaneTag uint8) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/openflow/client_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (c *client) InstallLoadBalancerServiceFromOutsideFlows(svcIP net.IP, svcPor
var flows []binding.Flow
flows = append(flows, c.loadBalancerServiceFromOutsideFlow(svcIP, svcPort, protocol))
cacheKey := fmt.Sprintf("L%s%s%x", svcIP, protocol, svcPort)
return c.addFlows(c.serviceFlowCache, cacheKey, flows)
return c.addFlows(c.serviceFlowCache, cacheKey, flows, false)
}

func (c *client) UninstallLoadBalancerServiceFromOutsideFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error {
Expand Down

0 comments on commit 42ed700

Please sign in to comment.