Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make DataNode release rather than delete when reassign #17293

Merged
merged 1 commit into from
Jun 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 76 additions & 56 deletions internal/datacoord/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,17 @@ func (c *ChannelManager) Startup(ctx context.Context, nodes []int64) error {
// ReleaseSuccess remove
// ReleaseFail clean up and remove
func (c *ChannelManager) checkOldNodes(nodes []UniqueID) error {
// Load all the watch infos before processing
nodeWatchInfos := make(map[UniqueID][]*datapb.ChannelWatchInfo)
for _, nodeID := range nodes {
watchInfos, err := c.stateTimer.loadAllChannels(nodeID)
if err != nil {
return err
}
nodeWatchInfos[nodeID] = watchInfos
}

for nodeID, watchInfos := range nodeWatchInfos {
for _, info := range watchInfos {
channelName := info.GetVchan().GetChannelName()

Expand All @@ -198,12 +203,12 @@ func (c *ChannelManager) checkOldNodes(nodes []UniqueID) error {
c.stateTimer.startOne(datapb.ChannelWatchState_ToRelease, channelName, nodeID, info.GetTimeoutTs())

case datapb.ChannelWatchState_ReleaseSuccess:
if err := c.toDelete(nodeID, channelName); err != nil {
if err := c.Reassign(nodeID, channelName); err != nil {
return err
}

case datapb.ChannelWatchState_ReleaseFailure:
if err := c.cleanUpAndDelete(nodeID, channelName); err != nil {
if err := c.CleanupAndReassign(nodeID, channelName); err != nil {
return err
}
}
Expand Down Expand Up @@ -318,17 +323,17 @@ func (c *ChannelManager) AddNode(nodeID int64) error {

c.store.Add(nodeID)

// the default registerPolicy doesn't reassgin channels already there
updates := c.registerPolicy(c.store, nodeID)
if len(updates) <= 0 {
log.Info("register node with no reassignment", zap.Int64("registered node", nodeID))
return nil
}

log.Info("register node",
zap.Int64("registered node", nodeID),
zap.Array("updates", updates))

return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)
return c.updateWithTimer(updates, datapb.ChannelWatchState_ToRelease)
XuanYang-cn marked this conversation as resolved.
Show resolved Hide resolved
XuanYang-cn marked this conversation as resolved.
Show resolved Hide resolved
}

// DeleteNode deletes the node from the cluster.
Expand All @@ -348,10 +353,30 @@ func (c *ChannelManager) DeleteNode(nodeID int64) error {
log.Warn("deregister node",
zap.Int64("unregistered node", nodeID),
zap.Array("updates", updates))
if len(updates) <= 0 {
return nil
}

var channels []*channel
for _, op := range updates {
if op.Type == Delete {
channels = op.Channels
}
}

chNames := make([]string, 0, len(channels))
for _, ch := range channels {
chNames = append(chNames, ch.Name)
}
log.Debug("remove timers for channel of the deregistered node",
zap.Any("channels", chNames), zap.Int64("nodeID", nodeID))
c.stateTimer.removeTimers(chNames)

if err := c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch); err != nil {
return err
}

// No channels will be return
_, err := c.store.Delete(nodeID)
return err
}
Expand Down Expand Up @@ -579,23 +604,25 @@ func (c *ChannelManager) processAck(e *ackEvent) {
}

case releaseFailAck, releaseTimeoutAck: // failure acks from toRelease
err := c.cleanUpAndDelete(e.nodeID, e.channelName)
// Cleanup, Delete and Reassign
err := c.CleanupAndReassign(e.nodeID, e.channelName)
if err != nil {
log.Warn("fail to clean and delete channels for release failure ACKs",
log.Warn("fail to clean and reassign channels for release failure ACKs",
zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName), zap.Error(err))
}

case releaseSuccessAck:
err := c.toDelete(e.nodeID, e.channelName)
// Delete and Reassign
err := c.Reassign(e.nodeID, e.channelName)
if err != nil {
log.Warn("fail to response to release success ACK",
zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName), zap.Error(err))
}
}
}

// cleanUpAndDelete tries to clean up datanode's subscription, and then delete channel watch info.
func (c *ChannelManager) cleanUpAndDelete(nodeID UniqueID, channelName string) error {
// CleanupAndReassign tries to clean up datanode's subscription, and then delete channel watch info.
func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string) error {
c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -612,35 +639,31 @@ func (c *ChannelManager) cleanUpAndDelete(nodeID UniqueID, channelName string) e
msgstream.UnsubscribeChannels(c.ctx, c.msgstreamFactory, subName, []string{pchannelName})
}

if !c.isMarkedDrop(channelName) {
reallocates := &NodeChannelInfo{nodeID, []*channel{chToCleanUp}}

// reassign policy won't choose the same Node for a ressignment of a channel
updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates})
if len(updates) <= 0 {
log.Warn("fail to reassign channel to other nodes, add channel to buffer", zap.String("channel name", channelName))
updates.Add(bufferID, []*channel{chToCleanUp})
}

err := c.remove(nodeID, chToCleanUp)
if err != nil {
return fmt.Errorf("failed to remove watch info: %v,%s", chToCleanUp, err.Error())
}

log.Info("channel manager reassign channels", zap.Int64("old node ID", nodeID), zap.Array("updates", updates))

return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)
}

err := c.remove(nodeID, chToCleanUp)
if err != nil {
return fmt.Errorf("failed to remove watch info: %v,%s", chToCleanUp, err.Error())
}

log.Debug("try to cleanup removal flag ", zap.String("channel name", channelName))
c.h.FinishDropChannel(channelName)
if c.isMarkedDrop(channelName) {
log.Debug("try to cleanup removal flag ", zap.String("channel name", channelName))
c.h.FinishDropChannel(channelName)
return nil
}

return nil
reallocates := &NodeChannelInfo{nodeID, []*channel{chToCleanUp}}

// reassign policy won't choose the same Node for a ressignment of a channel
updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates})
if len(updates) <= 0 {
log.Warn("fail to reassign channel to other nodes, add channel to the original node",
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName))
updates.Add(nodeID, []*channel{chToCleanUp})
}

log.Info("channel manager reassign channels", zap.Int64("old nodeID", nodeID), zap.Array("updates", updates))

return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)
}

type channelStateChecker func(context.Context)
Expand Down Expand Up @@ -724,8 +747,8 @@ func (c *ChannelManager) Release(nodeID UniqueID, channelName string) error {
return err
}

// toDelete removes channel assignment from a datanode
func (c *ChannelManager) toDelete(nodeID UniqueID, channelName string) error {
// Reassign removes channel assignment from a datanode
func (c *ChannelManager) Reassign(nodeID UniqueID, channelName string) error {
c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -734,36 +757,33 @@ func (c *ChannelManager) toDelete(nodeID UniqueID, channelName string) error {
return fmt.Errorf("fail to find matching nodeID: %d with channelName: %s", nodeID, channelName)
}

if !c.isMarkedDrop(channelName) {
reallocates := &NodeChannelInfo{nodeID, []*channel{ch}}
if err := c.remove(nodeID, ch); err != nil {
return fmt.Errorf("failed to remove watch info: %v,%s", ch, err.Error())
}

// reassign policy won't choose the same Node for a ressignment of a channel
updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates})
if len(updates) <= 0 {
log.Warn("fail to reassign channel to other nodes, add to the buffer", zap.String("channel name", channelName))
updates.Add(bufferID, []*channel{ch})
}
if c.isMarkedDrop(channelName) {
log.Debug("try to cleanup removal flag ", zap.String("channel name", channelName))
c.h.FinishDropChannel(channelName)

err := c.remove(nodeID, ch)
if err != nil {
return fmt.Errorf("failed to remove watch info: %v,%s", ch, err.Error())
}
log.Info("removed channel assignment", zap.Any("channel", ch))
return nil
}

log.Info("channel manager reassign channels", zap.Int64("old node ID", nodeID), zap.Array("updates", updates))
reallocates := &NodeChannelInfo{nodeID, []*channel{ch}}

return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)
// reassign policy won't choose the same Node for a ressignment of a channel
updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates})
if len(updates) <= 0 {
log.Warn("fail to reassign channel to other nodes, assign to the original Node",
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName))
updates.Add(nodeID, []*channel{ch})
}

err := c.remove(nodeID, ch)
if err != nil {
return err
}
log.Info("channel manager reassign channels", zap.Int64("old node ID", nodeID), zap.Array("updates", updates))

log.Debug("try to cleanup removal flag ", zap.String("channel name", channelName))
c.h.FinishDropChannel(channelName)
return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)

log.Info("removed channel assignment", zap.Any("channel", ch))
return nil
}

func (c *ChannelManager) getChannelByNodeAndName(nodeID UniqueID, channelName string) *channel {
Expand Down
Loading