fix missing handling channels while a node down (milvus-io#18250)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
xiaofan-luan authored Jul 18, 2022
1 parent c910905 commit c975635
Showing 15 changed files with 487 additions and 472 deletions.
179 changes: 101 additions & 78 deletions internal/querycoord/channel_unsubscribe.go
@@ -17,10 +17,10 @@
package querycoord

import (
"container/list"
"context"
"fmt"
"sync"
"time"

"github.com/golang/protobuf/proto"
"go.uber.org/zap"
@@ -33,35 +33,32 @@ import (
"github.com/milvus-io/milvus/internal/util/funcutil"
)

const (
unsubscribeChannelInfoPrefix = "queryCoord-unsubscribeChannelInfo"
)

type channelUnsubscribeHandler struct {
type ChannelCleaner struct {
ctx context.Context
cancel context.CancelFunc
kvClient *etcdkv.EtcdKV
factory msgstream.Factory

mut sync.RWMutex // mutex for channelInfos, since container/list is not goroutine-safe
channelInfos *list.List
downNodeChan chan int64
taskMutex sync.RWMutex // protects the tasks map
// nodeID -> UnsubscribeChannelInfo
tasks map[int64]*querypb.UnsubscribeChannelInfo
notify chan struct{}
closed bool

wg sync.WaitGroup
}

// newChannelUnsubscribeHandler create a new handler service to unsubscribe channels
func newChannelUnsubscribeHandler(ctx context.Context, kv *etcdkv.EtcdKV, factory dependency.Factory) (*channelUnsubscribeHandler, error) {
func NewChannelCleaner(ctx context.Context, kv *etcdkv.EtcdKV, factory dependency.Factory) (*ChannelCleaner, error) {
childCtx, cancel := context.WithCancel(ctx)
handler := &channelUnsubscribeHandler{
handler := &ChannelCleaner{
ctx: childCtx,
cancel: cancel,
kvClient: kv,
factory: factory,

channelInfos: list.New(),
// TODO: if more than 1024 query nodes go down, query coord will not be able to restart
downNodeChan: make(chan int64, 1024),
tasks: make(map[int64]*querypb.UnsubscribeChannelInfo, 1024),
notify: make(chan struct{}, 1024),
}

err := handler.reloadFromKV()
@@ -72,103 +69,129 @@ func newChannelUnsubscribeHandler(ctx context.Context, kv *etcdkv.EtcdKV, factor
return handler, nil
}

// appendUnsubInfo pushes unsub info safely
func (csh *channelUnsubscribeHandler) appendUnsubInfo(info *querypb.UnsubscribeChannelInfo) {
csh.mut.Lock()
defer csh.mut.Unlock()
csh.channelInfos.PushBack(info)
}

// reloadFromKV reloads unfinished unsubscribe-channel info from kv
func (csh *channelUnsubscribeHandler) reloadFromKV() error {
func (cleaner *ChannelCleaner) reloadFromKV() error {
log.Info("start reload unsubscribe channelInfo from kv")
_, channelInfoValues, err := csh.kvClient.LoadWithPrefix(unsubscribeChannelInfoPrefix)
cleaner.taskMutex.Lock()
defer cleaner.taskMutex.Unlock()
_, channelInfoValues, err := cleaner.kvClient.LoadWithPrefix(unsubscribeChannelInfoPrefix)
if err != nil {
return err
}
for _, value := range channelInfoValues {
channelInfo := &querypb.UnsubscribeChannelInfo{}
err = proto.Unmarshal([]byte(value), channelInfo)
info := &querypb.UnsubscribeChannelInfo{}
err = proto.Unmarshal([]byte(value), info)
if err != nil {
return err
}
csh.appendUnsubInfo(channelInfo)
csh.downNodeChan <- channelInfo.NodeID
cleaner.tasks[info.NodeID] = info
}

cleaner.notify <- struct{}{}
log.Info("successufully reload unsubscribe channelInfo from kv", zap.Int("unhandled", len(channelInfoValues)))
return nil
}

// addUnsubscribeChannelInfo adds channel info to the handler service and persists it to etcd
func (csh *channelUnsubscribeHandler) addUnsubscribeChannelInfo(info *querypb.UnsubscribeChannelInfo) {
func (cleaner *ChannelCleaner) addUnsubscribeChannelInfo(info *querypb.UnsubscribeChannelInfo) {
if len(info.CollectionChannels) == 0 {
return
}
nodeID := info.NodeID
cleaner.taskMutex.Lock()
defer cleaner.taskMutex.Unlock()
if cleaner.closed {
return
}

_, ok := cleaner.tasks[nodeID]
if ok {
log.Info("duplicate add unsubscribe channel, ignore..", zap.Int64("nodeID", nodeID))
return
}

channelInfoValue, err := proto.Marshal(info)
if err != nil {
panic(err)
}
// when queryCoord is restarted multiple times, the nodeID of added channelInfo may be the same
hasEnqueue := false
// reduce the lock range to iteration here, since `addUnsubscribeChannelInfo` is called one by one
csh.mut.RLock()
for e := csh.channelInfos.Back(); e != nil; e = e.Prev() {
if e.Value.(*querypb.UnsubscribeChannelInfo).NodeID == nodeID {
hasEnqueue = true
}
}
csh.mut.RUnlock()

if !hasEnqueue {
channelInfoKey := fmt.Sprintf("%s/%d", unsubscribeChannelInfoPrefix, nodeID)
err = csh.kvClient.Save(channelInfoKey, string(channelInfoValue))
if err != nil {
panic(err)
}
csh.appendUnsubInfo(info)
csh.downNodeChan <- info.NodeID
log.Info("add unsubscribeChannelInfo to handler", zap.Int64("nodeID", info.NodeID))
// TODO: we may not even need unsubscribeChannelInfoPrefix; we could simply call addUnsubscribeChannelInfo again whenever querycoord restarts
channelInfoKey := fmt.Sprintf("%s/%d", unsubscribeChannelInfoPrefix, nodeID)
err = cleaner.kvClient.Save(channelInfoKey, string(channelInfoValue))
if err != nil {
panic(err)
}
cleaner.tasks[info.NodeID] = info
cleaner.notify <- struct{}{}
log.Info("successfully add unsubscribeChannelInfo to handler", zap.Int64("nodeID", info.NodeID), zap.Any("channels", info.CollectionChannels))
}

// handleChannelCleanLoop handles the unsubscription of channels that a down query node had watched
func (csh *channelUnsubscribeHandler) handleChannelUnsubscribeLoop() {
defer csh.wg.Done()
func (cleaner *ChannelCleaner) handleChannelCleanLoop() {
defer cleaner.wg.Done()

ticker := time.NewTicker(time.Second * 1)
defer ticker.Stop()
for {
select {
case <-csh.ctx.Done():
log.Info("channelUnsubscribeHandler ctx done, handleChannelUnsubscribeLoop end")
case <-cleaner.ctx.Done():
log.Info("channelUnsubscribeHandler ctx done, handleChannelCleanLoop end")
return
case <-csh.downNodeChan:
csh.mut.RLock()
e := csh.channelInfos.Front()
channelInfo := csh.channelInfos.Front().Value.(*querypb.UnsubscribeChannelInfo)
csh.mut.RUnlock()
nodeID := channelInfo.NodeID
for _, collectionChannels := range channelInfo.CollectionChannels {
collectionID := collectionChannels.CollectionID
subName := funcutil.GenChannelSubName(Params.CommonCfg.QueryNodeSubName, collectionID, nodeID)
msgstream.UnsubscribeChannels(csh.ctx, csh.factory, subName, collectionChannels.Channels)
case _, ok := <-cleaner.notify:
if ok {
cleaner.taskMutex.Lock()
for nodeID := range cleaner.tasks {
cleaner.process(nodeID)
}
cleaner.taskMutex.Unlock()
}
channelInfoKey := fmt.Sprintf("%s/%d", unsubscribeChannelInfoPrefix, nodeID)
err := csh.kvClient.Remove(channelInfoKey)
if err != nil {
log.Error("remove unsubscribe channelInfo from etcd failed", zap.Int64("nodeID", nodeID))
panic(err)
case <-ticker.C:
cleaner.taskMutex.Lock()
for nodeID := range cleaner.tasks {
cleaner.process(nodeID)
}

csh.mut.Lock()
csh.channelInfos.Remove(e)
csh.mut.Unlock()
log.Info("unsubscribe channels success", zap.Int64("nodeID", nodeID))
cleaner.taskMutex.Unlock()
}
}
}

func (csh *channelUnsubscribeHandler) start() {
csh.wg.Add(1)
go csh.handleChannelUnsubscribeLoop()
func (cleaner *ChannelCleaner) process(nodeID int64) error {
log.Info("start to handle channel clean", zap.Int64("nodeID", nodeID))
channelInfo := cleaner.tasks[nodeID]
for _, collectionChannels := range channelInfo.CollectionChannels {
collectionID := collectionChannels.CollectionID
subName := funcutil.GenChannelSubName(Params.CommonCfg.QueryNodeSubName, collectionID, nodeID)
// should be ok if we call unsubscribe multiple times
msgstream.UnsubscribeChannels(cleaner.ctx, cleaner.factory, subName, collectionChannels.Channels)
}
channelInfoKey := fmt.Sprintf("%s/%d", unsubscribeChannelInfoPrefix, nodeID)
err := cleaner.kvClient.Remove(channelInfoKey)
if err != nil {
log.Warn("remove unsubscribe channelInfo from etcd failed", zap.Int64("nodeID", nodeID))
return err
}
delete(cleaner.tasks, nodeID)
log.Info("unsubscribe channels success", zap.Int64("nodeID", nodeID))
return nil
}

// isNodeChannelCleanHandled reports whether the unsubscribe task for the specified node has been handled
func (cleaner *ChannelCleaner) isNodeChannelCleanHandled(nodeID UniqueID) bool {
cleaner.taskMutex.RLock()
defer cleaner.taskMutex.RUnlock()
_, ok := cleaner.tasks[nodeID]
return !ok
}

func (cleaner *ChannelCleaner) start() {
cleaner.wg.Add(1)
go cleaner.handleChannelCleanLoop()
}

func (csh *channelUnsubscribeHandler) close() {
csh.cancel()
csh.wg.Wait()
func (cleaner *ChannelCleaner) close() {
cleaner.taskMutex.Lock()
cleaner.closed = true
close(cleaner.notify)
cleaner.taskMutex.Unlock()
cleaner.cancel()
cleaner.wg.Wait()
}
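Outside the diff, the control flow of the new ChannelCleaner is easier to see in isolation. Below is a minimal, self-contained sketch of the same pattern — a mutex-guarded task map, a buffered notify channel for prompt wakeups, and a ticker that retries any task whose previous attempt failed. The cleaner type, the string payload, and the buffer size are illustrative stand-ins, not the actual Milvus types.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// cleaner mirrors the shape of the PR's ChannelCleaner.
type cleaner struct {
	mu     sync.Mutex
	tasks  map[int64]string // nodeID -> payload (stand-in for UnsubscribeChannelInfo)
	notify chan struct{}
	wg     sync.WaitGroup
}

func (c *cleaner) add(nodeID int64, payload string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, ok := c.tasks[nodeID]; ok {
		return // duplicate enqueue (e.g. coord restarted); keep the first task
	}
	c.tasks[nodeID] = payload
	select {
	case c.notify <- struct{}{}: // wake the loop immediately
	default: // a wakeup is already pending; the ticker covers the rest
	}
}

// process pretends to unsubscribe the node's channels; returning an error
// would leave the task in place for the next tick.
func (c *cleaner) process(nodeID int64) error {
	fmt.Println("cleaning channels for node", nodeID)
	return nil
}

func (c *cleaner) loop(ctx context.Context) {
	defer c.wg.Done()
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-c.notify: // a new task arrived
		case <-ticker.C: // periodic retry of anything left over
		}
		c.mu.Lock()
		for nodeID := range c.tasks {
			if err := c.process(nodeID); err == nil {
				delete(c.tasks, nodeID)
			}
		}
		c.mu.Unlock()
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	c := &cleaner{tasks: map[int64]string{}, notify: make(chan struct{}, 16)}
	c.wg.Add(1)
	go c.loop(ctx)
	c.add(1, "by-dev-dml-channel-0")
	time.Sleep(100 * time.Millisecond) // give the loop a chance to run
	cancel()
	c.wg.Wait()
}
```

One deliberate divergence: the sketch drops the wakeup when the notify buffer is full, whereas the PR sends unconditionally into a 1024-slot buffer. Dropping is safe here because the ticker guarantees every remaining task is retried within a second.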
21 changes: 10 additions & 11 deletions internal/querycoord/channel_unsubscribe_test.go
@@ -49,9 +49,10 @@ func Test_HandlerReloadFromKV(t *testing.T) {
assert.Nil(t, err)

factory := dependency.NewDefaultFactory(true)
handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory)
cleaner, err := NewChannelCleaner(baseCtx, kv, factory)
assert.Nil(t, err)
assert.Equal(t, 1, len(handler.downNodeChan))

assert.False(t, cleaner.isNodeChannelCleanHandled(defaultQueryNodeID))

cancel()
}
@@ -64,7 +65,7 @@ func Test_AddUnsubscribeChannelInfo(t *testing.T) {
defer etcdCli.Close()
kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath)
factory := dependency.NewDefaultFactory(true)
handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory)
cleaner, err := NewChannelCleaner(baseCtx, kv, factory)
assert.Nil(t, err)

collectionChannels := &querypb.UnsubscribeChannels{
@@ -76,14 +77,12 @@ CollectionChannels: []*querypb.UnsubscribeChannels{collectionChannels},
CollectionChannels: []*querypb.UnsubscribeChannels{collectionChannels},
}

handler.addUnsubscribeChannelInfo(unsubscribeChannelInfo)
frontValue := handler.channelInfos.Front()
assert.NotNil(t, frontValue)
assert.Equal(t, defaultQueryNodeID, frontValue.Value.(*querypb.UnsubscribeChannelInfo).NodeID)
cleaner.addUnsubscribeChannelInfo(unsubscribeChannelInfo)
assert.Equal(t, len(cleaner.tasks), 1)

// repeated nodeID for a node that has already gone down
handler.addUnsubscribeChannelInfo(unsubscribeChannelInfo)
assert.Equal(t, 1, len(handler.downNodeChan))
cleaner.addUnsubscribeChannelInfo(unsubscribeChannelInfo)
assert.Equal(t, len(cleaner.tasks), 1)

cancel()
}
@@ -96,7 +95,7 @@ func Test_HandleChannelUnsubscribeLoop(t *testing.T) {
defer etcdCli.Close()
kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath)
factory := dependency.NewDefaultFactory(true)
handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory)
handler, err := NewChannelCleaner(baseCtx, kv, factory)
assert.Nil(t, err)

collectionChannels := &querypb.UnsubscribeChannels{
@@ -116,7 +115,7 @@ handler.start()
handler.start()

for {
_, err = kv.Load(channelInfoKey)
_, err := kv.Load(channelInfoKey)
if err != nil {
break
}
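The wait loop at the end of Test_HandleChannelUnsubscribeLoop polls kv.Load with no upper bound, so a regression in the cleaner would hang the test suite rather than fail it. A bounded variant of the same wait might look like the sketch below; waitForKeyRemoved is a hypothetical helper, not part of this PR, and the toy load closure stands in for the etcd-backed kv.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitForKeyRemoved polls load until the key disappears or the timeout
// expires. load is any lookup that returns an error once the key is
// absent, which is how the test above interprets kv.Load.
func waitForKeyRemoved(load func(string) (string, error), key string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if _, err := load(key); err != nil {
			return nil // key removed: the cleaner finished its work
		}
		time.Sleep(50 * time.Millisecond)
	}
	return fmt.Errorf("key %q still present after %v", key, timeout)
}

func main() {
	// Toy store standing in for etcd: the key vanishes after three reads.
	reads := 0
	load := func(key string) (string, error) {
		reads++
		if reads > 3 {
			return "", errors.New("key not found")
		}
		return "value", nil
	}
	fmt.Println(waitForKeyRemoved(load, "queryCoord-unsubscribeChannelInfo/1", time.Second))
}
```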
13 changes: 5 additions & 8 deletions internal/querycoord/cluster.go
@@ -42,10 +42,6 @@ import (
"github.com/milvus-io/milvus/internal/util/typeutil"
)

const (
queryNodeInfoPrefix = "queryCoord-queryNodeInfo"
)

// Cluster manages all query node connections and grpc requests
type Cluster interface {
// Collection/Partition
@@ -107,14 +103,14 @@ type queryNodeCluster struct {

sync.RWMutex
clusterMeta Meta
handler *channelUnsubscribeHandler
cleaner *ChannelCleaner
nodes map[int64]Node
newNodeFn newQueryNodeFn
segmentAllocator SegmentAllocatePolicy
channelAllocator ChannelAllocatePolicy
}

func newQueryNodeCluster(ctx context.Context, clusterMeta Meta, kv *etcdkv.EtcdKV, newNodeFn newQueryNodeFn, session *sessionutil.Session, handler *channelUnsubscribeHandler) (Cluster, error) {
func newQueryNodeCluster(ctx context.Context, clusterMeta Meta, kv *etcdkv.EtcdKV, newNodeFn newQueryNodeFn, session *sessionutil.Session, cleaner *ChannelCleaner) (Cluster, error) {
childCtx, cancel := context.WithCancel(ctx)
nodes := make(map[int64]Node)
c := &queryNodeCluster{
@@ -123,7 +119,7 @@ func newQueryNodeCluster(ctx context.Context, clusterMeta Meta, kv *etcdkv.EtcdK
client: kv,
session: session,
clusterMeta: clusterMeta,
handler: handler,
cleaner: cleaner,
nodes: nodes,
newNodeFn: newNodeFn,
segmentAllocator: defaultSegAllocatePolicy(),
@@ -543,13 +539,14 @@ func (c *queryNodeCluster) setNodeState(nodeID int64, node Node, state nodeState

// 2. add the unsubscribed channels to the cleaner; the cleaner will unsubscribe them automatically
if len(unsubscribeChannelInfo.CollectionChannels) != 0 {
c.handler.addUnsubscribeChannelInfo(unsubscribeChannelInfo)
c.cleaner.addUnsubscribeChannelInfo(unsubscribeChannelInfo)
}
}

node.setState(state)
}

// TODO: the error returned by RegisterNode is not handled correctly
func (c *queryNodeCluster) RegisterNode(ctx context.Context, session *sessionutil.Session, id UniqueID, state nodeState) error {
c.Lock()
defer c.Unlock()
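For context, the handoff in setNodeState works as follows: when a node is marked offline, the cluster snapshots the channels that node had watched and enqueues them with the cleaner, which owns etcd persistence and retry from there. A hypothetical call site could look like the sketch below — onNodeDown is not a function in this PR, and the snippet assumes the querycoord package context (ChannelCleaner and the querypb types in scope).

```go
// onNodeDown hands a down node's watched channels to the cleaner
// (illustrative only; the real caller is setNodeState above).
func onNodeDown(cleaner *ChannelCleaner, nodeID int64, watched []*querypb.UnsubscribeChannels) {
	info := &querypb.UnsubscribeChannelInfo{
		NodeID:             nodeID,
		CollectionChannels: watched,
	}
	// addUnsubscribeChannelInfo ignores empty channel lists and duplicate
	// node IDs, so the caller does not need to dedupe before enqueueing.
	cleaner.addUnsubscribeChannelInfo(info)
}
```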
