diff --git a/internal/datacoord/channel_checker.go b/internal/datacoord/channel_checker.go index 9c205b3da2ed9..ebb3e3b3801bb 100644 --- a/internal/datacoord/channel_checker.go +++ b/internal/datacoord/channel_checker.go @@ -147,6 +147,7 @@ func parseWatchInfo(key string, data []byte) (*datapb.ChannelWatchInfo, error) { if watchInfo.Vchan == nil { return nil, fmt.Errorf("invalid event: ChannelWatchInfo with nil VChannelInfo, key: %s", key) } + reviseVChannelInfo(watchInfo.GetVchan()) return &watchInfo, nil } diff --git a/internal/datacoord/channel_checker_test.go b/internal/datacoord/channel_checker_test.go index 449ed7abeabb4..e98e2b9176a34 100644 --- a/internal/datacoord/channel_checker_test.go +++ b/internal/datacoord/channel_checker_test.go @@ -195,6 +195,33 @@ func TestChannelStateTimer_parses(t *testing.T) { } }) + t.Run("test parseWatchInfo compatibility", func(t *testing.T) { + oldWatchInfo := datapb.ChannelWatchInfo{ + Vchan: &datapb.VchannelInfo{ + CollectionID: 1, + ChannelName: "delta-channel1", + UnflushedSegments: []*datapb.SegmentInfo{{ID: 1}}, + FlushedSegments: []*datapb.SegmentInfo{{ID: 2}}, + DroppedSegments: []*datapb.SegmentInfo{{ID: 3}}, + UnflushedSegmentIds: []int64{1}, + }, + StartTs: time.Now().Unix(), + State: datapb.ChannelWatchState_ToWatch, + TimeoutTs: time.Now().Add(20 * time.Millisecond).UnixNano(), + } + + oldData, err := proto.Marshal(&oldWatchInfo) + assert.NoError(t, err) + newWatchInfo, err := parseWatchInfo("key", oldData) + assert.NoError(t, err) + assert.Equal(t, []*datapb.SegmentInfo{}, newWatchInfo.GetVchan().GetUnflushedSegments()) + assert.Equal(t, []*datapb.SegmentInfo{}, newWatchInfo.GetVchan().GetFlushedSegments()) + assert.Equal(t, []*datapb.SegmentInfo{}, newWatchInfo.GetVchan().GetDroppedSegments()) + assert.NotEmpty(t, newWatchInfo.GetVchan().GetUnflushedSegmentIds()) + assert.NotEmpty(t, newWatchInfo.GetVchan().GetFlushedSegmentIds()) + assert.NotEmpty(t, newWatchInfo.GetVchan().GetDroppedSegmentIds()) + }) + t.Run("test getAckType", func(t *testing.T) { tests := []struct { inState datapb.ChannelWatchState diff --git a/internal/datacoord/channel_store.go b/internal/datacoord/channel_store.go index b02622113eba4..cffcca33ac58b 100644 --- a/internal/datacoord/channel_store.go +++ b/internal/datacoord/channel_store.go @@ -147,6 +147,7 @@ func (c *ChannelStore) Reload() error { if err := proto.Unmarshal([]byte(v), cw); err != nil { return err } + reviseVChannelInfo(cw.GetVchan()) c.Add(nodeID) channel := &channel{ diff --git a/internal/datacoord/meta_util.go b/internal/datacoord/meta_util.go new file mode 100644 index 0000000000000..85a342cfe2491 --- /dev/null +++ b/internal/datacoord/meta_util.go @@ -0,0 +1,63 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package datacoord + +import "github.com/milvus-io/milvus/internal/proto/datapb" + +// reviseVChannelInfo will revise the datapb.VchannelInfo for upgrade compatibility from 2.0.2 +func reviseVChannelInfo(vChannel *datapb.VchannelInfo) { + removeDuplicateSegmentIDFn := func(ids []int64) []int64 { + result := make([]int64, 0, len(ids)) + existDict := make(map[int64]bool) + for _, id := range ids { + if _, ok := existDict[id]; !ok { + existDict[id] = true + result = append(result, id) + } + } + return result + } + + if vChannel == nil { + return + } + // if the segment infos is not nil(generated by 2.0.2), append the corresponding IDs to segmentIDs + // and remove the segment infos, remove deplicate ids in case there are some mixed situations + if vChannel.FlushedSegments != nil && len(vChannel.FlushedSegments) > 0 { + for _, segment := range vChannel.FlushedSegments { + vChannel.FlushedSegmentIds = append(vChannel.GetFlushedSegmentIds(), segment.GetID()) + } + vChannel.FlushedSegments = []*datapb.SegmentInfo{} + } + vChannel.FlushedSegmentIds = removeDuplicateSegmentIDFn(vChannel.GetFlushedSegmentIds()) + + if vChannel.UnflushedSegments != nil && len(vChannel.UnflushedSegments) > 0 { + for _, segment := range vChannel.UnflushedSegments { + vChannel.UnflushedSegmentIds = append(vChannel.GetUnflushedSegmentIds(), segment.GetID()) + } + vChannel.UnflushedSegments = []*datapb.SegmentInfo{} + } + vChannel.UnflushedSegmentIds = removeDuplicateSegmentIDFn(vChannel.GetUnflushedSegmentIds()) + + if vChannel.DroppedSegments != nil && len(vChannel.DroppedSegments) > 0 { + for _, segment := range vChannel.DroppedSegments { + vChannel.DroppedSegmentIds = append(vChannel.GetDroppedSegmentIds(), segment.GetID()) + } + vChannel.DroppedSegments = []*datapb.SegmentInfo{} + } + vChannel.DroppedSegmentIds = removeDuplicateSegmentIDFn(vChannel.GetDroppedSegmentIds()) +} diff --git a/internal/datacoord/policy.go b/internal/datacoord/policy.go index b22fee67e65de..ae44c5215963a 100644 --- a/internal/datacoord/policy.go +++ b/internal/datacoord/policy.go @@ -450,6 +450,7 @@ func BgCheckWithMaxWatchDuration(kv kv.TxnKV) ChannelBGChecker { if err := proto.Unmarshal([]byte(v), watchInfo); err != nil { return nil, err } + reviseVChannelInfo(watchInfo.GetVchan()) // if a channel is not watched after maxWatchDuration, // then we reallocate it to another node if watchInfo.State == datapb.ChannelWatchState_Complete || watchInfo.State == datapb.ChannelWatchState_WatchSuccess { diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index cb5368c8d4381..55e708f964286 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -357,6 +357,7 @@ func parsePutEventData(data []byte) (*datapb.ChannelWatchInfo, error) { if watchInfo.Vchan == nil { return nil, fmt.Errorf("invalid event: ChannelWatchInfo with nil VChannelInfo") } + reviseVChannelInfo(watchInfo.GetVchan()) return &watchInfo, nil } diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index eb3c093bc1015..1eaa9a89a504d 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -819,6 +819,33 @@ func TestWatchChannel(t *testing.T) { exist := node.flowgraphManager.exist("test3") assert.False(t, exist) }) + + t.Run("handle watchinfo compatibility", func(t *testing.T) { + info := datapb.ChannelWatchInfo{ + Vchan: &datapb.VchannelInfo{ + CollectionID: 1, + ChannelName: "delta-channel1", + UnflushedSegments: []*datapb.SegmentInfo{{ID: 1}}, + FlushedSegments: []*datapb.SegmentInfo{{ID: 2}}, + DroppedSegments: []*datapb.SegmentInfo{{ID: 3}}, + UnflushedSegmentIds: []int64{1}, + }, + State: datapb.ChannelWatchState_Uncomplete, + TimeoutTs: time.Now().Add(time.Minute).UnixNano(), + } + bs, err := proto.Marshal(&info) + assert.NoError(t, err) + + newWatchInfo, err := parsePutEventData(bs) + assert.NoError(t, err) + + assert.Equal(t, []*datapb.SegmentInfo{}, newWatchInfo.GetVchan().GetUnflushedSegments()) + assert.Equal(t, []*datapb.SegmentInfo{}, newWatchInfo.GetVchan().GetFlushedSegments()) + assert.Equal(t, []*datapb.SegmentInfo{}, newWatchInfo.GetVchan().GetDroppedSegments()) + assert.NotEmpty(t, newWatchInfo.GetVchan().GetUnflushedSegmentIds()) + assert.NotEmpty(t, newWatchInfo.GetVchan().GetFlushedSegmentIds()) + assert.NotEmpty(t, newWatchInfo.GetVchan().GetDroppedSegmentIds()) + }) } func TestDataNode_GetComponentStates(t *testing.T) { diff --git a/internal/datanode/meta_util.go b/internal/datanode/meta_util.go new file mode 100644 index 0000000000000..67358ed01abda --- /dev/null +++ b/internal/datanode/meta_util.go @@ -0,0 +1,63 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package datanode + +import "github.com/milvus-io/milvus/internal/proto/datapb" + +// reviseVChannelInfo will revise the datapb.VchannelInfo for upgrade compatibility from 2.0.2 +func reviseVChannelInfo(vChannel *datapb.VchannelInfo) { + removeDuplicateSegmentIDFn := func(ids []int64) []int64 { + result := make([]int64, 0, len(ids)) + existDict := make(map[int64]bool) + for _, id := range ids { + if _, ok := existDict[id]; !ok { + existDict[id] = true + result = append(result, id) + } + } + return result + } + + if vChannel == nil { + return + } + // if the segment infos is not nil(generated by 2.0.2), append the corresponding IDs to segmentIDs + // and remove the segment infos, remove deplicate ids in case there are some mixed situations + if vChannel.FlushedSegments != nil && len(vChannel.FlushedSegments) > 0 { + for _, segment := range vChannel.FlushedSegments { + vChannel.FlushedSegmentIds = append(vChannel.GetFlushedSegmentIds(), segment.GetID()) + } + vChannel.FlushedSegments = []*datapb.SegmentInfo{} + } + vChannel.FlushedSegmentIds = removeDuplicateSegmentIDFn(vChannel.GetFlushedSegmentIds()) + + if vChannel.UnflushedSegments != nil && len(vChannel.UnflushedSegments) > 0 { + for _, segment := range vChannel.UnflushedSegments { + vChannel.UnflushedSegmentIds = append(vChannel.GetUnflushedSegmentIds(), segment.GetID()) + } + vChannel.UnflushedSegments = []*datapb.SegmentInfo{} + } + vChannel.UnflushedSegmentIds = removeDuplicateSegmentIDFn(vChannel.GetUnflushedSegmentIds()) + + if vChannel.DroppedSegments != nil && len(vChannel.DroppedSegments) > 0 { + for _, segment := range vChannel.DroppedSegments { + vChannel.DroppedSegmentIds = append(vChannel.GetDroppedSegmentIds(), segment.GetID()) + } + vChannel.DroppedSegments = []*datapb.SegmentInfo{} + } + vChannel.DroppedSegmentIds = removeDuplicateSegmentIDFn(vChannel.GetDroppedSegmentIds()) +} diff --git a/internal/querycoord/meta.go b/internal/querycoord/meta.go index 40d7cc61fc6c5..faefafbd3505c 100644 --- a/internal/querycoord/meta.go +++ b/internal/querycoord/meta.go @@ -203,6 +203,7 @@ func (m *MetaReplica) reloadFromKV() error { if err != nil { return err } + reviseVChannelInfo(deltaChannelInfo) m.deltaChannelInfos[collectionID] = append(m.deltaChannelInfos[collectionID], deltaChannelInfo) } diff --git a/internal/querycoord/meta_test.go b/internal/querycoord/meta_test.go index 3c9baadafd827..6bad3c95e28fd 100644 --- a/internal/querycoord/meta_test.go +++ b/internal/querycoord/meta_test.go @@ -369,6 +369,97 @@ func TestReloadMetaFromKV(t *testing.T) { assert.Equal(t, collectionInfo.CollectionID, replicas[0].CollectionID) } +func TestVChannelInfoReadFromKVCompatible(t *testing.T) { + refreshParams() + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + assert.Nil(t, err) + defer etcdCli.Close() + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) + id := UniqueID(rand.Int31()) + idAllocator := func() (UniqueID, error) { + newID := atomic.AddInt64(&id, 1) + return newID, nil + } + meta := &MetaReplica{ + idAllocator: idAllocator, + collectionInfos: map[UniqueID]*querypb.CollectionInfo{}, + dmChannelInfos: map[string]*querypb.DmChannelWatchInfo{}, + deltaChannelInfos: map[UniqueID][]*datapb.VchannelInfo{}, + segmentsInfo: newSegmentsInfo(kv), + replicas: NewReplicaInfos(), + } + meta.setKvClient(kv) + + kvs := make(map[string]string) + collectionInfo := &querypb.CollectionInfo{ + CollectionID: defaultCollectionID, + } + collectionBlobs, err := proto.Marshal(collectionInfo) + assert.Nil(t, err) + collectionKey := fmt.Sprintf("%s/%d", collectionMetaPrefix, defaultCollectionID) + kvs[collectionKey] = string(collectionBlobs) + + deltaChannel1 := &datapb.VchannelInfo{ + CollectionID: defaultCollectionID, + ChannelName: "delta-channel1", + FlushedSegments: []*datapb.SegmentInfo{{ + ID: 1, + CollectionID: defaultCollectionID, + }}, + UnflushedSegments: []*datapb.SegmentInfo{{ + ID: 2, + CollectionID: defaultCollectionID, + }}, + DroppedSegments: []*datapb.SegmentInfo{{ + ID: 3, + CollectionID: defaultCollectionID, + }}, + } + deltaChannel2 := &datapb.VchannelInfo{ + CollectionID: defaultCollectionID, + ChannelName: "delta-channel2", + FlushedSegments: []*datapb.SegmentInfo{{ + ID: 4, + CollectionID: defaultCollectionID, + }}, + UnflushedSegments: []*datapb.SegmentInfo{{ + ID: 5, + CollectionID: defaultCollectionID, + }}, + DroppedSegments: []*datapb.SegmentInfo{{ + ID: 6, + CollectionID: defaultCollectionID, + }}, + } + + infos := []*datapb.VchannelInfo{deltaChannel1, deltaChannel2} + for _, info := range infos { + infoBytes, err := proto.Marshal(info) + assert.Nil(t, err) + + key := fmt.Sprintf("%s/%d/%s", deltaChannelMetaPrefix, defaultCollectionID, info.ChannelName) + kvs[key] = string(infoBytes) + } + + err = kv.MultiSave(kvs) + assert.Nil(t, err) + + err = meta.reloadFromKV() + assert.Nil(t, err) + + assert.Equal(t, 1, len(meta.collectionInfos)) + collectionInfo, err = meta.getCollectionInfoByID(collectionInfo.CollectionID) + assert.NoError(t, err) + assert.Equal(t, 1, len(collectionInfo.ReplicaIds)) + assert.Equal(t, int32(1), collectionInfo.ReplicaNumber) + + channels, err := meta.getDeltaChannelsByCollectionID(collectionInfo.CollectionID) + assert.NoError(t, err) + assert.Equal(t, 1, len(channels[0].GetFlushedSegmentIds())) + assert.Equal(t, 1, len(channels[0].GetUnflushedSegmentIds())) + assert.Equal(t, 1, len(channels[0].GetDroppedSegmentIds())) +} + func TestSaveSegments(t *testing.T) { refreshParams() etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) diff --git a/internal/querycoord/task_scheduler.go b/internal/querycoord/task_scheduler.go index 450aa18fbe8b1..b685c70141bb3 100644 --- a/internal/querycoord/task_scheduler.go +++ b/internal/querycoord/task_scheduler.go @@ -375,6 +375,7 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task, //TODO::trigger condition may be different loadReq := querypb.WatchDeltaChannelsRequest{} err = proto.Unmarshal([]byte(t), &loadReq) + reviseWatchDeltaChannelsRequest(&loadReq) if err != nil { return nil, err } diff --git a/internal/querycoord/task_util.go b/internal/querycoord/task_util.go index bbb712b2c7cb0..0048addcbcc56 100644 --- a/internal/querycoord/task_util.go +++ b/internal/querycoord/task_util.go @@ -24,11 +24,16 @@ import ( "go.uber.org/zap" ) -// generateFullWatchDmChannelsRequest fill the WatchDmChannelsRequest by get segment infos from Meta +// generateFullWatchDmChannelsRequest fill the WatchDmChannelsRequest by get segment infos from meta broker func generateFullWatchDmChannelsRequest(broker *globalMetaBroker, request *querypb.WatchDmChannelsRequest) (*querypb.WatchDmChannelsRequest, error) { cloned := proto.Clone(request).(*querypb.WatchDmChannelsRequest) vChannels := cloned.GetInfos() + // for upgrade compatibility from 2.0.2 + for _, vChannel := range vChannels { + reviseVChannelInfo(vChannel) + } + // fill segmentInfos segmentIds := make([]int64, 0) for _, vChannel := range vChannels { @@ -57,3 +62,54 @@ func thinWatchDmChannelsRequest(request *querypb.WatchDmChannelsRequest) *queryp cloned.SegmentInfos = make(map[int64]*datapb.SegmentInfo) return cloned } + +// reviseWatchDeltaChannelsRequest will revise the WatchDeltaChannelsRequest for upgrade compatibility from 2.0.2 +func reviseWatchDeltaChannelsRequest(req *querypb.WatchDeltaChannelsRequest) { + for _, vChannel := range req.GetInfos() { + reviseVChannelInfo(vChannel) + } +} + +// reviseVChannelInfo will revise the datapb.VchannelInfo for upgrade compatibility from 2.0.2 +func reviseVChannelInfo(vChannel *datapb.VchannelInfo) { + removeDuplicateSegmentIDFn := func(ids []int64) []int64 { + result := make([]int64, 0, len(ids)) + existDict := make(map[int64]bool) + for _, id := range ids { + if _, ok := existDict[id]; !ok { + existDict[id] = true + result = append(result, id) + } + } + return result + } + + if vChannel == nil { + return + } + // if the segment infos is not nil(generated by 2.0.2), append the corresponding IDs to segmentIDs + // and remove the segment infos, remove deplicate ids in case there are some mixed situations + if vChannel.FlushedSegments != nil && len(vChannel.FlushedSegments) > 0 { + for _, segment := range vChannel.FlushedSegments { + vChannel.FlushedSegmentIds = append(vChannel.GetFlushedSegmentIds(), segment.GetID()) + } + vChannel.FlushedSegments = []*datapb.SegmentInfo{} + } + vChannel.FlushedSegmentIds = removeDuplicateSegmentIDFn(vChannel.GetFlushedSegmentIds()) + + if vChannel.UnflushedSegments != nil && len(vChannel.UnflushedSegments) > 0 { + for _, segment := range vChannel.UnflushedSegments { + vChannel.UnflushedSegmentIds = append(vChannel.GetUnflushedSegmentIds(), segment.GetID()) + } + vChannel.UnflushedSegments = []*datapb.SegmentInfo{} + } + vChannel.UnflushedSegmentIds = removeDuplicateSegmentIDFn(vChannel.GetUnflushedSegmentIds()) + + if vChannel.DroppedSegments != nil && len(vChannel.DroppedSegments) > 0 { + for _, segment := range vChannel.DroppedSegments { + vChannel.DroppedSegmentIds = append(vChannel.GetDroppedSegmentIds(), segment.GetID()) + } + vChannel.DroppedSegments = []*datapb.SegmentInfo{} + } + vChannel.DroppedSegmentIds = removeDuplicateSegmentIDFn(vChannel.GetDroppedSegmentIds()) +} diff --git a/internal/querycoord/task_util_test.go b/internal/querycoord/task_util_test.go index 9de47fb074209..edd8a986d32a9 100644 --- a/internal/querycoord/task_util_test.go +++ b/internal/querycoord/task_util_test.go @@ -86,3 +86,41 @@ func TestThinWatchDmChannelsRequest(t *testing.T) { thinReq := thinWatchDmChannelsRequest(watchDmChannelsRequest) assert.Empty(t, thinReq.GetSegmentInfos()) } + +func TestUpgradeCompatibility(t *testing.T) { + dataCoord := &dataCoordMock{} + ctx, cancel := context.WithCancel(context.Background()) + handler, err := newGlobalMetaBroker(ctx, nil, dataCoord, nil, nil) + assert.Nil(t, err) + + deltaChannel := &datapb.VchannelInfo{ + CollectionID: defaultCollectionID, + ChannelName: "delta-channel1", + UnflushedSegments: []*datapb.SegmentInfo{{ID: 1}}, + FlushedSegments: []*datapb.SegmentInfo{{ID: 2}}, + DroppedSegments: []*datapb.SegmentInfo{{ID: 3}}, + UnflushedSegmentIds: []int64{1}, + } + + watchDmChannelsRequest := &querypb.WatchDmChannelsRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_WatchDmChannels, + }, + Infos: []*datapb.VchannelInfo{deltaChannel}, + NodeID: 1, + } + + fullWatchDmChannelsRequest, err := generateFullWatchDmChannelsRequest(handler, watchDmChannelsRequest) + assert.Nil(t, err) + assert.NotEmpty(t, fullWatchDmChannelsRequest.GetSegmentInfos()) + vChannel := fullWatchDmChannelsRequest.GetInfos()[0] + assert.Equal(t, []*datapb.SegmentInfo{}, vChannel.GetUnflushedSegments()) + assert.Equal(t, []*datapb.SegmentInfo{}, vChannel.GetFlushedSegments()) + assert.Equal(t, []*datapb.SegmentInfo{}, vChannel.GetDroppedSegments()) + assert.NotEmpty(t, vChannel.GetUnflushedSegmentIds()) + assert.NotEmpty(t, vChannel.GetFlushedSegmentIds()) + assert.NotEmpty(t, vChannel.GetDroppedSegmentIds()) + + assert.Equal(t, 1, len(vChannel.GetUnflushedSegmentIds())) + cancel() +}