Skip to content

Commit

Permalink
Handle flowgraph releasing properly (milvus-io#16169)
Browse files Browse the repository at this point in the history
See also: milvus-io#15846

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
  • Loading branch information
XuanYang-cn authored Apr 20, 2022
1 parent 69252f8 commit 93777e0
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 55 deletions.
50 changes: 23 additions & 27 deletions internal/datanode/data_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (node *DataNode) handleWatchInfo(e *event, key string, data []byte) {
}

if isEndWatchState(watchInfo.State) {
log.Warn("DataNode received a PUT event with an end State", zap.String("state", watchInfo.State.String()))
log.Debug("DataNode received a PUT event with an end State", zap.String("state", watchInfo.State.String()))
return
}

Expand All @@ -328,14 +328,13 @@ func (node *DataNode) handleWatchInfo(e *event, key string, data []byte) {
actualManager, loaded := node.eventManagerMap.LoadOrStore(e.vChanName, newChannelEventManager(
node.handlePutEvent, node.handleDeleteEvent, retryWatchInterval,
))

if !loaded {
actualManager.(*channelEventManager).Run()
}

actualManager.(*channelEventManager).handleEvent(*e)

// Whenever a delete event comes, this eventManger will be removed from map
// Whenever a delete event comes, this eventManager will be removed from map
if e.eventType == deleteEventType {
if m, loaded := node.eventManagerMap.LoadAndDelete(e.vChanName); loaded {
m.(*channelEventManager).Close()
Expand Down Expand Up @@ -371,29 +370,15 @@ func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version
if err := node.flowgraphManager.addAndStart(node, watchInfo.GetVchan()); err != nil {
return fmt.Errorf("fail to add and start flowgraph for vChanName: %s, err: %v", vChanName, err)
}
log.Info("handle put event: new data sync service success", zap.String("vChanName", vChanName))
defer func() {
if err != nil {
node.releaseFlowgraph(vChanName)
}
}()

log.Debug("handle put event: new data sync service success", zap.String("vChanName", vChanName))
watchInfo.State = datapb.ChannelWatchState_WatchSuccess

case datapb.ChannelWatchState_ToRelease:
success := true
func() {
defer func() {
if x := recover(); x != nil {
log.Error("release flowgraph panic", zap.Any("recovered", x))
success = false
}
}()
node.releaseFlowgraph(vChanName)
}()
if !success {
watchInfo.State = datapb.ChannelWatchState_ReleaseFailure
} else {
if node.tryToReleaseFlowgraph(vChanName) {
watchInfo.State = datapb.ChannelWatchState_ReleaseSuccess
} else {
watchInfo.State = datapb.ChannelWatchState_ReleaseFailure
}
}

Expand All @@ -403,31 +388,42 @@ func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version
}

k := path.Join(Params.DataNodeCfg.ChannelWatchSubPath, fmt.Sprintf("%d", node.NodeID), vChanName)
log.Info("handle put event: try to save result state", zap.String("key", k), zap.String("state", watchInfo.State.String()))

log.Debug("handle put event: try to save result state", zap.String("key", k), zap.String("state", watchInfo.State.String()))
err = node.watchKv.CompareVersionAndSwap(k, version, string(v))
if err != nil {
return fmt.Errorf("fail to update watch state to etcd, vChanName: %s, state: %s, err: %w", vChanName, watchInfo.State.String(), err)
}
return nil
}

func (node *DataNode) handleDeleteEvent(vChanName string) {
node.releaseFlowgraph(vChanName)
// handleDeleteEvent reacts to a delete event for vChanName by attempting to
// release the channel's flowgraph. It returns the result reported by
// tryToReleaseFlowgraph.
func (node *DataNode) handleDeleteEvent(vChanName string) bool {
	released := node.tryToReleaseFlowgraph(vChanName)
	return released
}

func (node *DataNode) releaseFlowgraph(vChanName string) {
// tryToReleaseFlowgraph asks the flowgraph manager to release the flowgraph of
// vChanName and reports whether the release finished without panicking.
//
// A panic raised while releasing is recovered and logged together with the
// channel name, and the function returns false instead of propagating it.
//
// The result is a named return value on purpose: a deferred function can only
// influence what the caller receives by writing to a result parameter.
// Assigning to an ordinary local inside the recover handler has no effect on
// the returned value.
func (node *DataNode) tryToReleaseFlowgraph(vChanName string) (success bool) {
	defer func() {
		if x := recover(); x != nil {
			log.Error("release flowgraph panic", zap.String("vChanName", vChanName), zap.Any("recovered", x))
			// Explicitly report the failure through the named result.
			success = false
		}
	}()

	node.flowgraphManager.release(vChanName)
	log.Info("try to release flowgraph success", zap.String("vChanName", vChanName))
	return true
}

// BackGroundGC runs in background to release datanode resources
// GOOSE TODO: remove background GC, using ToRelease for drop-collection after #15846
func (node *DataNode) BackGroundGC(vChannelCh <-chan string) {
log.Info("DataNode Background GC Start")
for {
select {
case vchanName := <-vChannelCh:
log.Info("GC flowgraph", zap.String("vChanName", vchanName))
node.releaseFlowgraph(vchanName)
node.tryToReleaseFlowgraph(vchanName)
case <-node.ctx.Done():
log.Warn("DataNode context done, exiting background GC")
return
Expand Down
6 changes: 4 additions & 2 deletions internal/datanode/data_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,9 +599,10 @@ func TestWatchChannel(t *testing.T) {
chPut <- struct{}{}
return r
},
func(vChan string) {
func(vChan string) bool {
node.handleDeleteEvent(vChan)
chDel <- struct{}{}
return true
}, time.Millisecond*100,
)
node.eventManagerMap.Store(ch, m)
Expand Down Expand Up @@ -636,9 +637,10 @@ func TestWatchChannel(t *testing.T) {
chPut <- struct{}{}
return r
},
func(vChan string) {
func(vChan string) bool {
node.handleDeleteEvent(vChan)
chDel <- struct{}{}
return true
}, time.Millisecond*100,
)
node.eventManagerMap.Store(ch, m)
Expand Down
4 changes: 2 additions & 2 deletions internal/datanode/event_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type channelEventManager struct {
eventChan chan event
closeChan chan struct{}
handlePutEvent func(watchInfo *datapb.ChannelWatchInfo, version int64) error // node.handlePutEvent
handleDeleteEvent func(vChanName string) // node.handleDeleteEvent
handleDeleteEvent func(vChanName string) bool // node.handleDeleteEvent
retryInterval time.Duration
}

Expand All @@ -50,7 +50,7 @@ const (
)

func newChannelEventManager(handlePut func(*datapb.ChannelWatchInfo, int64) error,
handleDel func(string), retryInterval time.Duration) *channelEventManager {
handleDel func(string) bool, retryInterval time.Duration) *channelEventManager {
return &channelEventManager{
eventChan: make(chan event, 10),
closeChan: make(chan struct{}),
Expand Down
62 changes: 38 additions & 24 deletions internal/datanode/event_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestChannelEventManager(t *testing.T) {
ran = true
ch <- struct{}{}
return nil
}, func(name string) {}, time.Millisecond*10)
}, func(name string) bool { return true }, time.Millisecond*10)

em.Run()
em.handleEvent(event{
Expand All @@ -56,7 +56,7 @@ func TestChannelEventManager(t *testing.T) {
ran = true
ch <- struct{}{}
return nil
}, func(name string) {}, time.Millisecond*10)
}, func(name string) bool { return true }, time.Millisecond*10)

em.Run()
em.handleEvent(event{
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestChannelEventManager(t *testing.T) {
}

return errors.New("mocked error")
}, func(name string) {}, time.Millisecond*10)
}, func(name string) bool { return true }, time.Millisecond*10)

em.Run()
em.handleEvent(event{
Expand All @@ -107,7 +107,7 @@ func TestChannelEventManager(t *testing.T) {
t.Run("retry until timeout", func(t *testing.T) {
em := newChannelEventManager(func(info *datapb.ChannelWatchInfo, version int64) error {
return errors.New("mocked error")
}, func(name string) {}, time.Millisecond*100)
}, func(name string) bool { return true }, time.Millisecond*100)

ch := make(chan struct{}, 1)

Expand Down Expand Up @@ -136,7 +136,7 @@ func TestChannelEventManager(t *testing.T) {
ch := make(chan struct{}, 1)
em := newChannelEventManager(func(info *datapb.ChannelWatchInfo, version int64) error {
return errors.New("mocked error")
}, func(name string) {}, time.Millisecond*10)
}, func(name string) bool { return true }, time.Millisecond*10)

go func() {
ddl := time.Now().Add(time.Minute)
Expand Down Expand Up @@ -168,12 +168,17 @@ func TestChannelEventManager(t *testing.T) {
t.Run("cancel by delete event", func(t *testing.T) {
ch := make(chan struct{}, 1)
ran := false
em := newChannelEventManager(func(info *datapb.ChannelWatchInfo, version int64) error {
return errors.New("mocked error")
}, func(name string) {
ran = true
ch <- struct{}{}
}, time.Millisecond*10)
em := newChannelEventManager(
func(info *datapb.ChannelWatchInfo, version int64) error {
return errors.New("mocked error")
},
func(name string) bool {
ran = true
ch <- struct{}{}
return true
},
time.Millisecond*10,
)
em.Run()
em.handleEvent(event{
eventType: putEventType,
Expand All @@ -198,16 +203,19 @@ func TestChannelEventManager(t *testing.T) {
t.Run("overwrite put event", func(t *testing.T) {
ch := make(chan struct{}, 1)
ran := false
em := newChannelEventManager(func(info *datapb.ChannelWatchInfo, version int64) error {
if version > 0 {
ran = true
ch <- struct{}{}
return nil
}
return errors.New("mocked error")
}, func(name string) {
t.FailNow()
}, time.Millisecond*10)
em := newChannelEventManager(
func(info *datapb.ChannelWatchInfo, version int64) error {
if version > 0 {
ran = true
ch <- struct{}{}
return nil
}
return errors.New("mocked error")
},
func(name string) bool {
return false
},
time.Millisecond*10)
em.Run()
em.handleEvent(event{
eventType: putEventType,
Expand Down Expand Up @@ -241,9 +249,15 @@ func TestChannelEventManager(t *testing.T) {
}

for _, es := range endStates {
em := newChannelEventManager(func(info *datapb.ChannelWatchInfo, version int64) error {
return errors.New("mocked error")
}, func(name string) { t.FailNow() }, time.Millisecond*100)
em := newChannelEventManager(
func(info *datapb.ChannelWatchInfo, version int64) error {
return errors.New("mocked error")
},
func(name string) bool {
return false
},
time.Millisecond*100,
)

ch := make(chan struct{}, 1)
ddl := time.Now().Add(time.Minute)
Expand Down

0 comments on commit 93777e0

Please sign in to comment.