Skip to content

Commit

Permalink
Skip remove if reassigns to the original node (milvus-io#17450)
Browse files Browse the repository at this point in the history
Fix ut race

See also: milvus-io#15966, milvus-io#17432

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
  • Loading branch information
XuanYang-cn authored Jun 10, 2022
1 parent 2cd8e15 commit 2b53405
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 100 deletions.
110 changes: 58 additions & 52 deletions internal/datacoord/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,51 +634,6 @@ func (c *ChannelManager) processAck(e *ackEvent) {
}
}

// CleanupAndReassign tries to clean up datanode's subscription, and then delete channel watch info.
func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string) error {
c.mu.Lock()
defer c.mu.Unlock()

chToCleanUp := c.getChannelByNodeAndName(nodeID, channelName)
if chToCleanUp == nil {
return fmt.Errorf("failed to find matching channel: %s and node: %d", channelName, nodeID)
}

if c.msgstreamFactory == nil {
log.Warn("msgstream factory is not set, unable to clean up topics")
} else {
subName := fmt.Sprintf("%s-%d-%d", Params.CommonCfg.DataNodeSubName, nodeID, chToCleanUp.CollectionID)
pchannelName := funcutil.ToPhysicalChannel(channelName)
msgstream.UnsubscribeChannels(c.ctx, c.msgstreamFactory, subName, []string{pchannelName})
}

err := c.remove(nodeID, chToCleanUp)
if err != nil {
return fmt.Errorf("failed to remove watch info: %v,%s", chToCleanUp, err.Error())
}

if c.isMarkedDrop(channelName) {
log.Debug("try to cleanup removal flag ", zap.String("channel name", channelName))
c.h.FinishDropChannel(channelName)
return nil
}

reallocates := &NodeChannelInfo{nodeID, []*channel{chToCleanUp}}

// reassign policy won't choose the same Node for a ressignment of a channel
updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates})
if len(updates) <= 0 {
log.Warn("fail to reassign channel to other nodes, add channel to the original node",
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName))
updates.Add(nodeID, []*channel{chToCleanUp})
}

log.Info("channel manager reassign channels", zap.Int64("old nodeID", nodeID), zap.Array("updates", updates))

return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)
}

type channelStateChecker func(context.Context)

func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context) {
Expand Down Expand Up @@ -777,33 +732,84 @@ func (c *ChannelManager) Reassign(nodeID UniqueID, channelName string) error {
return fmt.Errorf("fail to find matching nodeID: %d with channelName: %s", nodeID, channelName)
}

if err := c.remove(nodeID, ch); err != nil {
return fmt.Errorf("failed to remove watch info: %v,%s", ch, err.Error())
}
reallocates := &NodeChannelInfo{nodeID, []*channel{ch}}

if c.isMarkedDrop(channelName) {
if err := c.remove(nodeID, ch); err != nil {
return fmt.Errorf("failed to remove watch info: %v,%s", ch, err.Error())
}

log.Debug("try to cleanup removal flag ", zap.String("channel name", channelName))
c.h.FinishDropChannel(channelName)

log.Info("removed channel assignment", zap.Any("channel", ch))
return nil
}

reallocates := &NodeChannelInfo{nodeID, []*channel{ch}}

// reassign policy won't choose the same Node for a ressignment of a channel
updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates})
if len(updates) <= 0 {
if len(updates) <= 0 { // skip the remove if reassign to the original node
log.Warn("fail to reassign channel to other nodes, assign to the original Node",
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName))
updates.Add(nodeID, []*channel{ch})
} else {
if err := c.remove(nodeID, ch); err != nil {
return fmt.Errorf("failed to remove watch info: %v,%s", ch, err.Error())
}
}

log.Info("channel manager reassign channels", zap.Int64("old node ID", nodeID), zap.Array("updates", updates))

return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)
}

// CleanupAndReassign tries to clean up datanode's subscription, and then delete channel watch info.
func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string) error {
c.mu.Lock()
defer c.mu.Unlock()

chToCleanUp := c.getChannelByNodeAndName(nodeID, channelName)
if chToCleanUp == nil {
return fmt.Errorf("failed to find matching channel: %s and node: %d", channelName, nodeID)
}

if c.msgstreamFactory == nil {
log.Warn("msgstream factory is not set, unable to clean up topics")
} else {
subName := fmt.Sprintf("%s-%d-%d", Params.CommonCfg.DataNodeSubName, nodeID, chToCleanUp.CollectionID)
pchannelName := funcutil.ToPhysicalChannel(channelName)
msgstream.UnsubscribeChannels(c.ctx, c.msgstreamFactory, subName, []string{pchannelName})
}

reallocates := &NodeChannelInfo{nodeID, []*channel{chToCleanUp}}

if c.isMarkedDrop(channelName) {
if err := c.remove(nodeID, chToCleanUp); err != nil {
return fmt.Errorf("failed to remove watch info: %v,%s", chToCleanUp, err.Error())
}

log.Debug("try to cleanup removal flag ", zap.String("channel name", channelName))
c.h.FinishDropChannel(channelName)

log.Info("removed channel assignment", zap.Any("channel", chToCleanUp))
return nil
}

// reassign policy won't choose the same Node for a ressignment of a channel
updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates})
if len(updates) <= 0 { // skip the remove if reassign to the original node
log.Warn("fail to reassign channel to other nodes, add channel to the original node",
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName))
updates.Add(nodeID, []*channel{chToCleanUp})
} else {
if err := c.remove(nodeID, chToCleanUp); err != nil {
return fmt.Errorf("failed to remove watch info: %v,%s", chToCleanUp, err.Error())
}
}

log.Info("channel manager reassign channels", zap.Int64("old nodeID", nodeID), zap.Array("updates", updates))
return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)
}

func (c *ChannelManager) getChannelByNodeAndName(nodeID UniqueID, channelName string) *channel {
Expand Down
64 changes: 16 additions & 48 deletions internal/datacoord/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,17 +327,16 @@ func TestChannelManager(t *testing.T) {
err = chManager.AddNode(nodeToAdd)
assert.NoError(t, err)

chInfo := chManager.store.GetNode(nodeID)
assert.Equal(t, 2, len(chInfo.Channels))
chInfo = chManager.store.GetNode(nodeToAdd)
assert.Equal(t, 0, len(chInfo.Channels))
assert.True(t, chManager.Match(nodeID, channel1))
assert.True(t, chManager.Match(nodeID, channel2))
assert.False(t, chManager.Match(nodeToAdd, channel1))
assert.False(t, chManager.Match(nodeToAdd, channel2))

err = chManager.Watch(&channel{"channel-3", collectionID})
assert.NoError(t, err)

chInfo = chManager.store.GetNode(nodeToAdd)
assert.Equal(t, 1, len(chInfo.Channels))
chManager.stateTimer.removeTimers([]string{"channel-3"})
assert.True(t, chManager.Match(nodeToAdd, "channel-3"))

checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeToAdd, "channel-3", collectionID)
})
Expand Down Expand Up @@ -369,10 +368,8 @@ func TestChannelManager(t *testing.T) {
waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, nodeID, channel1)
waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, nodeID, channel2)

chInfo := chManager.store.GetNode(nodeID)
assert.Equal(t, 2, len(chInfo.Channels))
chManager.Match(nodeID, channel1)
chManager.Match(nodeID, channel2)
assert.True(t, chManager.Match(nodeID, channel1))
assert.True(t, chManager.Match(nodeID, channel2))

err = chManager.Watch(&channel{"channel-3", collectionID})
assert.NoError(t, err)
Expand Down Expand Up @@ -464,18 +461,11 @@ func TestChannelManager(t *testing.T) {
assert.NoError(t, err)
chManager.stateTimer.stopIfExsit(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID})

// test no nodes are removed from store
nodesID := chManager.store.GetNodes()
assert.Equal(t, 2, len(nodesID))

// test nodes of reassignTest contains no channel
nodeChanInfo := chManager.store.GetNode(reassignTest.nodeID)
assert.Equal(t, 0, len(nodeChanInfo.Channels))

// test all channels are assgined to node of remainTest
nodeChanInfo = chManager.store.GetNode(remainTest.nodeID)
assert.Equal(t, 2, len(nodeChanInfo.Channels))
assert.ElementsMatch(t, []*channel{{remainTest.chName, collectionID}, {reassignTest.chName, collectionID}}, nodeChanInfo.Channels)
assert.False(t, chManager.Match(reassignTest.nodeID, reassignTest.chName))
assert.True(t, chManager.Match(remainTest.nodeID, reassignTest.chName))
assert.True(t, chManager.Match(remainTest.nodeID, remainTest.chName))

// Delete node of reassginTest and try to Reassign node in remainTest
err = chManager.DeleteNode(reassignTest.nodeID)
Expand Down Expand Up @@ -551,18 +541,12 @@ func TestChannelManager(t *testing.T) {
assert.NoError(t, err)
chManager.stateTimer.stopIfExsit(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID})

// test no nodes are removed from store
nodesID := chManager.store.GetNodes()
assert.Equal(t, 2, len(nodesID))

// test nodes of reassignTest contains no channel
nodeChanInfo := chManager.store.GetNode(reassignTest.nodeID)
assert.Equal(t, 0, len(nodeChanInfo.Channels))
assert.False(t, chManager.Match(reassignTest.nodeID, reassignTest.chName))

// test all channels are assgined to node of remainTest
nodeChanInfo = chManager.store.GetNode(remainTest.nodeID)
assert.Equal(t, 2, len(nodeChanInfo.Channels))
assert.ElementsMatch(t, []*channel{{remainTest.chName, collectionID}, {reassignTest.chName, collectionID}}, nodeChanInfo.Channels)
assert.True(t, chManager.Match(remainTest.nodeID, reassignTest.chName))
assert.True(t, chManager.Match(remainTest.nodeID, remainTest.chName))

// Delete node of reassginTest and try to CleanupAndReassign node in remainTest
err = chManager.DeleteNode(reassignTest.nodeID)
Expand Down Expand Up @@ -899,48 +883,32 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) {
waitAndStore(datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseSuccess, 1, channelBalanced)
waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 2, channelBalanced)

infos := chManager.store.GetNode(1)
assert.Equal(t, 2, len(infos.Channels))
assert.True(t, chManager.Match(1, "channel-2"))
assert.True(t, chManager.Match(1, "channel-3"))

infos = chManager.store.GetNode(2)
assert.Equal(t, 1, len(infos.Channels))
assert.True(t, chManager.Match(2, "channel-1"))

chManager.AddNode(3)
chManager.Watch(&channel{"channel-4", collectionID})
waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 3, "channel-4")
infos = chManager.store.GetNode(1)
assert.Equal(t, 2, len(infos.Channels))

assert.True(t, chManager.Match(1, "channel-2"))
assert.True(t, chManager.Match(1, "channel-3"))

infos = chManager.store.GetNode(2)
assert.Equal(t, 1, len(infos.Channels))
assert.True(t, chManager.Match(2, "channel-1"))

infos = chManager.store.GetNode(3)
assert.Equal(t, 1, len(infos.Channels))
assert.True(t, chManager.Match(3, "channel-4"))

chManager.DeleteNode(3)
waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 2, "channel-4")
infos = chManager.store.GetNode(1)
assert.Equal(t, 2, len(infos.Channels))

assert.True(t, chManager.Match(1, "channel-2"))
assert.True(t, chManager.Match(1, "channel-3"))

infos = chManager.store.GetNode(2)
assert.Equal(t, 2, len(infos.Channels))
assert.True(t, chManager.Match(2, "channel-1"))
assert.True(t, chManager.Match(2, "channel-4"))

chManager.DeleteNode(2)
waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 1, "channel-4")
waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 1, "channel-1")
infos = chManager.store.GetNode(1)
assert.Equal(t, 4, len(infos.Channels))

assert.True(t, chManager.Match(1, "channel-2"))
assert.True(t, chManager.Match(1, "channel-3"))
assert.True(t, chManager.Match(1, "channel-1"))
Expand Down

0 comments on commit 2b53405

Please sign in to comment.