diff --git a/pkg/agent/multicast/mcast_controller.go b/pkg/agent/multicast/mcast_controller.go index a1df1200eb7..0800ca78649 100644 --- a/pkg/agent/multicast/mcast_controller.go +++ b/pkg/agent/multicast/mcast_controller.go @@ -131,7 +131,7 @@ func (c *Controller) updateGroupMemberStatus(obj interface{}, e *mcastGroupEvent newStatus := &GroupMemberStatus{ group: status.group, localMembers: make(map[string]time.Time), - remoteMembers: status.remoteMembers, + remoteMembers: status.remoteMembers.Union(nil), lastIGMPReport: status.lastIGMPReport, ofGroupID: status.ofGroupID, } @@ -559,10 +559,7 @@ func (c *Controller) syncGroup(groupKey string) error { func (c *Controller) groupIsStale(status *GroupMemberStatus) bool { membersCount := len(status.localMembers) diff := time.Now().Sub(status.lastIGMPReport) - if membersCount == 0 || diff > c.mcastGroupTimeout { - return true - } - return false + return membersCount == 0 || diff > c.mcastGroupTimeout } func (c *Controller) groupHasInstalled(groupKey string) bool { diff --git a/pkg/agent/multicast/mcast_controller_test.go b/pkg/agent/multicast/mcast_controller_test.go index a2d7b1b032f..6d61dfcb303 100644 --- a/pkg/agent/multicast/mcast_controller_test.go +++ b/pkg/agent/multicast/mcast_controller_test.go @@ -17,6 +17,7 @@ package multicast import ( "context" "fmt" + "math/rand" "net" "os" "sync" @@ -1031,6 +1032,92 @@ func TestMemberChanged(t *testing.T) { } } +func TestConcurrentEventHandlerAndWorkers(t *testing.T) { + c := newMockMulticastController(t, true) + c.ifaceStore = interfacestore.NewInterfaceStore() + stopCh := make(chan struct{}) + defer close(stopCh) + groupIP := net.ParseIP("224.3.4.5") + numEvents := 10 + var wg sync.WaitGroup + wg.Add(4) + + eventFunc := func(eType eventType, isLocal bool) { + leastSignificantByteArr := rand.Perm(numEvents) + ifaceNamePrefix := "local-interfaceName" + ifaceType := interfacestore.ContainerInterface + + if !isLocal { + ifaceNamePrefix = "remote-interfaceName" + ifaceType = interfacestore.TunnelInterface + } + for i := 0; i < len(leastSignificantByteArr); i++ { + var srcNode net.IP + var containerCfg *interfacestore.ContainerInterfaceConfig + if !isLocal { + srcNode = net.ParseIP(fmt.Sprintf("10.20.30.%d", leastSignificantByteArr[i]+2)) + } else { + containerCfg = &interfacestore.ContainerInterfaceConfig{ + ContainerID: fmt.Sprintf("container-%d", i), + } + } + iface := &interfacestore.InterfaceConfig{ + Type: ifaceType, + InterfaceName: fmt.Sprintf("%s-%d", ifaceNamePrefix, i), + OVSPortConfig: &interfacestore.OVSPortConfig{ + OFPort: int32(i), + }, + ContainerInterfaceConfig: containerCfg, + } + if eType == groupJoin { + c.ifaceStore.AddInterface(iface) + } + c.groupEventCh <- &mcastGroupEvent{ + group: groupIP, + eType: eType, + time: time.Now(), + iface: iface, + srcNode: srcNode, + } + } + } + // Below func adds local group join events. + go func() { + defer wg.Done() + eventFunc(groupJoin, true) + }() + // Below func adds local group leave events. + go func() { + defer wg.Done() + eventFunc(groupLeave, true) + }() + // Below func adds remote group join events. + go func() { + defer wg.Done() + eventFunc(groupJoin, false) + }() + // Below func adds remote group leave events. + go func() { + defer wg.Done() + eventFunc(groupLeave, false) + }() + + mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockOFClient.EXPECT().InstallMulticastFlows(groupIP, gomock.Any()).AnyTimes() + mockOFClient.EXPECT().UninstallMulticastGroup(gomock.Any()).AnyTimes() + mockOFClient.EXPECT().UninstallMulticastFlows(groupIP).AnyTimes() + mockOFClient.EXPECT().SendIGMPRemoteReportPacketOut(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockOFClient.EXPECT().SendIGMPQueryPacketOut(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + go c.eventHandler(stopCh) + for i := 0; i < 2; i++ { + go wait.Until(c.worker, time.Second, stopCh) + } + wg.Wait() + assert.Eventually(t, func() bool { + return len(c.groupEventCh) == 0 && c.queue.Len() == 0 + }, time.Second, time.Millisecond*100) +} + func TestRemoteMemberJoinLeave(t *testing.T) { mockController := newMockMulticastController(t, true) _ = mockController.initialize(t)