Skip to content

Commit

Permalink
Cancel timers while adding new timer (milvus-io#17511)
Browse files Browse the repository at this point in the history
See also: milvus-io#17335

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
  • Loading branch information
XuanYang-cn authored Jun 13, 2022
1 parent d1de8ca commit e6225d9
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 0 deletions.
1 change: 1 addition & 0 deletions internal/datacoord/channel_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channe
return
}
stop := make(chan struct{})
c.removeTimers([]string{channelName})
c.runningTimers.Store(channelName, stop)
timeoutT := time.Unix(0, timeoutTs)
go func() {
Expand Down
20 changes: 20 additions & 0 deletions internal/datacoord/channel_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,26 @@ func TestChannelStateTimer(t *testing.T) {
timer.startOne(datapb.ChannelWatchState_ToWatch, "channel-remove", 1, normalTimeoutTs)
timer.removeTimers([]string{"channel-remove"})
})

t.Run("test startOne no leaking issue 17335", func(t *testing.T) {
timeoutTs := time.Now().Add(20 * time.Second).UnixNano()
timer := newChannelStateTimer(kv)

timer.startOne(datapb.ChannelWatchState_ToRelease, "channel-1", 1, timeoutTs)
stop, ok := timer.runningTimers.Load("channel-1")
require.True(t, ok)

timer.startOne(datapb.ChannelWatchState_ToWatch, "channel-1", 1, timeoutTs)
_, ok = <-stop.(chan struct{})
assert.False(t, ok)

stop2, ok := timer.runningTimers.Load("channel-1")
assert.True(t, ok)

timer.removeTimers([]string{"channel-1"})
_, ok = <-stop2.(chan struct{})
assert.False(t, ok)
})
}

func TestChannelStateTimer_parses(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions internal/datacoord/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,7 @@ func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context) {
return
case ackEvent := <-timeoutWatcher:
log.Debug("receive timeout acks from state watcher",
zap.Any("state", ackEvent.ackType),
zap.Int64("nodeID", ackEvent.nodeID), zap.String("channel name", ackEvent.channelName))
c.processAck(ackEvent)
case event, ok := <-etcdWatcher:
Expand Down

0 comments on commit e6225d9

Please sign in to comment.