Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Multicast] Clone remoteMembers when creating an updated GroupMemberStatus #4903

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions pkg/agent/multicast/mcast_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (c *Controller) updateGroupMemberStatus(obj interface{}, e *mcastGroupEvent
newStatus := &GroupMemberStatus{
group: status.group,
localMembers: make(map[string]time.Time),
remoteMembers: status.remoteMembers,
remoteMembers: status.remoteMembers.Union(nil),
lastIGMPReport: status.lastIGMPReport,
ofGroupID: status.ofGroupID,
}
Expand Down Expand Up @@ -559,10 +559,7 @@ func (c *Controller) syncGroup(groupKey string) error {
func (c *Controller) groupIsStale(status *GroupMemberStatus) bool {
membersCount := len(status.localMembers)
diff := time.Now().Sub(status.lastIGMPReport)
if membersCount == 0 || diff > c.mcastGroupTimeout {
return true
}
return false
return membersCount == 0 || diff > c.mcastGroupTimeout
}

func (c *Controller) groupHasInstalled(groupKey string) bool {
Expand Down
87 changes: 87 additions & 0 deletions pkg/agent/multicast/mcast_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package multicast
import (
"context"
"fmt"
"math/rand"
"net"
"os"
"sync"
Expand Down Expand Up @@ -1031,6 +1032,92 @@ func TestMemberChanged(t *testing.T) {
}
}

func TestConcurrentEventHandlerAndWorkers(t *testing.T) {
c := newMockMulticastController(t, true)
c.ifaceStore = interfacestore.NewInterfaceStore()
stopCh := make(chan struct{})
defer close(stopCh)
groupIP := net.ParseIP("224.3.4.5")
numEvents := 10
var wg sync.WaitGroup
wg.Add(4)

eventFunc := func(eType eventType, isLocal bool) {
leastSignificantByteArr := rand.Perm(numEvents)
ifaceNamePrefix := "local-interfaceName"
ifaceType := interfacestore.ContainerInterface

if !isLocal {
ifaceNamePrefix = "remote-interfaceName"
ifaceType = interfacestore.TunnelInterface
}
for i := 0; i < len(leastSignificantByteArr); i++ {
var srcNode net.IP
var containerCfg *interfacestore.ContainerInterfaceConfig
if !isLocal {
srcNode = net.ParseIP(fmt.Sprintf("10.20.30.%d", leastSignificantByteArr[i]+2))
} else {
containerCfg = &interfacestore.ContainerInterfaceConfig{
ContainerID: fmt.Sprintf("container-%d", i),
}
}
iface := &interfacestore.InterfaceConfig{
Type: ifaceType,
InterfaceName: fmt.Sprintf("%s-%d", ifaceNamePrefix, i),
OVSPortConfig: &interfacestore.OVSPortConfig{
OFPort: int32(i),
},
ContainerInterfaceConfig: containerCfg,
}
if eType == groupJoin {
c.ifaceStore.AddInterface(iface)
}
c.groupEventCh <- &mcastGroupEvent{
group: groupIP,
eType: eType,
time: time.Now(),
iface: iface,
srcNode: srcNode,
}
}
}
// Below func adds local group join events.
go func() {
defer wg.Done()
eventFunc(groupJoin, true)
}()
// Below func adds local group leave events.
go func() {
defer wg.Done()
eventFunc(groupLeave, true)
}()
// Below func adds remote group join events.
go func() {
defer wg.Done()
eventFunc(groupJoin, false)
}()
// Below func adds remote group leave events.
go func() {
defer wg.Done()
eventFunc(groupLeave, false)
}()

mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
mockOFClient.EXPECT().InstallMulticastFlows(groupIP, gomock.Any()).AnyTimes()
mockOFClient.EXPECT().UninstallMulticastGroup(gomock.Any()).AnyTimes()
mockOFClient.EXPECT().UninstallMulticastFlows(groupIP).AnyTimes()
mockOFClient.EXPECT().SendIGMPRemoteReportPacketOut(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
mockOFClient.EXPECT().SendIGMPQueryPacketOut(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
go c.eventHandler(stopCh)
for i := 0; i < 2; i++ {
go wait.Until(c.worker, time.Second, stopCh)
}
wg.Wait()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no guaratee that when this call finishes, eventHander and worker have processed the events. Could we use some determinstic input and decide when the goroutines finish their job by polling the result, which could be more determinstic?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a check on the size of eventCh and queue after all events have been sent.

assert.Eventually(t, func() bool {
return len(c.groupEventCh) == 0 && c.queue.Len() == 0
}, time.Second, time.Millisecond*100)
}

func TestRemoteMemberJoinLeave(t *testing.T) {
mockController := newMockMulticastController(t, true)
_ = mockController.initialize(t)
Expand Down