Skip to content

Commit

Permalink
Fix datacoord set wrong state for node registering (milvus-io#17376)
Browse files Browse the repository at this point in the history
Fix datacoord datarace

See also: milvus-io#17335

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
  • Loading branch information
XuanYang-cn authored Jun 6, 2022
1 parent a4a6991 commit c70af73
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 9 deletions.
11 changes: 10 additions & 1 deletion internal/datacoord/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,16 @@ func (c *ChannelManager) AddNode(nodeID int64) error {
zap.Int64("registered node", nodeID),
zap.Array("updates", updates))

return c.updateWithTimer(updates, datapb.ChannelWatchState_ToRelease)
state := datapb.ChannelWatchState_ToRelease

for _, u := range updates {
if u.Type == Delete && u.NodeID == bufferID {
state = datapb.ChannelWatchState_ToWatch
break
}
}

return c.updateWithTimer(updates, state)
}

// DeleteNode deletes the node from the cluster.
Expand Down
63 changes: 61 additions & 2 deletions internal/datacoord/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,27 @@ func TestChannelManager(t *testing.T) {
}()

prefix := Params.DataCoordCfg.ChannelWatchSubPath
t.Run("test AddNode", func(t *testing.T) {
// waitAndStore polls metakv until a non-empty watch info exists for
// (nodeID, channelName), asserts that its state equals waitState, then
// overwrites it with storeState. It polls every 100ms and does not return
// until the key shows up.
waitAndStore := func(waitState, storeState datapb.ChannelWatchState, nodeID UniqueID, channelName string) {
	// The key is the same for every poll, so build it once.
	key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName)
	for {
		v, err := metakv.Load(key)
		if err == nil && len(v) > 0 {
			watchInfo, err := parseWatchInfo(key, []byte(v))
			require.NoError(t, err)
			require.Equal(t, waitState, watchInfo.GetState())

			watchInfo.State = storeState
			data, err := proto.Marshal(watchInfo)
			require.NoError(t, err)

			// Fail right here if the write is rejected, instead of letting a
			// later assertion trip over a stale state.
			require.NoError(t, metakv.Save(key, string(data)))
			return
		}
		time.Sleep(100 * time.Millisecond)
	}
}

t.Run("test AddNode with avalible node", func(t *testing.T) {
// Note: this test is based on the default registerPolicy
defer metakv.RemoveWithPrefix("")
var (
Expand Down Expand Up @@ -322,6 +342,45 @@ func TestChannelManager(t *testing.T) {
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeToAdd, "channel-3", collectionID)
})

// Registers a node while the store holds channels only under bufferID, and
// checks that both channels show up under the new node in ToWatch state.
t.Run("test AddNode with no available node", func(t *testing.T) {
// Note: this test is based on the default registerPolicy
defer metakv.RemoveWithPrefix("")
var (
collectionID = UniqueID(8)
nodeID = UniqueID(119)
channel1, channel2 = "channel1", "channel2"
)

chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
// Replace the store so that its only entry is bufferID holding both channels.
chManager.store = &ChannelStore{
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
bufferID: {bufferID, []*channel{
{channel1, collectionID},
{channel2, collectionID},
}},
},
}

err = chManager.AddNode(nodeID)
assert.NoError(t, err)

// For each channel: wait for the watch info stored under nodeID, assert it is
// ToWatch, then overwrite it with WatchSuccess.
waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, nodeID, channel1)
waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, nodeID, channel2)

// Both channels are expected to be listed under the newly added node.
chInfo := chManager.store.GetNode(nodeID)
assert.Equal(t, 2, len(chInfo.Channels))
// NOTE(review): the results of these Match calls are not asserted — confirm
// whether they were meant to be wrapped in an assertion.
chManager.Match(nodeID, channel1)
chManager.Match(nodeID, channel2)

// A newly watched channel is expected under nodeID in ToWatch state.
err = chManager.Watch(&channel{"channel-3", collectionID})
assert.NoError(t, err)
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, "channel-3", collectionID)
// NOTE(review): presumably stops the pending state timer for channel-3 so it
// does not outlive this subtest — confirm against stateTimer.
chManager.stateTimer.stopIfExsit(&ackEvent{watchSuccessAck, "channel-3", nodeID})

})

t.Run("test Watch", func(t *testing.T) {
defer metakv.RemoveWithPrefix("")
var (
Expand Down Expand Up @@ -786,7 +845,7 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) {

prefix := Params.DataCoordCfg.ChannelWatchSubPath

t.Run("one node with two channels add a new node", func(t *testing.T) {
t.Run("one node with three channels add a new node", func(t *testing.T) {
defer metakv.RemoveWithPrefix("")

var (
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/compaction_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (t *compactionTrigger) startGlobalCompactionLoop() {
defer t.wg.Done()

// If AutoCompaction disabled, global loop will not start
if !Params.DataCoordCfg.EnableAutoCompaction {
if !Params.DataCoordCfg.GetEnableAutoCompaction() {
return
}

Expand Down Expand Up @@ -176,7 +176,7 @@ func (t *compactionTrigger) triggerCompaction(timetravel *timetravel) error {
// triggerSingleCompaction triggers a compaction bundled with collection-partition-channel-segment
func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string, timetravel *timetravel) error {
// If AutoCompaction is disabled, a flush request will not trigger compaction
if !Params.DataCoordCfg.EnableAutoCompaction {
if !Params.DataCoordCfg.GetEnableAutoCompaction() {
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/compaction_trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ func Test_compactionTrigger_triggerCompaction(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Params.DataCoordCfg.EnableAutoCompaction = tt.fields.autoCompactionEnabled
Params.DataCoordCfg.SetEnableAutoCompaction(tt.fields.autoCompactionEnabled)
tr := &compactionTrigger{
meta: tt.fields.meta,
allocator: tt.fields.allocator,
Expand Down Expand Up @@ -1083,7 +1083,7 @@ func Test_compactionTrigger_singleTriggerCompaction(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Params.DataCoordCfg.EnableAutoCompaction = tt.fields.enableAutoCompaction
Params.DataCoordCfg.SetEnableAutoCompaction(tt.fields.enableAutoCompaction)
tr := &compactionTrigger{
meta: tt.fields.meta,
allocator: tt.fields.allocator,
Expand Down
16 changes: 14 additions & 2 deletions internal/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,7 @@ type dataCoordConfig struct {
UpdatedTime time.Time

EnableCompaction bool
EnableAutoCompaction bool
EnableAutoCompaction atomic.Value
EnableGarbageCollection bool

// Garbage Collection
Expand Down Expand Up @@ -955,7 +955,19 @@ func (p *dataCoordConfig) initGCDropTolerance() {
}

func (p *dataCoordConfig) initEnableAutoCompaction() {
p.EnableAutoCompaction = p.Base.ParseBool("dataCoord.compaction.enableAutoCompaction", false)
p.EnableAutoCompaction.Store(p.Base.ParseBool("dataCoord.compaction.enableAutoCompaction", false))
}

// SetEnableAutoCompaction stores enable as the current value of
// EnableAutoCompaction.
func (p *dataCoordConfig) SetEnableAutoCompaction(enable bool) {
p.EnableAutoCompaction.Store(enable)
}

// GetEnableAutoCompaction returns the value currently held in
// EnableAutoCompaction, or false when nothing has been stored yet.
func (p *dataCoordConfig) GetEnableAutoCompaction() bool {
	stored := p.EnableAutoCompaction.Load()
	if stored == nil {
		// Load yields nil until the first Store; treat that as "disabled".
		return false
	}
	return stored.(bool)
}

func (p *dataCoordConfig) SetNodeID(id UniqueID) {
Expand Down

0 comments on commit c70af73

Please sign in to comment.