Compaction multiSegmentChangeInfo to a single info (#10587)
Signed-off-by: xige-16 <xi.ge@zilliz.com>
xige-16 authored Oct 26, 2021
1 parent aff5cd9 commit 0cedd9d
Showing 10 changed files with 420 additions and 358 deletions.
27 changes: 15 additions & 12 deletions internal/msgstream/msg_test.go
@@ -838,6 +838,20 @@ func TestSealedSegmentsChangeInfoMsg(t *testing.T) {
}
}

changeInfo := &querypb.SegmentChangeInfo{
OnlineNodeID: int64(1),
OnlineSegments: []*querypb.SegmentInfo{
genSimpleSegmentInfo(1),
genSimpleSegmentInfo(2),
genSimpleSegmentInfo(3),
},
OfflineNodeID: int64(2),
OfflineSegments: []*querypb.SegmentInfo{
genSimpleSegmentInfo(4),
genSimpleSegmentInfo(5),
genSimpleSegmentInfo(6),
},
}
changeInfoMsg := &SealedSegmentsChangeInfoMsg{
BaseMsg: generateBaseMsg(),
SealedSegmentsChangeInfo: querypb.SealedSegmentsChangeInfo{
@@ -847,18 +861,7 @@ func TestSealedSegmentsChangeInfoMsg(t *testing.T) {
Timestamp: 2,
SourceID: 3,
},
OnlineNodeID: int64(1),
OnlineSegments: []*querypb.SegmentInfo{
genSimpleSegmentInfo(1),
genSimpleSegmentInfo(2),
genSimpleSegmentInfo(3),
},
OfflineNodeID: int64(2),
OfflineSegments: []*querypb.SegmentInfo{
genSimpleSegmentInfo(4),
genSimpleSegmentInfo(5),
genSimpleSegmentInfo(6),
},
Infos: []*querypb.SegmentChangeInfo{changeInfo},
},
}

12 changes: 8 additions & 4 deletions internal/proto/query_coord.proto
@@ -317,10 +317,14 @@ message LoadBalanceRequest {
}

//---------------- common query proto -----------------
message SegmentChangeInfo {
int64 online_nodeID = 1;
repeated SegmentInfo online_segments = 2;
int64 offline_nodeID = 3;
repeated SegmentInfo offline_segments = 4;
}

message SealedSegmentsChangeInfo {
common.MsgBase base = 1;
int64 online_nodeID = 2;
repeated SegmentInfo online_segments = 3;
int64 offline_nodeID = 4;
repeated SegmentInfo offline_segments = 5;
repeated SegmentChangeInfo infos = 2;
}
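
With the new layout, the per-node online/offline fields live in SegmentChangeInfo, and SealedSegmentsChangeInfo only carries a MsgBase plus a repeated list of them, so one message can describe segment movement across several nodes. A minimal Go sketch of building such a message from the generated querypb package (the segment and node IDs below are made up for illustration):

changeInfos := &querypb.SealedSegmentsChangeInfo{
	Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_SealedSegmentsChangeInfo},
	Infos: []*querypb.SegmentChangeInfo{
		{
			// segment 100 comes online on node 1 and goes offline on node 2
			OnlineNodeID:    1,
			OnlineSegments:  []*querypb.SegmentInfo{{SegmentID: 100}},
			OfflineNodeID:   2,
			OfflineSegments: []*querypb.SegmentInfo{{SegmentID: 100}},
		},
		{
			// segment 101 only comes online on node 3
			OnlineNodeID:   3,
			OnlineSegments: []*querypb.SegmentInfo{{SegmentID: 101}},
		},
	},
}
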
389 changes: 215 additions & 174 deletions internal/proto/querypb/query_coord.pb.go

Large diffs are not rendered by default.

106 changes: 51 additions & 55 deletions internal/querycoord/meta.go
@@ -40,7 +40,7 @@ const (
)

type col2SegmentInfos = map[UniqueID][]*querypb.SegmentInfo
type col2SealedSegmentChangeInfos = map[UniqueID][]*querypb.SealedSegmentsChangeInfo
type col2SealedSegmentChangeInfos = map[UniqueID]*querypb.SealedSegmentsChangeInfo

// Meta contains information about all loaded collections and partitions, including segment information and vchannel information
type Meta interface {
@@ -79,7 +79,7 @@ type Meta interface {
//printMeta()
saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2SealedSegmentChangeInfos, error)
removeGlobalSealedSegInfos(collectionID UniqueID, partitionIDs []UniqueID) (col2SealedSegmentChangeInfos, error)
sendSealedSegmentChangeInfos(collectionID UniqueID, changeInfos []*querypb.SealedSegmentsChangeInfo) (*querypb.QueryChannelInfo, map[string][]mqclient.MessageID, error)
sendSealedSegmentChangeInfos(collectionID UniqueID, changeInfos *querypb.SealedSegmentsChangeInfo) (*querypb.QueryChannelInfo, map[string][]mqclient.MessageID, error)
}

// MetaReplica records the current load information on all querynodes
@@ -369,15 +369,18 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
// generate segment change info according segment info to updated
col2SegmentChangeInfos := make(col2SealedSegmentChangeInfos)

// get segmentInfos to save
// get segmentInfos to sav
for collectionID, onlineInfos := range saves {
segmentsChangeInfo := &querypb.SealedSegmentsChangeInfo{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SealedSegmentsChangeInfo,
},
Infos: []*querypb.SegmentChangeInfo{},
}
for _, info := range onlineInfos {
segmentID := info.SegmentID
onlineNodeID := info.NodeID
segmentChangeInfo := querypb.SealedSegmentsChangeInfo{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SealedSegmentsChangeInfo,
},
changeInfo := &querypb.SegmentChangeInfo{
OnlineNodeID: onlineNodeID,
OnlineSegments: []*querypb.SegmentInfo{info},
}
@@ -386,16 +389,13 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
offlineNodeID := offlineInfo.NodeID
// if the offline segment state is growing, it will not impact the global sealed segments
if offlineInfo.SegmentState == querypb.SegmentState_sealed {
segmentChangeInfo.OfflineNodeID = offlineNodeID
segmentChangeInfo.OfflineSegments = []*querypb.SegmentInfo{offlineInfo}
changeInfo.OfflineNodeID = offlineNodeID
changeInfo.OfflineSegments = []*querypb.SegmentInfo{offlineInfo}
}
}

if _, ok := col2SegmentChangeInfos[collectionID]; !ok {
col2SegmentChangeInfos[collectionID] = []*querypb.SealedSegmentsChangeInfo{}
}
col2SegmentChangeInfos[collectionID] = append(col2SegmentChangeInfos[collectionID], &segmentChangeInfo)
segmentsChangeInfo.Infos = append(segmentsChangeInfo.Infos, changeInfo)
}
col2SegmentChangeInfos[collectionID] = segmentsChangeInfo
}

queryChannelInfosMap := make(map[UniqueID]*querypb.QueryChannelInfo)
@@ -405,12 +405,12 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
if err != nil {
return nil, err
}
// len(messageIDs) == 1
messageIDs, ok := messageIDInfos[queryChannelInfo.QueryChannelID]
if !ok || len(messageIDs) == 0 {
return col2SegmentChangeInfos, errors.New("updateGlobalSealedSegmentInfos: send sealed segment change info failed")
}
seekMessageID := messageIDs[len(messageIDs)-1]
queryChannelInfo.SeekPosition.MsgID = seekMessageID.Serialize()
queryChannelInfo.SeekPosition.MsgID = messageIDs[0].Serialize()

// update segmentInfo, queryChannelInfo meta to cache and etcd
seg2Info := make(map[UniqueID]*querypb.SegmentInfo)
@@ -458,15 +458,13 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
// avoid the produce process success but save meta to etcd failed
// then the msgID key will not exist, and changeIndo will be ignored by query node
for _, changeInfos := range col2SegmentChangeInfos {
for _, info := range changeInfos {
changeInfoBytes, err := proto.Marshal(info)
if err != nil {
return col2SegmentChangeInfos, err
}
// TODO:: segmentChangeInfo clear in etcd with coord gc and queryNode watch the changeInfo meta to deal changeInfoMsg
changeInfoKey := fmt.Sprintf("%s/%d", sealedSegmentChangeInfoPrefix, info.Base.MsgID)
saveKvs[changeInfoKey] = string(changeInfoBytes)
changeInfoBytes, err := proto.Marshal(changeInfos)
if err != nil {
return col2SegmentChangeInfos, err
}
// TODO:: segmentChangeInfo clear in etcd with coord gc and queryNode watch the changeInfo meta to deal changeInfoMsg
changeInfoKey := fmt.Sprintf("%s/%d", sealedSegmentChangeInfoPrefix, changeInfos.Base.MsgID)
saveKvs[changeInfoKey] = string(changeInfoBytes)
}

err := m.client.MultiSave(saveKvs)
@@ -493,36 +491,38 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
}

func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitionIDs []UniqueID) (col2SealedSegmentChangeInfos, error) {
segmentChangeInfos := make([]*querypb.SealedSegmentsChangeInfo, 0)
removes := m.showSegmentInfos(collectionID, partitionIDs)
if len(removes) == 0 {
return nil, nil
}
// get segmentInfos to remove
segmentChangeInfos := &querypb.SealedSegmentsChangeInfo{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SealedSegmentsChangeInfo,
},
Infos: []*querypb.SegmentChangeInfo{},
}
for _, info := range removes {
offlineNodeID := info.NodeID
changeInfo := querypb.SealedSegmentsChangeInfo{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SealedSegmentsChangeInfo,
},
changeInfo := &querypb.SegmentChangeInfo{
OfflineNodeID: offlineNodeID,
OfflineSegments: []*querypb.SegmentInfo{info},
}

segmentChangeInfos = append(segmentChangeInfos, &changeInfo)
segmentChangeInfos.Infos = append(segmentChangeInfos.Infos, changeInfo)
}

// get msgStream to produce sealedSegmentChangeInfos to query channel
queryChannelInfo, messageIDInfos, err := m.sendSealedSegmentChangeInfos(collectionID, segmentChangeInfos)
if err != nil {
return nil, err
}
// len(messageIDs) = 1
messageIDs, ok := messageIDInfos[queryChannelInfo.QueryChannelID]
if !ok || len(messageIDs) == 0 {
return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, errors.New("updateGlobalSealedSegmentInfos: send sealed segment change info failed")
}
seekMessageID := messageIDs[len(messageIDs)-1]
queryChannelInfo.SeekPosition.MsgID = seekMessageID.Serialize()
queryChannelInfo.SeekPosition.MsgID = messageIDs[0].Serialize()

// update segmentInfo, queryChannelInfo meta to cache and etcd
seg2Info := make(map[UniqueID]*querypb.SegmentInfo)
@@ -554,15 +554,13 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio
// save segmentChangeInfo into etcd, query node will deal the changeInfo if the msgID key exist in etcd
// avoid the produce process success but save meta to etcd failed
// then the msgID key will not exist, and changeIndo will be ignored by query node
for _, info := range segmentChangeInfos {
changeInfoBytes, err := proto.Marshal(info)
if err != nil {
return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, err
}
// TODO:: segmentChangeInfo clear in etcd with coord gc and queryNode watch the changeInfo meta to deal changeInfoMsg
changeInfoKey := fmt.Sprintf("%s/%d", sealedSegmentChangeInfoPrefix, info.Base.MsgID)
saveKvs[changeInfoKey] = string(changeInfoBytes)
changeInfoBytes, err := proto.Marshal(segmentChangeInfos)
if err != nil {
return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, err
}
// TODO:: segmentChangeInfo clear in etcd with coord gc and queryNode watch the changeInfo meta to deal changeInfoMsg
changeInfoKey := fmt.Sprintf("%s/%d", sealedSegmentChangeInfoPrefix, segmentChangeInfos.Base.MsgID)
saveKvs[changeInfoKey] = string(changeInfoBytes)

removeKeys := make([]string, 0)
for _, info := range removes {
@@ -588,7 +586,7 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio
return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, nil
}

func (m *MetaReplica) sendSealedSegmentChangeInfos(collectionID UniqueID, changeInfos []*querypb.SealedSegmentsChangeInfo) (*querypb.QueryChannelInfo, map[string][]mqclient.MessageID, error) {
func (m *MetaReplica) sendSealedSegmentChangeInfos(collectionID UniqueID, changeInfos *querypb.SealedSegmentsChangeInfo) (*querypb.QueryChannelInfo, map[string][]mqclient.MessageID, error) {
// get msgStream to produce sealedSegmentChangeInfos to query channel
queryChannelInfo, err := m.getQueryChannelInfoByID(collectionID)
if err != nil {
@@ -605,21 +603,19 @@ func (m *MetaReplica) sendSealedSegmentChangeInfos(collectionID UniqueID, change
var msgPack = &msgstream.MsgPack{
Msgs: []msgstream.TsMsg{},
}
for _, changeInfo := range changeInfos {
id, err := m.idAllocator()
if err != nil {
log.Error("allocator trigger taskID failed", zap.Error(err))
return nil, nil, err
}
changeInfo.Base.MsgID = id
segmentChangeMsg := &msgstream.SealedSegmentsChangeInfoMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []uint32{0},
},
SealedSegmentsChangeInfo: *changeInfo,
}
msgPack.Msgs = append(msgPack.Msgs, segmentChangeMsg)
id, err := m.idAllocator()
if err != nil {
log.Error("allocator trigger taskID failed", zap.Error(err))
return nil, nil, err
}
changeInfos.Base.MsgID = id
segmentChangeMsg := &msgstream.SealedSegmentsChangeInfoMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []uint32{0},
},
SealedSegmentsChangeInfo: *changeInfos,
}
msgPack.Msgs = append(msgPack.Msgs, segmentChangeMsg)

messageIDInfos, err := queryStream.ProduceMark(msgPack)
if err != nil {
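A consequence of bundling everything into one message per collection is that sendSealedSegmentChangeInfos now allocates a single MsgID and produces a single SealedSegmentsChangeInfoMsg, so the caller can take the first returned message ID as the new seek position instead of the last of many. A condensed sketch of the aggregation that saveGlobalSealedSegInfos performs after this change (error handling and the offline-segment lookup omitted):

// one SealedSegmentsChangeInfo per collection, one SegmentChangeInfo per updated segment
for collectionID, onlineInfos := range saves {
	changeInfos := &querypb.SealedSegmentsChangeInfo{
		Base:  &commonpb.MsgBase{MsgType: commonpb.MsgType_SealedSegmentsChangeInfo},
		Infos: make([]*querypb.SegmentChangeInfo, 0, len(onlineInfos)),
	}
	for _, info := range onlineInfos {
		changeInfos.Infos = append(changeInfos.Infos, &querypb.SegmentChangeInfo{
			OnlineNodeID:   info.NodeID,
			OnlineSegments: []*querypb.SegmentInfo{info},
		})
	}
	col2SegmentChangeInfos[collectionID] = changeInfos
}
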
21 changes: 12 additions & 9 deletions internal/querycoord/task_scheduler.go
@@ -931,22 +931,25 @@ func updateSegmentInfoFromTask(ctx context.Context, triggerTask task, meta Meta)
return nil
}

func reverseSealedSegmentChangeInfo(changeInfosMap map[UniqueID][]*querypb.SealedSegmentsChangeInfo) map[UniqueID][]*querypb.SealedSegmentsChangeInfo {
result := make(map[UniqueID][]*querypb.SealedSegmentsChangeInfo)
func reverseSealedSegmentChangeInfo(changeInfosMap map[UniqueID]*querypb.SealedSegmentsChangeInfo) map[UniqueID]*querypb.SealedSegmentsChangeInfo {
result := make(map[UniqueID]*querypb.SealedSegmentsChangeInfo)
for collectionID, changeInfos := range changeInfosMap {
result[collectionID] = []*querypb.SealedSegmentsChangeInfo{}
for _, info := range changeInfos {
segmentChangeInfo := &querypb.SealedSegmentsChangeInfo{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SealedSegmentsChangeInfo,
},
segmentChangeInfos := &querypb.SealedSegmentsChangeInfo{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SealedSegmentsChangeInfo,
},
Infos: []*querypb.SegmentChangeInfo{},
}
for _, info := range changeInfos.Infos {
changeInfo := &querypb.SegmentChangeInfo{
OnlineNodeID: info.OfflineNodeID,
OnlineSegments: info.OfflineSegments,
OfflineNodeID: info.OnlineNodeID,
OfflineSegments: info.OnlineSegments,
}
result[collectionID] = append(result[collectionID], segmentChangeInfo)
segmentChangeInfos.Infos = append(segmentChangeInfos.Infos, changeInfo)
}
result[collectionID] = segmentChangeInfos
}

return result
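reverseSealedSegmentChangeInfo keeps the one-message-per-collection shape and only swaps the online/offline sides of each inner SegmentChangeInfo, which is what lets a failed load-balance be rolled back. For illustration (the IDs are made up), an entry that brought segment 5 online on node 1 and offline on node 2 reverses as follows:

// before: {OnlineNodeID: 1, OnlineSegments: [seg5], OfflineNodeID: 2, OfflineSegments: [seg5]}
// after:  {OnlineNodeID: 2, OnlineSegments: [seg5], OfflineNodeID: 1, OfflineSegments: [seg5]}
reversed := reverseSealedSegmentChangeInfo(changeInfosMap)
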
8 changes: 6 additions & 2 deletions internal/querynode/mock_test.go
@@ -1254,8 +1254,7 @@ func consumeSimpleRetrieveResult(stream msgstream.MsgStream) (*msgstream.Retriev
}

func genSimpleChangeInfo() *querypb.SealedSegmentsChangeInfo {
return &querypb.SealedSegmentsChangeInfo{
Base: genCommonMsgBase(commonpb.MsgType_LoadBalanceSegments),
changeInfo := &querypb.SegmentChangeInfo{
OnlineNodeID: Params.QueryNodeID,
OnlineSegments: []*querypb.SegmentInfo{
genSimpleSegmentInfo(),
@@ -1265,6 +1264,11 @@ func genSimpleChangeInfo() *querypb.SealedSegmentsChangeInfo {
genSimpleSegmentInfo(),
},
}

return &querypb.SealedSegmentsChangeInfo{
Base: genCommonMsgBase(commonpb.MsgType_LoadBalanceSegments),
Infos: []*querypb.SegmentChangeInfo{changeInfo},
}
}

func saveChangeInfo(key string, value string) error {
56 changes: 29 additions & 27 deletions internal/querynode/query_collection.go
@@ -305,35 +305,37 @@ func (q *queryCollection) consumeQuery() {
}

func (q *queryCollection) adjustByChangeInfo(msg *msgstream.SealedSegmentsChangeInfoMsg) error {
// for OnlineSegments:
for _, segment := range msg.OnlineSegments {
// 1. update global sealed segments
err := q.globalSegmentManager.addGlobalSegmentInfo(segment)
if err != nil {
return err
}
// 2. update excluded segment, cluster have been loaded sealed segments,
// so we need to avoid getting growing segment from flow graph.
q.streaming.replica.addExcludedSegments(segment.CollectionID, []*datapb.SegmentInfo{
{
ID: segment.SegmentID,
CollectionID: segment.CollectionID,
PartitionID: segment.PartitionID,
InsertChannel: segment.ChannelID,
NumOfRows: segment.NumRows,
// TODO: add status, remove query pb segment status, use common pb segment status?
DmlPosition: &internalpb.MsgPosition{
// use max timestamp to filter out dm messages
Timestamp: math.MaxInt64,
for _, info := range msg.Infos {
// for OnlineSegments:
for _, segment := range info.OnlineSegments {
// 1. update global sealed segments
err := q.globalSegmentManager.addGlobalSegmentInfo(segment)
if err != nil {
return err
}
// 2. update excluded segment, cluster have been loaded sealed segments,
// so we need to avoid getting growing segment from flow graph.
q.streaming.replica.addExcludedSegments(segment.CollectionID, []*datapb.SegmentInfo{
{
ID: segment.SegmentID,
CollectionID: segment.CollectionID,
PartitionID: segment.PartitionID,
InsertChannel: segment.ChannelID,
NumOfRows: segment.NumRows,
// TODO: add status, remove query pb segment status, use common pb segment status?
DmlPosition: &internalpb.MsgPosition{
// use max timestamp to filter out dm messages
Timestamp: typeutil.MaxTimestamp,
},
},
},
})
}
})
}

// for OfflineSegments:
for _, segment := range msg.OfflineSegments {
// update global sealed segments
q.globalSegmentManager.removeGlobalSegmentInfo(segment.SegmentID)
// for OfflineSegments:
for _, segment := range info.OfflineSegments {
// 1. update global sealed segments
q.globalSegmentManager.removeGlobalSegmentInfo(segment.SegmentID)
}
}
return nil
}
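On the query node, adjustByChangeInfo becomes a two-level loop: walk the bundled Infos, then apply each entry's online and offline segment lists. A stripped-down sketch of that shape (the excluded-segment bookkeeping from the full version above is omitted):

for _, info := range msg.Infos {
	for _, segment := range info.OnlineSegments {
		// register the newly loaded sealed segment in the global view
		if err := q.globalSegmentManager.addGlobalSegmentInfo(segment); err != nil {
			return err
		}
	}
	for _, segment := range info.OfflineSegments {
		// drop segments that went offline from the global view
		q.globalSegmentManager.removeGlobalSegmentInfo(segment.SegmentID)
	}
}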
