From 2b5340581c27874be9c906302c71ca415aedd0b0 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Fri, 10 Jun 2022 10:54:07 +0800 Subject: [PATCH] Skip remove if reassigns to the original node (#17450) Fix ut race See also: #15966, #17432 Signed-off-by: yangxuan --- internal/datacoord/channel_manager.go | 110 +++++++++++---------- internal/datacoord/channel_manager_test.go | 64 +++--------- 2 files changed, 74 insertions(+), 100 deletions(-) diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index d26c70fd64c55..bc20db421bbe1 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -634,51 +634,6 @@ func (c *ChannelManager) processAck(e *ackEvent) { } } -// CleanupAndReassign tries to clean up datanode's subscription, and then delete channel watch info. -func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string) error { - c.mu.Lock() - defer c.mu.Unlock() - - chToCleanUp := c.getChannelByNodeAndName(nodeID, channelName) - if chToCleanUp == nil { - return fmt.Errorf("failed to find matching channel: %s and node: %d", channelName, nodeID) - } - - if c.msgstreamFactory == nil { - log.Warn("msgstream factory is not set, unable to clean up topics") - } else { - subName := fmt.Sprintf("%s-%d-%d", Params.CommonCfg.DataNodeSubName, nodeID, chToCleanUp.CollectionID) - pchannelName := funcutil.ToPhysicalChannel(channelName) - msgstream.UnsubscribeChannels(c.ctx, c.msgstreamFactory, subName, []string{pchannelName}) - } - - err := c.remove(nodeID, chToCleanUp) - if err != nil { - return fmt.Errorf("failed to remove watch info: %v,%s", chToCleanUp, err.Error()) - } - - if c.isMarkedDrop(channelName) { - log.Debug("try to cleanup removal flag ", zap.String("channel name", channelName)) - c.h.FinishDropChannel(channelName) - return nil - } - - reallocates := &NodeChannelInfo{nodeID, []*channel{chToCleanUp}} - - // reassign policy won't choose the same Node for a ressignment of a channel - updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates}) - if len(updates) <= 0 { - log.Warn("fail to reassign channel to other nodes, add channel to the original node", - zap.Int64("nodeID", nodeID), - zap.String("channel name", channelName)) - updates.Add(nodeID, []*channel{chToCleanUp}) - } - - log.Info("channel manager reassign channels", zap.Int64("old nodeID", nodeID), zap.Array("updates", updates)) - - return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch) -} - type channelStateChecker func(context.Context) func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context) { @@ -777,11 +732,13 @@ func (c *ChannelManager) Reassign(nodeID UniqueID, channelName string) error { return fmt.Errorf("fail to find matching nodeID: %d with channelName: %s", nodeID, channelName) } - if err := c.remove(nodeID, ch); err != nil { - return fmt.Errorf("failed to remove watch info: %v,%s", ch, err.Error()) - } + reallocates := &NodeChannelInfo{nodeID, []*channel{ch}} if c.isMarkedDrop(channelName) { + if err := c.remove(nodeID, ch); err != nil { + return fmt.Errorf("failed to remove watch info: %v,%s", ch, err.Error()) + } + log.Debug("try to cleanup removal flag ", zap.String("channel name", channelName)) c.h.FinishDropChannel(channelName) @@ -789,21 +746,70 @@ func (c *ChannelManager) Reassign(nodeID UniqueID, channelName string) error { return nil } - reallocates := &NodeChannelInfo{nodeID, []*channel{ch}} - // reassign policy won't choose the same Node for a ressignment of a channel updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates}) - if len(updates) <= 0 { + if len(updates) <= 0 { // skip the remove if reassign to the original node log.Warn("fail to reassign channel to other nodes, assign to the original Node", zap.Int64("nodeID", nodeID), zap.String("channel name", channelName)) updates.Add(nodeID, []*channel{ch}) + } else { + if err := c.remove(nodeID, ch); err != nil { + return fmt.Errorf("failed to remove watch info: %v,%s", ch, err.Error()) + } } log.Info("channel manager reassign channels", zap.Int64("old node ID", nodeID), zap.Array("updates", updates)) - return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch) +} + +// CleanupAndReassign tries to clean up datanode's subscription, and then delete channel watch info. +func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string) error { + c.mu.Lock() + defer c.mu.Unlock() + + chToCleanUp := c.getChannelByNodeAndName(nodeID, channelName) + if chToCleanUp == nil { + return fmt.Errorf("failed to find matching channel: %s and node: %d", channelName, nodeID) + } + + if c.msgstreamFactory == nil { + log.Warn("msgstream factory is not set, unable to clean up topics") + } else { + subName := fmt.Sprintf("%s-%d-%d", Params.CommonCfg.DataNodeSubName, nodeID, chToCleanUp.CollectionID) + pchannelName := funcutil.ToPhysicalChannel(channelName) + msgstream.UnsubscribeChannels(c.ctx, c.msgstreamFactory, subName, []string{pchannelName}) + } + reallocates := &NodeChannelInfo{nodeID, []*channel{chToCleanUp}} + + if c.isMarkedDrop(channelName) { + if err := c.remove(nodeID, chToCleanUp); err != nil { + return fmt.Errorf("failed to remove watch info: %v,%s", chToCleanUp, err.Error()) + } + + log.Debug("try to cleanup removal flag ", zap.String("channel name", channelName)) + c.h.FinishDropChannel(channelName) + + log.Info("removed channel assignment", zap.Any("channel", chToCleanUp)) + return nil + } + + // reassign policy won't choose the same Node for a ressignment of a channel + updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates}) + if len(updates) <= 0 { // skip the remove if reassign to the original node + log.Warn("fail to reassign channel to other nodes, add channel to the original node", + zap.Int64("nodeID", nodeID), + zap.String("channel name", channelName)) + updates.Add(nodeID, []*channel{chToCleanUp}) + } else { + if err := c.remove(nodeID, chToCleanUp); err != nil { + return fmt.Errorf("failed to remove watch info: %v,%s", chToCleanUp, err.Error()) + } + } + + log.Info("channel manager reassign channels", zap.Int64("old nodeID", nodeID), zap.Array("updates", updates)) + return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch) } func (c *ChannelManager) getChannelByNodeAndName(nodeID UniqueID, channelName string) *channel { diff --git a/internal/datacoord/channel_manager_test.go b/internal/datacoord/channel_manager_test.go index 6c16e08436a82..0e6d4b2ceaa9c 100644 --- a/internal/datacoord/channel_manager_test.go +++ b/internal/datacoord/channel_manager_test.go @@ -327,17 +327,16 @@ func TestChannelManager(t *testing.T) { err = chManager.AddNode(nodeToAdd) assert.NoError(t, err) - chInfo := chManager.store.GetNode(nodeID) - assert.Equal(t, 2, len(chInfo.Channels)) - chInfo = chManager.store.GetNode(nodeToAdd) - assert.Equal(t, 0, len(chInfo.Channels)) + assert.True(t, chManager.Match(nodeID, channel1)) + assert.True(t, chManager.Match(nodeID, channel2)) + assert.False(t, chManager.Match(nodeToAdd, channel1)) + assert.False(t, chManager.Match(nodeToAdd, channel2)) err = chManager.Watch(&channel{"channel-3", collectionID}) assert.NoError(t, err) - chInfo = chManager.store.GetNode(nodeToAdd) - assert.Equal(t, 1, len(chInfo.Channels)) chManager.stateTimer.removeTimers([]string{"channel-3"}) + assert.True(t, chManager.Match(nodeToAdd, "channel-3")) checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeToAdd, "channel-3", collectionID) }) @@ -369,10 +368,8 @@ func TestChannelManager(t *testing.T) { waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, nodeID, channel1) waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, nodeID, channel2) - chInfo := chManager.store.GetNode(nodeID) - assert.Equal(t, 2, len(chInfo.Channels)) - chManager.Match(nodeID, channel1) - chManager.Match(nodeID, channel2) + assert.True(t, chManager.Match(nodeID, channel1)) + assert.True(t, chManager.Match(nodeID, channel2)) err = chManager.Watch(&channel{"channel-3", collectionID}) assert.NoError(t, err) @@ -464,18 +461,11 @@ func TestChannelManager(t *testing.T) { assert.NoError(t, err) chManager.stateTimer.stopIfExsit(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID}) - // test no nodes are removed from store - nodesID := chManager.store.GetNodes() - assert.Equal(t, 2, len(nodesID)) - // test nodes of reassignTest contains no channel - nodeChanInfo := chManager.store.GetNode(reassignTest.nodeID) - assert.Equal(t, 0, len(nodeChanInfo.Channels)) - // test all channels are assgined to node of remainTest - nodeChanInfo = chManager.store.GetNode(remainTest.nodeID) - assert.Equal(t, 2, len(nodeChanInfo.Channels)) - assert.ElementsMatch(t, []*channel{{remainTest.chName, collectionID}, {reassignTest.chName, collectionID}}, nodeChanInfo.Channels) + assert.False(t, chManager.Match(reassignTest.nodeID, reassignTest.chName)) + assert.True(t, chManager.Match(remainTest.nodeID, reassignTest.chName)) + assert.True(t, chManager.Match(remainTest.nodeID, remainTest.chName)) // Delete node of reassginTest and try to Reassign node in remainTest err = chManager.DeleteNode(reassignTest.nodeID) @@ -551,18 +541,12 @@ func TestChannelManager(t *testing.T) { assert.NoError(t, err) chManager.stateTimer.stopIfExsit(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID}) - // test no nodes are removed from store - nodesID := chManager.store.GetNodes() - assert.Equal(t, 2, len(nodesID)) - // test nodes of reassignTest contains no channel - nodeChanInfo := chManager.store.GetNode(reassignTest.nodeID) - assert.Equal(t, 0, len(nodeChanInfo.Channels)) + assert.False(t, chManager.Match(reassignTest.nodeID, reassignTest.chName)) // test all channels are assgined to node of remainTest - nodeChanInfo = chManager.store.GetNode(remainTest.nodeID) - assert.Equal(t, 2, len(nodeChanInfo.Channels)) - assert.ElementsMatch(t, []*channel{{remainTest.chName, collectionID}, {reassignTest.chName, collectionID}}, nodeChanInfo.Channels) + assert.True(t, chManager.Match(remainTest.nodeID, reassignTest.chName)) + assert.True(t, chManager.Match(remainTest.nodeID, remainTest.chName)) // Delete node of reassginTest and try to CleanupAndReassign node in remainTest err = chManager.DeleteNode(reassignTest.nodeID) @@ -899,48 +883,32 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) { waitAndStore(datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseSuccess, 1, channelBalanced) waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 2, channelBalanced) - infos := chManager.store.GetNode(1) - assert.Equal(t, 2, len(infos.Channels)) assert.True(t, chManager.Match(1, "channel-2")) assert.True(t, chManager.Match(1, "channel-3")) - infos = chManager.store.GetNode(2) - assert.Equal(t, 1, len(infos.Channels)) assert.True(t, chManager.Match(2, "channel-1")) chManager.AddNode(3) chManager.Watch(&channel{"channel-4", collectionID}) waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 3, "channel-4") - infos = chManager.store.GetNode(1) - assert.Equal(t, 2, len(infos.Channels)) + assert.True(t, chManager.Match(1, "channel-2")) assert.True(t, chManager.Match(1, "channel-3")) - - infos = chManager.store.GetNode(2) - assert.Equal(t, 1, len(infos.Channels)) assert.True(t, chManager.Match(2, "channel-1")) - - infos = chManager.store.GetNode(3) - assert.Equal(t, 1, len(infos.Channels)) assert.True(t, chManager.Match(3, "channel-4")) chManager.DeleteNode(3) waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 2, "channel-4") - infos = chManager.store.GetNode(1) - assert.Equal(t, 2, len(infos.Channels)) + assert.True(t, chManager.Match(1, "channel-2")) assert.True(t, chManager.Match(1, "channel-3")) - - infos = chManager.store.GetNode(2) - assert.Equal(t, 2, len(infos.Channels)) assert.True(t, chManager.Match(2, "channel-1")) assert.True(t, chManager.Match(2, "channel-4")) chManager.DeleteNode(2) waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 1, "channel-4") waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 1, "channel-1") - infos = chManager.store.GetNode(1) - assert.Equal(t, 4, len(infos.Channels)) + assert.True(t, chManager.Match(1, "channel-2")) assert.True(t, chManager.Match(1, "channel-3")) assert.True(t, chManager.Match(1, "channel-1"))