From 0cedd9d47e1726ca005ccc93890d2cc3fa82ff57 Mon Sep 17 00:00:00 2001 From: xige-16 Date: Tue, 26 Oct 2021 15:18:22 +0800 Subject: [PATCH] Compaction multiSegmentChangeInfo to a single info (#10587) Signed-off-by: xige-16 --- internal/msgstream/msg_test.go | 27 +- internal/proto/query_coord.proto | 12 +- internal/proto/querypb/query_coord.pb.go | 389 +++++++++++--------- internal/querycoord/meta.go | 106 +++--- internal/querycoord/task_scheduler.go | 21 +- internal/querynode/mock_test.go | 8 +- internal/querynode/query_collection.go | 56 +-- internal/querynode/query_collection_test.go | 33 +- internal/querynode/query_node.go | 110 +++--- internal/querynode/query_node_test.go | 16 +- 10 files changed, 420 insertions(+), 358 deletions(-) diff --git a/internal/msgstream/msg_test.go b/internal/msgstream/msg_test.go index 47d62296c2510..f5d83e4788ddf 100644 --- a/internal/msgstream/msg_test.go +++ b/internal/msgstream/msg_test.go @@ -838,6 +838,20 @@ func TestSealedSegmentsChangeInfoMsg(t *testing.T) { } } + changeInfo := &querypb.SegmentChangeInfo{ + OnlineNodeID: int64(1), + OnlineSegments: []*querypb.SegmentInfo{ + genSimpleSegmentInfo(1), + genSimpleSegmentInfo(2), + genSimpleSegmentInfo(3), + }, + OfflineNodeID: int64(2), + OfflineSegments: []*querypb.SegmentInfo{ + genSimpleSegmentInfo(4), + genSimpleSegmentInfo(5), + genSimpleSegmentInfo(6), + }, + } changeInfoMsg := &SealedSegmentsChangeInfoMsg{ BaseMsg: generateBaseMsg(), SealedSegmentsChangeInfo: querypb.SealedSegmentsChangeInfo{ @@ -847,18 +861,7 @@ func TestSealedSegmentsChangeInfoMsg(t *testing.T) { Timestamp: 2, SourceID: 3, }, - OnlineNodeID: int64(1), - OnlineSegments: []*querypb.SegmentInfo{ - genSimpleSegmentInfo(1), - genSimpleSegmentInfo(2), - genSimpleSegmentInfo(3), - }, - OfflineNodeID: int64(2), - OfflineSegments: []*querypb.SegmentInfo{ - genSimpleSegmentInfo(4), - genSimpleSegmentInfo(5), - genSimpleSegmentInfo(6), - }, + Infos: []*querypb.SegmentChangeInfo{changeInfo}, }, } diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto index de048cf853c4c..7c28110075063 100644 --- a/internal/proto/query_coord.proto +++ b/internal/proto/query_coord.proto @@ -317,10 +317,14 @@ message LoadBalanceRequest { } //---------------- common query proto ----------------- +message SegmentChangeInfo { + int64 online_nodeID = 1; + repeated SegmentInfo online_segments = 2; + int64 offline_nodeID = 3; + repeated SegmentInfo offline_segments = 4; +} + message SealedSegmentsChangeInfo { common.MsgBase base = 1; - int64 online_nodeID = 2; - repeated SegmentInfo online_segments = 3; - int64 offline_nodeID = 4; - repeated SegmentInfo offline_segments = 5; + repeated SegmentChangeInfo infos = 2; } diff --git a/internal/proto/querypb/query_coord.pb.go b/internal/proto/querypb/query_coord.pb.go index bdb7cb4e200e3..b36ca2fa4fa40 100644 --- a/internal/proto/querypb/query_coord.pb.go +++ b/internal/proto/querypb/query_coord.pb.go @@ -2090,22 +2090,82 @@ func (m *LoadBalanceRequest) GetBalanceReason() TriggerCondition { } //---------------- common query proto ----------------- +type SegmentChangeInfo struct { + OnlineNodeID int64 `protobuf:"varint,1,opt,name=online_nodeID,json=onlineNodeID,proto3" json:"online_nodeID,omitempty"` + OnlineSegments []*SegmentInfo `protobuf:"bytes,2,rep,name=online_segments,json=onlineSegments,proto3" json:"online_segments,omitempty"` + OfflineNodeID int64 `protobuf:"varint,3,opt,name=offline_nodeID,json=offlineNodeID,proto3" json:"offline_nodeID,omitempty"` + OfflineSegments []*SegmentInfo `protobuf:"bytes,4,rep,name=offline_segments,json=offlineSegments,proto3" json:"offline_segments,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *SegmentChangeInfo) Reset() { *m = SegmentChangeInfo{} } +func (m *SegmentChangeInfo) String() string { return proto.CompactTextString(m) } +func (*SegmentChangeInfo) ProtoMessage() {} +func (*SegmentChangeInfo) Descriptor() ([]byte, []int) { + return fileDescriptor_aab7cc9a69ed26e8, []int{28} +} + +func (m *SegmentChangeInfo) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_SegmentChangeInfo.Unmarshal(m, b) +} +func (m *SegmentChangeInfo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_SegmentChangeInfo.Marshal(b, m, deterministic) +} +func (m *SegmentChangeInfo) XXX_Merge(src proto.Message) { + xxx_messageInfo_SegmentChangeInfo.Merge(m, src) +} +func (m *SegmentChangeInfo) XXX_Size() int { + return xxx_messageInfo_SegmentChangeInfo.Size(m) +} +func (m *SegmentChangeInfo) XXX_DiscardUnknown() { + xxx_messageInfo_SegmentChangeInfo.DiscardUnknown(m) +} + +var xxx_messageInfo_SegmentChangeInfo proto.InternalMessageInfo + +func (m *SegmentChangeInfo) GetOnlineNodeID() int64 { + if m != nil { + return m.OnlineNodeID + } + return 0 +} + +func (m *SegmentChangeInfo) GetOnlineSegments() []*SegmentInfo { + if m != nil { + return m.OnlineSegments + } + return nil +} + +func (m *SegmentChangeInfo) GetOfflineNodeID() int64 { + if m != nil { + return m.OfflineNodeID + } + return 0 +} + +func (m *SegmentChangeInfo) GetOfflineSegments() []*SegmentInfo { + if m != nil { + return m.OfflineSegments + } + return nil +} + type SealedSegmentsChangeInfo struct { - Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` - OnlineNodeID int64 `protobuf:"varint,2,opt,name=online_nodeID,json=onlineNodeID,proto3" json:"online_nodeID,omitempty"` - OnlineSegments []*SegmentInfo `protobuf:"bytes,3,rep,name=online_segments,json=onlineSegments,proto3" json:"online_segments,omitempty"` - OfflineNodeID int64 `protobuf:"varint,4,opt,name=offline_nodeID,json=offlineNodeID,proto3" json:"offline_nodeID,omitempty"` - OfflineSegments []*SegmentInfo `protobuf:"bytes,5,rep,name=offline_segments,json=offlineSegments,proto3" json:"offline_segments,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` + Infos []*SegmentChangeInfo `protobuf:"bytes,2,rep,name=infos,proto3" json:"infos,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *SealedSegmentsChangeInfo) Reset() { *m = SealedSegmentsChangeInfo{} } func (m *SealedSegmentsChangeInfo) String() string { return proto.CompactTextString(m) } func (*SealedSegmentsChangeInfo) ProtoMessage() {} func (*SealedSegmentsChangeInfo) Descriptor() ([]byte, []int) { - return fileDescriptor_aab7cc9a69ed26e8, []int{28} + return fileDescriptor_aab7cc9a69ed26e8, []int{29} } func (m *SealedSegmentsChangeInfo) XXX_Unmarshal(b []byte) error { @@ -2133,30 +2193,9 @@ func (m *SealedSegmentsChangeInfo) GetBase() *commonpb.MsgBase { return nil } -func (m *SealedSegmentsChangeInfo) GetOnlineNodeID() int64 { - if m != nil { - return m.OnlineNodeID - } - return 0 -} - -func (m *SealedSegmentsChangeInfo) GetOnlineSegments() []*SegmentInfo { - if m != nil { - return m.OnlineSegments - } - return nil -} - -func (m *SealedSegmentsChangeInfo) GetOfflineNodeID() int64 { +func (m *SealedSegmentsChangeInfo) GetInfos() []*SegmentChangeInfo { if m != nil { - return m.OfflineNodeID - } - return 0 -} - -func (m *SealedSegmentsChangeInfo) GetOfflineSegments() []*SegmentInfo { - if m != nil { - return m.OfflineSegments + return m.Infos } return nil } @@ -2194,154 +2233,156 @@ func init() { proto.RegisterType((*LoadBalanceSegmentInfo)(nil), "milvus.proto.query.LoadBalanceSegmentInfo") proto.RegisterType((*HandoffSegmentsRequest)(nil), "milvus.proto.query.HandoffSegmentsRequest") proto.RegisterType((*LoadBalanceRequest)(nil), "milvus.proto.query.LoadBalanceRequest") + proto.RegisterType((*SegmentChangeInfo)(nil), "milvus.proto.query.SegmentChangeInfo") proto.RegisterType((*SealedSegmentsChangeInfo)(nil), "milvus.proto.query.SealedSegmentsChangeInfo") } func init() { proto.RegisterFile("query_coord.proto", fileDescriptor_aab7cc9a69ed26e8) } var fileDescriptor_aab7cc9a69ed26e8 = []byte{ - // 2241 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x59, 0xcb, 0x8f, 0x1c, 0x47, - 0x19, 0xdf, 0x9e, 0xf7, 0x7c, 0xf3, 0x6a, 0x97, 0xbd, 0x93, 0xf1, 0x10, 0x3b, 0x4b, 0x3b, 0x7e, - 0x64, 0x43, 0xd6, 0xce, 0x3a, 0x20, 0x22, 0x91, 0x43, 0xbc, 0x13, 0x6f, 0x26, 0xd8, 0xeb, 0xa5, - 0xd7, 0x09, 0xc2, 0xb2, 0xd4, 0xf4, 0x4c, 0xd7, 0xce, 0xb6, 0xdc, 0xdd, 0x35, 0xee, 0xea, 0xb1, - 0xbd, 0x3e, 0x73, 0xe0, 0x82, 0xf8, 0x03, 0x40, 0x48, 0x48, 0x20, 0xc4, 0x01, 0x6e, 0x80, 0xc4, - 0x29, 0x17, 0xee, 0xfc, 0x05, 0x48, 0x08, 0xee, 0xdc, 0x38, 0xa3, 0xae, 0xaa, 0xee, 0xe9, 0x47, - 0xcd, 0xee, 0x78, 0x17, 0x63, 0x2b, 0xe2, 0xd6, 0xfd, 0xd5, 0x57, 0xdf, 0xef, 0xab, 0xef, 0x59, - 0x0f, 0x38, 0xf3, 0x78, 0x86, 0xfd, 0x43, 0x63, 0x4c, 0x88, 0x6f, 0x6d, 0x4c, 0x7d, 0x12, 0x10, - 0x84, 0x5c, 0xdb, 0x79, 0x32, 0xa3, 0xfc, 0x6f, 0x83, 0x8d, 0xf7, 0x9b, 0x63, 0xe2, 0xba, 0xc4, - 0xe3, 0xb4, 0x7e, 0x33, 0xc9, 0xd1, 0x6f, 0xdb, 0x5e, 0x80, 0x7d, 0xcf, 0x74, 0xa2, 0x51, 0x3a, - 0x3e, 0xc0, 0xae, 0x29, 0xfe, 0x54, 0xcb, 0x0c, 0xcc, 0xa4, 0x7c, 0xed, 0x47, 0x0a, 0x74, 0xf7, - 0x0e, 0xc8, 0xd3, 0x2d, 0xe2, 0x38, 0x78, 0x1c, 0xd8, 0xc4, 0xa3, 0x3a, 0x7e, 0x3c, 0xc3, 0x34, - 0x40, 0x37, 0xa0, 0x34, 0x32, 0x29, 0xee, 0x29, 0x6b, 0xca, 0xb5, 0xc6, 0xe6, 0x9b, 0x1b, 0x29, - 0x4d, 0x84, 0x0a, 0x77, 0xe9, 0xe4, 0x96, 0x49, 0xb1, 0xce, 0x38, 0x11, 0x82, 0x92, 0x35, 0x1a, - 0x0e, 0x7a, 0x85, 0x35, 0xe5, 0x5a, 0x51, 0x67, 0xdf, 0xe8, 0x6d, 0x68, 0x8d, 0x63, 0xd9, 0xc3, - 0x01, 0xed, 0x15, 0xd7, 0x8a, 0xd7, 0x8a, 0x7a, 0x9a, 0xa8, 0xfd, 0x46, 0x81, 0x37, 0x72, 0x6a, - 0xd0, 0x29, 0xf1, 0x28, 0x46, 0x37, 0xa1, 0x42, 0x03, 0x33, 0x98, 0x51, 0xa1, 0xc9, 0xd7, 0xa4, - 0x9a, 0xec, 0x31, 0x16, 0x5d, 0xb0, 0xe6, 0x61, 0x0b, 0x12, 0x58, 0xf4, 0x3e, 0x9c, 0xb3, 0xbd, - 0xbb, 0xd8, 0x25, 0xfe, 0xa1, 0x31, 0xc5, 0xfe, 0x18, 0x7b, 0x81, 0x39, 0xc1, 0x91, 0x8e, 0x67, - 0xa3, 0xb1, 0xdd, 0xf9, 0x90, 0xf6, 0x6b, 0x05, 0x56, 0x43, 0x4d, 0x77, 0x4d, 0x3f, 0xb0, 0x5f, - 0x82, 0xbd, 0x34, 0x68, 0x26, 0x75, 0xec, 0x15, 0xd9, 0x58, 0x8a, 0x16, 0xf2, 0x4c, 0x23, 0xf8, - 0x70, 0x6d, 0x25, 0xa6, 0x6e, 0x8a, 0xa6, 0xfd, 0x4a, 0x38, 0x36, 0xa9, 0xe7, 0x69, 0x0c, 0x9a, - 0xc5, 0x2c, 0xe4, 0x31, 0x4f, 0x62, 0xce, 0x2f, 0x15, 0x58, 0xbd, 0x43, 0x4c, 0x6b, 0xee, 0xf8, - 0xff, 0xbd, 0x39, 0x3f, 0x82, 0x0a, 0xcf, 0x92, 0x5e, 0x89, 0x61, 0x5d, 0x4e, 0x63, 0x89, 0x0c, - 0x9a, 0x6b, 0xb8, 0xc7, 0x08, 0xba, 0x98, 0xa4, 0xfd, 0x5c, 0x81, 0x9e, 0x8e, 0x1d, 0x6c, 0x52, - 0xfc, 0x2a, 0x57, 0xd1, 0x85, 0x8a, 0x47, 0x2c, 0x3c, 0x1c, 0xb0, 0x55, 0x14, 0x75, 0xf1, 0xa7, - 0xfd, 0x53, 0x58, 0xf8, 0x35, 0x0f, 0xd8, 0x84, 0x17, 0xca, 0x27, 0xf1, 0xc2, 0x97, 0x73, 0x2f, - 0xbc, 0xee, 0x2b, 0x9d, 0x7b, 0xaa, 0x9c, 0xf2, 0xd4, 0x0f, 0xe0, 0xfc, 0x96, 0x8f, 0xcd, 0x00, - 0x7f, 0x2f, 0x2c, 0xf3, 0x5b, 0x07, 0xa6, 0xe7, 0x61, 0x27, 0x5a, 0x42, 0x16, 0x5c, 0x91, 0x80, - 0xf7, 0xa0, 0x3a, 0xf5, 0xc9, 0xb3, 0xc3, 0x58, 0xef, 0xe8, 0x57, 0xfb, 0xa5, 0x02, 0x7d, 0x99, - 0xec, 0xd3, 0x54, 0x84, 0xab, 0xd0, 0xf1, 0xb9, 0x72, 0xc6, 0x98, 0xcb, 0x63, 0xa8, 0x75, 0xbd, - 0x2d, 0xc8, 0x02, 0x05, 0x5d, 0x86, 0xb6, 0x8f, 0xe9, 0xcc, 0x99, 0xf3, 0x15, 0x19, 0x5f, 0x8b, - 0x53, 0x05, 0x9b, 0xf6, 0x5b, 0x05, 0xce, 0x6f, 0xe3, 0x20, 0xf6, 0x5e, 0x08, 0x87, 0x5f, 0xd3, - 0xea, 0xfa, 0x0b, 0x05, 0x3a, 0x19, 0x45, 0xd1, 0x1a, 0x34, 0x12, 0x3c, 0xc2, 0x41, 0x49, 0x12, - 0xfa, 0x36, 0x94, 0x43, 0xdb, 0x61, 0xa6, 0x52, 0x7b, 0x53, 0xdb, 0xc8, 0x37, 0xf7, 0x8d, 0xb4, - 0x54, 0x9d, 0x4f, 0x40, 0xd7, 0xe1, 0xac, 0xa4, 0xb2, 0x0a, 0xf5, 0x51, 0xbe, 0xb0, 0x6a, 0xbf, - 0x53, 0xa0, 0x2f, 0x33, 0xe6, 0x69, 0x1c, 0xfe, 0x00, 0xba, 0xf1, 0x6a, 0x0c, 0x0b, 0xd3, 0xb1, - 0x6f, 0x4f, 0x59, 0x9a, 0xb1, 0x66, 0xd0, 0xd8, 0xbc, 0x74, 0xfc, 0x7a, 0xa8, 0xbe, 0x1a, 0x8b, - 0x18, 0x24, 0x24, 0x68, 0x3f, 0x51, 0x60, 0x75, 0x1b, 0x07, 0x7b, 0x78, 0xe2, 0x62, 0x2f, 0x18, - 0x7a, 0xfb, 0xe4, 0xe4, 0x8e, 0xbf, 0x08, 0x40, 0x85, 0x9c, 0xb8, 0x51, 0x25, 0x28, 0xcb, 0x04, - 0x81, 0xf6, 0xa7, 0x22, 0x34, 0x12, 0xca, 0xa0, 0x37, 0xa1, 0x1e, 0x4b, 0x10, 0xae, 0x9d, 0x13, - 0x72, 0x12, 0x0b, 0x92, 0xb0, 0xca, 0x84, 0x47, 0x31, 0x1f, 0x1e, 0x0b, 0x2a, 0x38, 0x3a, 0x0f, - 0x35, 0x17, 0xbb, 0x06, 0xb5, 0x9f, 0x63, 0x51, 0x31, 0xaa, 0x2e, 0x76, 0xf7, 0xec, 0xe7, 0x38, - 0x1c, 0xf2, 0x66, 0xae, 0xe1, 0x93, 0xa7, 0xb4, 0x57, 0xe1, 0x43, 0xde, 0xcc, 0xd5, 0xc9, 0x53, - 0x8a, 0x2e, 0x00, 0xd8, 0x9e, 0x85, 0x9f, 0x19, 0x9e, 0xe9, 0xe2, 0x5e, 0x95, 0x65, 0x5c, 0x9d, - 0x51, 0x76, 0x4c, 0x17, 0x87, 0xb5, 0x82, 0xfd, 0x0c, 0x07, 0xbd, 0x1a, 0x9f, 0x28, 0x7e, 0xc3, - 0xa5, 0x8a, 0x3c, 0x1d, 0x0e, 0x7a, 0x75, 0x3e, 0x2f, 0x26, 0xa0, 0x4f, 0xa0, 0x25, 0xd6, 0x6d, - 0xf0, 0x58, 0x06, 0x16, 0xcb, 0x6b, 0x32, 0xdf, 0x0b, 0x03, 0xf2, 0x48, 0x6e, 0xd2, 0xc4, 0x1f, - 0xba, 0x02, 0xed, 0x31, 0x71, 0xa7, 0x26, 0xb3, 0xce, 0x6d, 0x9f, 0xb8, 0xbd, 0x06, 0xf3, 0x53, - 0x86, 0x8a, 0x6e, 0xc0, 0xd9, 0x31, 0xab, 0x5b, 0xd6, 0xad, 0xc3, 0xad, 0x78, 0xa8, 0xd7, 0x5c, - 0x53, 0xae, 0xd5, 0x74, 0xd9, 0x10, 0xdb, 0xd1, 0x66, 0x23, 0xe9, 0x34, 0x51, 0xff, 0x4d, 0x28, - 0xdb, 0xde, 0x3e, 0x89, 0x82, 0xfc, 0xad, 0x23, 0x16, 0xca, 0xc0, 0x38, 0xb7, 0xf6, 0xc7, 0x22, - 0x74, 0x3f, 0xb6, 0x2c, 0x59, 0x29, 0x7f, 0xf1, 0x88, 0x9e, 0x47, 0x46, 0x21, 0x15, 0x19, 0xcb, - 0x94, 0xb3, 0x77, 0xe1, 0x4c, 0xa6, 0x4c, 0x8b, 0x00, 0xab, 0xeb, 0x6a, 0xba, 0x50, 0x0f, 0x07, - 0xe8, 0x1d, 0x50, 0xd3, 0xa5, 0x5a, 0x34, 0xa9, 0xba, 0xde, 0x49, 0x15, 0xeb, 0xe1, 0x00, 0x7d, - 0x0b, 0xde, 0x98, 0x38, 0x64, 0x64, 0x3a, 0x06, 0xc5, 0xa6, 0x83, 0x2d, 0x63, 0x9e, 0x1f, 0x15, - 0xe6, 0xca, 0x55, 0x3e, 0xbc, 0xc7, 0x46, 0xf7, 0xe2, 0x5c, 0xd9, 0x0e, 0x03, 0x08, 0x3f, 0x32, - 0xa6, 0x84, 0xb2, 0xc0, 0x67, 0xa1, 0xd9, 0xc8, 0x16, 0xc3, 0xf8, 0x18, 0x73, 0x97, 0x4e, 0x76, - 0x05, 0x67, 0x18, 0x42, 0xf8, 0x51, 0xf4, 0x87, 0x3e, 0x87, 0xae, 0x54, 0x01, 0xda, 0xab, 0x2d, - 0xe7, 0xa9, 0x73, 0x12, 0x05, 0xa9, 0xf6, 0x77, 0x05, 0xce, 0xeb, 0xd8, 0x25, 0x4f, 0xf0, 0x57, - 0xd6, 0x77, 0xda, 0x3f, 0x0a, 0xd0, 0xfd, 0xbe, 0x19, 0x8c, 0x0f, 0x06, 0xae, 0x20, 0xd2, 0x57, - 0xb3, 0xc0, 0x4c, 0x51, 0x2c, 0xe5, 0x8b, 0x62, 0x9c, 0x7e, 0x65, 0x99, 0x53, 0xc3, 0xf3, 0xec, - 0xc6, 0x17, 0xd1, 0x7a, 0xe7, 0xe9, 0x97, 0xd8, 0x4d, 0x56, 0x4e, 0xb0, 0x9b, 0x44, 0x5b, 0xd0, - 0xc2, 0xcf, 0xc6, 0xce, 0xcc, 0xc2, 0x06, 0x47, 0xaf, 0x32, 0xf4, 0x8b, 0x12, 0xf4, 0x64, 0x44, - 0x35, 0xc5, 0xa4, 0x21, 0x2b, 0x01, 0x3f, 0x2e, 0x42, 0x47, 0x8c, 0x86, 0x1b, 0xf0, 0x25, 0xfa, - 0x48, 0xc6, 0x1c, 0x85, 0xbc, 0x39, 0x96, 0x31, 0x6a, 0xb4, 0xf1, 0x29, 0x25, 0x36, 0x3e, 0x17, - 0x00, 0xf6, 0x9d, 0x19, 0x3d, 0x30, 0x02, 0xdb, 0x8d, 0xba, 0x48, 0x9d, 0x51, 0xee, 0xdb, 0x2e, - 0x46, 0x1f, 0x43, 0x73, 0x64, 0x7b, 0x0e, 0x99, 0x18, 0x53, 0x33, 0x38, 0xa0, 0x2c, 0x83, 0xe5, - 0xcb, 0xbd, 0x6d, 0x63, 0xc7, 0xba, 0xc5, 0x78, 0xf5, 0x06, 0x9f, 0xb3, 0x1b, 0x4e, 0x41, 0x17, - 0xa1, 0x11, 0xb6, 0x22, 0xb2, 0xcf, 0xbb, 0x51, 0x95, 0x43, 0x78, 0x33, 0xf7, 0xde, 0x3e, 0xeb, - 0x47, 0xdf, 0x81, 0x7a, 0x58, 0x51, 0xa9, 0x43, 0x26, 0x51, 0x86, 0x1e, 0x27, 0x7f, 0x3e, 0x01, - 0x7d, 0x04, 0x75, 0x0b, 0x3b, 0x81, 0xc9, 0x66, 0xd7, 0x17, 0x86, 0xc2, 0x20, 0xe4, 0xb9, 0x43, - 0x26, 0xcc, 0x1b, 0xf3, 0x19, 0xda, 0xbf, 0x0b, 0x70, 0x36, 0xf4, 0x41, 0x94, 0xe5, 0x27, 0x8f, - 0xf6, 0x0b, 0x00, 0x16, 0x0d, 0x8c, 0x54, 0xc4, 0xd7, 0x2d, 0x1a, 0xec, 0xf0, 0xa0, 0xff, 0x30, - 0x0a, 0xd7, 0xe2, 0xe2, 0x2d, 0x51, 0x26, 0x26, 0xf2, 0x21, 0x7b, 0x92, 0x63, 0x28, 0xfa, 0x2e, - 0xb4, 0x1d, 0x62, 0x5a, 0xc6, 0x98, 0x78, 0x16, 0x2f, 0xac, 0x65, 0xd6, 0x99, 0xdf, 0x96, 0xa9, - 0x70, 0xdf, 0xb7, 0x27, 0x13, 0xec, 0x6f, 0x45, 0xbc, 0x7a, 0xcb, 0x61, 0x87, 0x70, 0xf1, 0x8b, - 0x2e, 0x41, 0x8b, 0x92, 0x99, 0x3f, 0xc6, 0xd1, 0x42, 0xf9, 0xe6, 0xa2, 0xc9, 0x89, 0x3b, 0xf2, - 0x04, 0xaf, 0x4a, 0xf6, 0x51, 0x7f, 0x53, 0xa0, 0x2b, 0x8e, 0x65, 0xa7, 0xb7, 0xfd, 0xa2, 0x4a, - 0x13, 0x05, 0x7c, 0xf1, 0x88, 0x9d, 0x7e, 0x69, 0x89, 0x9d, 0x7e, 0x59, 0x72, 0x58, 0x4b, 0x6f, - 0x26, 0x2b, 0xd9, 0xcd, 0xa4, 0x76, 0x1f, 0x5a, 0x71, 0x11, 0x65, 0x19, 0x7e, 0x09, 0x5a, 0x5c, - 0x2d, 0x23, 0x34, 0x29, 0xb6, 0xa2, 0x93, 0x1a, 0x27, 0xde, 0x61, 0xb4, 0x50, 0x6a, 0x5c, 0xa4, - 0xf9, 0xce, 0xa2, 0xae, 0x27, 0x28, 0xda, 0x1f, 0x0a, 0xa0, 0x26, 0xdb, 0x0f, 0x93, 0xbc, 0xcc, - 0x11, 0xf0, 0x2a, 0x74, 0xc4, 0x25, 0x62, 0xdc, 0x03, 0xc4, 0xa1, 0xec, 0x71, 0x52, 0xdc, 0x00, - 0x7d, 0x00, 0x5d, 0xce, 0x98, 0xeb, 0x19, 0xfc, 0x70, 0x76, 0x8e, 0x8d, 0xea, 0x99, 0xa6, 0xbf, - 0xb8, 0xe7, 0x96, 0x4e, 0xd1, 0x73, 0xf3, 0x7b, 0x82, 0xf2, 0xc9, 0xf6, 0x04, 0xda, 0x5f, 0x8b, - 0xd0, 0x9e, 0x67, 0xc8, 0xd2, 0x56, 0x5b, 0xe6, 0x72, 0x6b, 0x07, 0xd4, 0xf9, 0xe9, 0x87, 0x6d, - 0x7d, 0x8f, 0x4c, 0xf2, 0xec, 0xb9, 0xa7, 0x33, 0xcd, 0x1c, 0x17, 0x6f, 0x43, 0x4b, 0xd8, 0x5c, - 0xb4, 0x18, 0x6e, 0xc1, 0xaf, 0xcb, 0x84, 0xa5, 0x22, 0x4c, 0x6f, 0x26, 0xfa, 0x1d, 0x45, 0x1f, - 0x42, 0x9d, 0xe5, 0x7d, 0x70, 0x38, 0xc5, 0x22, 0xe5, 0xdf, 0x94, 0xc9, 0x08, 0x23, 0xef, 0xfe, - 0xe1, 0x14, 0xeb, 0x35, 0x47, 0x7c, 0x9d, 0xb6, 0x49, 0xde, 0x84, 0x55, 0x9f, 0xa7, 0xb6, 0x65, - 0xa4, 0xcc, 0x57, 0x65, 0xe6, 0x3b, 0x17, 0x0d, 0xee, 0x26, 0xcd, 0xb8, 0xe0, 0x24, 0x5b, 0x5b, - 0x78, 0x92, 0xfd, 0x59, 0x01, 0xba, 0xa1, 0xee, 0xb7, 0x4c, 0xc7, 0xf4, 0xc6, 0x78, 0xf9, 0x43, - 0xd9, 0x7f, 0xa7, 0x99, 0xe6, 0x2a, 0x61, 0x49, 0x52, 0x09, 0xd3, 0x4d, 0xa1, 0x9c, 0x6d, 0x0a, - 0x6f, 0x41, 0x43, 0xc8, 0xb0, 0x88, 0x87, 0x99, 0xb1, 0x6b, 0x3a, 0x70, 0xd2, 0x80, 0x78, 0xec, - 0x18, 0x17, 0xce, 0x67, 0xa3, 0x55, 0x36, 0x5a, 0xb5, 0x68, 0xc0, 0x86, 0x2e, 0x00, 0x3c, 0x31, - 0x1d, 0xdb, 0x62, 0x41, 0xc2, 0xcc, 0x54, 0xd3, 0xeb, 0x8c, 0x12, 0x9a, 0x40, 0xfb, 0xa9, 0x02, - 0xdd, 0x4f, 0x4d, 0xcf, 0x22, 0xfb, 0xfb, 0xa7, 0xaf, 0xaf, 0x5b, 0x10, 0x1d, 0xd2, 0x86, 0x2f, - 0x72, 0xe2, 0x49, 0x4d, 0xd2, 0xfe, 0xac, 0x00, 0x4a, 0xf8, 0xeb, 0xe4, 0xda, 0x5c, 0x86, 0x76, - 0xca, 0xf2, 0xf1, 0x1d, 0x7e, 0xd2, 0xf4, 0x34, 0xec, 0x7b, 0x23, 0x0e, 0x65, 0xf8, 0xd8, 0xa4, - 0xc4, 0x63, 0x6e, 0x5c, 0xba, 0xef, 0x8d, 0x22, 0x35, 0xc3, 0xa9, 0xda, 0xef, 0x0b, 0xd0, 0x4b, - 0xd7, 0xa6, 0x30, 0xf1, 0x26, 0x6c, 0x43, 0x77, 0x82, 0x25, 0x5c, 0x82, 0x16, 0xf1, 0x1c, 0xdb, - 0xc3, 0xe9, 0xfd, 0x42, 0x93, 0x13, 0x45, 0x74, 0x7c, 0x0a, 0x1d, 0xc1, 0x14, 0x17, 0xd3, 0xe2, - 0x72, 0x86, 0x6f, 0xf3, 0x79, 0x71, 0x19, 0xbd, 0x0c, 0x6d, 0xb2, 0xbf, 0x9f, 0xc4, 0xe3, 0xc1, - 0xda, 0x12, 0x54, 0x01, 0xf8, 0x19, 0xa8, 0x11, 0x5b, 0x8c, 0x58, 0x5e, 0x0e, 0xb1, 0x23, 0x26, - 0x46, 0x90, 0xeb, 0xcf, 0xa1, 0x9d, 0xae, 0x74, 0xa8, 0x09, 0xb5, 0x1d, 0x12, 0x7c, 0xf2, 0xcc, - 0xa6, 0x81, 0xba, 0x82, 0xda, 0x00, 0x3b, 0x24, 0xd8, 0xf5, 0x31, 0xc5, 0x5e, 0xa0, 0x2a, 0x08, - 0xa0, 0x72, 0xcf, 0x1b, 0xd8, 0xf4, 0x91, 0x5a, 0x40, 0x67, 0xc5, 0x1d, 0x9a, 0xe9, 0x0c, 0x45, - 0xda, 0xab, 0xc5, 0x70, 0x7a, 0xfc, 0x57, 0x42, 0x2a, 0x34, 0x63, 0x96, 0xed, 0xdd, 0xcf, 0xd5, - 0x32, 0xaa, 0x43, 0x99, 0x7f, 0x56, 0xd6, 0xef, 0x81, 0x9a, 0xf5, 0x27, 0x6a, 0x40, 0xf5, 0x80, - 0xa7, 0x83, 0xba, 0x82, 0x3a, 0xd0, 0x70, 0xe6, 0x91, 0xa8, 0x2a, 0x21, 0x61, 0xe2, 0x4f, 0xc7, - 0x22, 0x26, 0xd5, 0x42, 0x88, 0x16, 0x5a, 0x6a, 0x40, 0x9e, 0x7a, 0x6a, 0x71, 0xfd, 0x33, 0x68, - 0x26, 0xaf, 0x2c, 0x50, 0x0d, 0x4a, 0x3b, 0xc4, 0xc3, 0xea, 0x4a, 0x28, 0x76, 0xdb, 0x27, 0x4f, - 0x6d, 0x6f, 0xc2, 0xd7, 0x70, 0xdb, 0x27, 0xcf, 0xb1, 0xa7, 0x16, 0xc2, 0x81, 0xb0, 0x13, 0x86, - 0x03, 0xc5, 0x70, 0x80, 0xb7, 0x45, 0xb5, 0xb4, 0xfe, 0x3e, 0xd4, 0xa2, 0x8a, 0x8b, 0xce, 0x40, - 0x2b, 0x75, 0x03, 0xaf, 0xae, 0x20, 0xc4, 0x77, 0x6b, 0xf3, 0xda, 0xaa, 0x2a, 0x9b, 0xff, 0x02, - 0x00, 0xde, 0xf4, 0x09, 0xf1, 0x2d, 0x34, 0x05, 0xb4, 0x8d, 0x83, 0x2d, 0xe2, 0x4e, 0x89, 0x17, - 0xa9, 0x44, 0xd1, 0x8d, 0x05, 0x3d, 0x31, 0xcf, 0x2a, 0x56, 0xd9, 0xbf, 0xb2, 0x60, 0x46, 0x86, - 0x5d, 0x5b, 0x41, 0x2e, 0x43, 0x0c, 0x0f, 0x04, 0xf7, 0xed, 0xf1, 0xa3, 0xe8, 0xfa, 0xf6, 0x08, - 0xc4, 0x0c, 0x6b, 0x84, 0x98, 0x69, 0x88, 0xe2, 0x67, 0x2f, 0xf0, 0x6d, 0x6f, 0x12, 0x5d, 0xc6, - 0x68, 0x2b, 0xe8, 0x31, 0x9c, 0xdb, 0xc6, 0x0c, 0xdd, 0xa6, 0x81, 0x3d, 0xa6, 0x11, 0xe0, 0xe6, - 0x62, 0xc0, 0x1c, 0xf3, 0x0b, 0x42, 0x3a, 0xd0, 0xc9, 0x3c, 0x33, 0xa2, 0x75, 0x69, 0xcc, 0x4b, - 0x9f, 0x44, 0xfb, 0xef, 0x2e, 0xc5, 0x1b, 0xa3, 0xd9, 0xd0, 0x4e, 0x3f, 0xc1, 0xa1, 0x77, 0x16, - 0x09, 0xc8, 0xbd, 0x59, 0xf4, 0xd7, 0x97, 0x61, 0x8d, 0xa1, 0x1e, 0x40, 0x3b, 0xfd, 0xc8, 0x23, - 0x87, 0x92, 0x3e, 0x04, 0xf5, 0x8f, 0xba, 0x07, 0xd3, 0x56, 0xd0, 0x0f, 0xe1, 0x4c, 0xee, 0x65, - 0x05, 0x7d, 0x43, 0x26, 0x7e, 0xd1, 0x03, 0xcc, 0x71, 0x08, 0x42, 0xfb, 0xb9, 0x15, 0x17, 0x6b, - 0x9f, 0x7b, 0x62, 0x5b, 0x5e, 0xfb, 0x84, 0xf8, 0xa3, 0xb4, 0x7f, 0x61, 0x84, 0x19, 0xa0, 0xfc, - 0xdb, 0x0a, 0x7a, 0x4f, 0x06, 0xb1, 0xf0, 0x7d, 0xa7, 0xbf, 0xb1, 0x2c, 0x7b, 0xec, 0xf2, 0x19, - 0xcb, 0xd6, 0xec, 0x2b, 0x84, 0x14, 0x76, 0xe1, 0xb3, 0x8a, 0x1c, 0x76, 0xf1, 0xc3, 0x01, 0x0f, - 0xea, 0xf4, 0xf5, 0xaa, 0xdc, 0x57, 0xd2, 0xcb, 0x7c, 0x79, 0x50, 0xcb, 0x6f, 0x6b, 0xb5, 0x15, - 0x64, 0x00, 0x6c, 0xe3, 0xe0, 0x2e, 0x0e, 0x7c, 0x7b, 0x4c, 0xd1, 0x15, 0x69, 0x8a, 0xcf, 0x19, - 0x22, 0x8c, 0xab, 0xc7, 0xf2, 0x45, 0x00, 0x9b, 0x7f, 0xa9, 0x43, 0x9d, 0x59, 0x37, 0xec, 0x8c, - 0xff, 0x2f, 0xb8, 0x2f, 0xa1, 0xe0, 0x3e, 0x84, 0x4e, 0xe6, 0x16, 0x5c, 0x5e, 0x70, 0xe5, 0x57, - 0xe5, 0xc7, 0x65, 0xde, 0x08, 0x50, 0xfe, 0xaa, 0x56, 0x9e, 0x02, 0x0b, 0xaf, 0x74, 0x8f, 0xc3, - 0x78, 0x08, 0x9d, 0xcc, 0x55, 0xa9, 0x7c, 0x05, 0xf2, 0xfb, 0xd4, 0xe3, 0xa4, 0x7f, 0x01, 0xcd, - 0xe4, 0xbd, 0x14, 0xba, 0xba, 0xa8, 0xee, 0x65, 0x76, 0xf7, 0xaf, 0xbe, 0xea, 0xbd, 0xfc, 0xae, - 0xf0, 0x10, 0x3a, 0x99, 0xab, 0x23, 0xb9, 0xe5, 0xe5, 0xf7, 0x4b, 0xc7, 0x49, 0xff, 0x0a, 0xd5, - 0xb1, 0x5b, 0x1f, 0x3c, 0xd8, 0x9c, 0xd8, 0xc1, 0xc1, 0x6c, 0x14, 0xae, 0xf2, 0x3a, 0xe7, 0x7c, - 0xcf, 0x26, 0xe2, 0xeb, 0x7a, 0x94, 0xd0, 0xd7, 0x99, 0xa4, 0xeb, 0x4c, 0xdb, 0xe9, 0x68, 0x54, - 0x61, 0xbf, 0x37, 0xff, 0x13, 0x00, 0x00, 0xff, 0xff, 0x74, 0x83, 0x3d, 0x7e, 0x77, 0x26, 0x00, - 0x00, + // 2261 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x59, 0xcd, 0x6f, 0x1c, 0x49, + 0x15, 0x77, 0xcf, 0x87, 0x67, 0xe6, 0xcd, 0x57, 0xa7, 0x12, 0xcf, 0x4e, 0x86, 0x24, 0x6b, 0x3a, + 0x9b, 0x8f, 0xf5, 0xb2, 0x4e, 0xd6, 0x59, 0x10, 0x2b, 0xd8, 0xc3, 0xc6, 0xb3, 0xf1, 0xce, 0x92, + 0x38, 0xa6, 0x9d, 0x5d, 0x44, 0x14, 0xa9, 0xe9, 0x99, 0x2e, 0x8f, 0x5b, 0xe9, 0xee, 0x9a, 0x74, + 0xf5, 0x24, 0x71, 0xce, 0x1c, 0xe0, 0x80, 0xf8, 0x03, 0x40, 0x48, 0x48, 0x20, 0xc4, 0x81, 0x23, + 0x20, 0x71, 0xda, 0x0b, 0x77, 0xfe, 0x02, 0x24, 0x04, 0x77, 0x4e, 0x70, 0x46, 0x5d, 0x55, 0xdd, + 0xd3, 0x1f, 0x35, 0xf6, 0xd8, 0x26, 0x24, 0x5a, 0x71, 0xeb, 0x7e, 0xf5, 0xea, 0xbd, 0x57, 0xef, + 0xe3, 0xf7, 0xea, 0x03, 0xce, 0x3c, 0x99, 0x62, 0xff, 0xc0, 0x18, 0x11, 0xe2, 0x5b, 0xeb, 0x13, + 0x9f, 0x04, 0x04, 0x21, 0xd7, 0x76, 0x9e, 0x4e, 0x29, 0xff, 0x5b, 0x67, 0xe3, 0xbd, 0xc6, 0x88, + 0xb8, 0x2e, 0xf1, 0x38, 0xad, 0xd7, 0x48, 0x72, 0xf4, 0x5a, 0xb6, 0x17, 0x60, 0xdf, 0x33, 0x9d, + 0x68, 0x94, 0x8e, 0xf6, 0xb1, 0x6b, 0x8a, 0x3f, 0xd5, 0x32, 0x03, 0x33, 0x29, 0x5f, 0xfb, 0xa1, + 0x02, 0x9d, 0xdd, 0x7d, 0xf2, 0x6c, 0x93, 0x38, 0x0e, 0x1e, 0x05, 0x36, 0xf1, 0xa8, 0x8e, 0x9f, + 0x4c, 0x31, 0x0d, 0xd0, 0x4d, 0x28, 0x0d, 0x4d, 0x8a, 0xbb, 0xca, 0xaa, 0x72, 0xbd, 0xbe, 0x71, + 0x61, 0x3d, 0x65, 0x89, 0x30, 0xe1, 0x1e, 0x1d, 0xdf, 0x36, 0x29, 0xd6, 0x19, 0x27, 0x42, 0x50, + 0xb2, 0x86, 0x83, 0x7e, 0xb7, 0xb0, 0xaa, 0x5c, 0x2f, 0xea, 0xec, 0x1b, 0xbd, 0x05, 0xcd, 0x51, + 0x2c, 0x7b, 0xd0, 0xa7, 0xdd, 0xe2, 0x6a, 0xf1, 0x7a, 0x51, 0x4f, 0x13, 0xb5, 0xdf, 0x28, 0xf0, + 0x46, 0xce, 0x0c, 0x3a, 0x21, 0x1e, 0xc5, 0xe8, 0x16, 0x2c, 0xd3, 0xc0, 0x0c, 0xa6, 0x54, 0x58, + 0xf2, 0x15, 0xa9, 0x25, 0xbb, 0x8c, 0x45, 0x17, 0xac, 0x79, 0xb5, 0x05, 0x89, 0x5a, 0xf4, 0x1e, + 0x9c, 0xb3, 0xbd, 0x7b, 0xd8, 0x25, 0xfe, 0x81, 0x31, 0xc1, 0xfe, 0x08, 0x7b, 0x81, 0x39, 0xc6, + 0x91, 0x8d, 0x67, 0xa3, 0xb1, 0x9d, 0xd9, 0x90, 0xf6, 0x6b, 0x05, 0x56, 0x42, 0x4b, 0x77, 0x4c, + 0x3f, 0xb0, 0x5f, 0x82, 0xbf, 0x34, 0x68, 0x24, 0x6d, 0xec, 0x16, 0xd9, 0x58, 0x8a, 0x16, 0xf2, + 0x4c, 0x22, 0xf5, 0xe1, 0xda, 0x4a, 0xcc, 0xdc, 0x14, 0x4d, 0xfb, 0x95, 0x08, 0x6c, 0xd2, 0xce, + 0xd3, 0x38, 0x34, 0xab, 0xb3, 0x90, 0xd7, 0x79, 0x12, 0x77, 0x7e, 0xa1, 0xc0, 0xca, 0x5d, 0x62, + 0x5a, 0xb3, 0xc0, 0xff, 0xef, 0xdd, 0xf9, 0x21, 0x2c, 0xf3, 0x2a, 0xe9, 0x96, 0x98, 0xae, 0x2b, + 0x69, 0x5d, 0xa2, 0x82, 0x66, 0x16, 0xee, 0x32, 0x82, 0x2e, 0x26, 0x69, 0x3f, 0x57, 0xa0, 0xab, + 0x63, 0x07, 0x9b, 0x14, 0xbf, 0xca, 0x55, 0x74, 0x60, 0xd9, 0x23, 0x16, 0x1e, 0xf4, 0xd9, 0x2a, + 0x8a, 0xba, 0xf8, 0xd3, 0xfe, 0x21, 0x3c, 0xfc, 0x9a, 0x27, 0x6c, 0x22, 0x0a, 0xe5, 0x93, 0x44, + 0xe1, 0x8b, 0x59, 0x14, 0x5e, 0xf7, 0x95, 0xce, 0x22, 0x55, 0x4e, 0x45, 0xea, 0xfb, 0x70, 0x7e, + 0xd3, 0xc7, 0x66, 0x80, 0xbf, 0x1b, 0xc2, 0xfc, 0xe6, 0xbe, 0xe9, 0x79, 0xd8, 0x89, 0x96, 0x90, + 0x55, 0xae, 0x48, 0x94, 0x77, 0xa1, 0x32, 0xf1, 0xc9, 0xf3, 0x83, 0xd8, 0xee, 0xe8, 0x57, 0xfb, + 0xa5, 0x02, 0x3d, 0x99, 0xec, 0xd3, 0x20, 0xc2, 0x35, 0x68, 0xfb, 0xdc, 0x38, 0x63, 0xc4, 0xe5, + 0x31, 0xad, 0x35, 0xbd, 0x25, 0xc8, 0x42, 0x0b, 0xba, 0x02, 0x2d, 0x1f, 0xd3, 0xa9, 0x33, 0xe3, + 0x2b, 0x32, 0xbe, 0x26, 0xa7, 0x0a, 0x36, 0xed, 0xb7, 0x0a, 0x9c, 0xdf, 0xc2, 0x41, 0x1c, 0xbd, + 0x50, 0x1d, 0x7e, 0x4d, 0xd1, 0xf5, 0x17, 0x0a, 0xb4, 0x33, 0x86, 0xa2, 0x55, 0xa8, 0x27, 0x78, + 0x44, 0x80, 0x92, 0x24, 0xf4, 0x4d, 0x28, 0x87, 0xbe, 0xc3, 0xcc, 0xa4, 0xd6, 0x86, 0xb6, 0x9e, + 0x6f, 0xee, 0xeb, 0x69, 0xa9, 0x3a, 0x9f, 0x80, 0x6e, 0xc0, 0x59, 0x09, 0xb2, 0x0a, 0xf3, 0x51, + 0x1e, 0x58, 0xb5, 0xdf, 0x29, 0xd0, 0x93, 0x39, 0xf3, 0x34, 0x01, 0x7f, 0x08, 0x9d, 0x78, 0x35, + 0x86, 0x85, 0xe9, 0xc8, 0xb7, 0x27, 0xac, 0xcc, 0x58, 0x33, 0xa8, 0x6f, 0x5c, 0x3e, 0x7a, 0x3d, + 0x54, 0x5f, 0x89, 0x45, 0xf4, 0x13, 0x12, 0xb4, 0x9f, 0x28, 0xb0, 0xb2, 0x85, 0x83, 0x5d, 0x3c, + 0x76, 0xb1, 0x17, 0x0c, 0xbc, 0x3d, 0x72, 0xf2, 0xc0, 0x5f, 0x02, 0xa0, 0x42, 0x4e, 0xdc, 0xa8, + 0x12, 0x94, 0x45, 0x92, 0x40, 0xfb, 0x63, 0x11, 0xea, 0x09, 0x63, 0xd0, 0x05, 0xa8, 0xc5, 0x12, + 0x44, 0x68, 0x67, 0x84, 0x9c, 0xc4, 0x82, 0x24, 0xad, 0x32, 0xe9, 0x51, 0xcc, 0xa7, 0xc7, 0x1c, + 0x04, 0x47, 0xe7, 0xa1, 0xea, 0x62, 0xd7, 0xa0, 0xf6, 0x0b, 0x2c, 0x10, 0xa3, 0xe2, 0x62, 0x77, + 0xd7, 0x7e, 0x81, 0xc3, 0x21, 0x6f, 0xea, 0x1a, 0x3e, 0x79, 0x46, 0xbb, 0xcb, 0x7c, 0xc8, 0x9b, + 0xba, 0x3a, 0x79, 0x46, 0xd1, 0x45, 0x00, 0xdb, 0xb3, 0xf0, 0x73, 0xc3, 0x33, 0x5d, 0xdc, 0xad, + 0xb0, 0x8a, 0xab, 0x31, 0xca, 0xb6, 0xe9, 0xe2, 0x10, 0x2b, 0xd8, 0xcf, 0xa0, 0xdf, 0xad, 0xf2, + 0x89, 0xe2, 0x37, 0x5c, 0xaa, 0xa8, 0xd3, 0x41, 0xbf, 0x5b, 0xe3, 0xf3, 0x62, 0x02, 0xfa, 0x18, + 0x9a, 0x62, 0xdd, 0x06, 0xcf, 0x65, 0x60, 0xb9, 0xbc, 0x2a, 0x8b, 0xbd, 0x70, 0x20, 0xcf, 0xe4, + 0x06, 0x4d, 0xfc, 0xa1, 0xab, 0xd0, 0x1a, 0x11, 0x77, 0x62, 0x32, 0xef, 0xdc, 0xf1, 0x89, 0xdb, + 0xad, 0xb3, 0x38, 0x65, 0xa8, 0xe8, 0x26, 0x9c, 0x1d, 0x31, 0xdc, 0xb2, 0x6e, 0x1f, 0x6c, 0xc6, + 0x43, 0xdd, 0xc6, 0xaa, 0x72, 0xbd, 0xaa, 0xcb, 0x86, 0xd8, 0x8e, 0x36, 0x9b, 0x49, 0xa7, 0xc9, + 0xfa, 0xaf, 0x43, 0xd9, 0xf6, 0xf6, 0x48, 0x94, 0xe4, 0x6f, 0x1e, 0xb2, 0x50, 0xa6, 0x8c, 0x73, + 0x6b, 0x7f, 0x28, 0x42, 0xe7, 0x23, 0xcb, 0x92, 0x41, 0xf9, 0xf1, 0x33, 0x7a, 0x96, 0x19, 0x85, + 0x54, 0x66, 0x2c, 0x02, 0x67, 0xef, 0xc0, 0x99, 0x0c, 0x4c, 0x8b, 0x04, 0xab, 0xe9, 0x6a, 0x1a, + 0xa8, 0x07, 0x7d, 0xf4, 0x36, 0xa8, 0x69, 0xa8, 0x16, 0x4d, 0xaa, 0xa6, 0xb7, 0x53, 0x60, 0x3d, + 0xe8, 0xa3, 0x6f, 0xc0, 0x1b, 0x63, 0x87, 0x0c, 0x4d, 0xc7, 0xa0, 0xd8, 0x74, 0xb0, 0x65, 0xcc, + 0xea, 0x63, 0x99, 0x85, 0x72, 0x85, 0x0f, 0xef, 0xb2, 0xd1, 0xdd, 0xb8, 0x56, 0xb6, 0xc2, 0x04, + 0xc2, 0x8f, 0x8d, 0x09, 0xa1, 0x2c, 0xf1, 0x59, 0x6a, 0xd6, 0xb3, 0x60, 0x18, 0x1f, 0x63, 0xee, + 0xd1, 0xf1, 0x8e, 0xe0, 0x0c, 0x53, 0x08, 0x3f, 0x8e, 0xfe, 0xd0, 0x67, 0xd0, 0x91, 0x1a, 0x40, + 0xbb, 0xd5, 0xc5, 0x22, 0x75, 0x4e, 0x62, 0x20, 0xd5, 0xfe, 0xa6, 0xc0, 0x79, 0x1d, 0xbb, 0xe4, + 0x29, 0xfe, 0xd2, 0xc6, 0x4e, 0xfb, 0x7b, 0x01, 0x3a, 0xdf, 0x33, 0x83, 0xd1, 0x7e, 0xdf, 0x15, + 0x44, 0xfa, 0x6a, 0x16, 0x98, 0x01, 0xc5, 0x52, 0x1e, 0x14, 0xe3, 0xf2, 0x2b, 0xcb, 0x82, 0x1a, + 0x9e, 0x67, 0xd7, 0x3f, 0x8f, 0xd6, 0x3b, 0x2b, 0xbf, 0xc4, 0x6e, 0x72, 0xf9, 0x04, 0xbb, 0x49, + 0xb4, 0x09, 0x4d, 0xfc, 0x7c, 0xe4, 0x4c, 0x2d, 0x6c, 0x70, 0xed, 0x15, 0xa6, 0xfd, 0x92, 0x44, + 0x7b, 0x32, 0xa3, 0x1a, 0x62, 0xd2, 0x80, 0x41, 0xc0, 0x8f, 0x8a, 0xd0, 0x16, 0xa3, 0xe1, 0x06, + 0x7c, 0x81, 0x3e, 0x92, 0x71, 0x47, 0x21, 0xef, 0x8e, 0x45, 0x9c, 0x1a, 0x6d, 0x7c, 0x4a, 0x89, + 0x8d, 0xcf, 0x45, 0x80, 0x3d, 0x67, 0x4a, 0xf7, 0x8d, 0xc0, 0x76, 0xa3, 0x2e, 0x52, 0x63, 0x94, + 0x07, 0xb6, 0x8b, 0xd1, 0x47, 0xd0, 0x18, 0xda, 0x9e, 0x43, 0xc6, 0xc6, 0xc4, 0x0c, 0xf6, 0x29, + 0xab, 0x60, 0xf9, 0x72, 0xef, 0xd8, 0xd8, 0xb1, 0x6e, 0x33, 0x5e, 0xbd, 0xce, 0xe7, 0xec, 0x84, + 0x53, 0xd0, 0x25, 0xa8, 0x87, 0xad, 0x88, 0xec, 0xf1, 0x6e, 0x54, 0xe1, 0x2a, 0xbc, 0xa9, 0x7b, + 0x7f, 0x8f, 0xf5, 0xa3, 0x6f, 0x43, 0x2d, 0x44, 0x54, 0xea, 0x90, 0x71, 0x54, 0xa1, 0x47, 0xc9, + 0x9f, 0x4d, 0x40, 0x1f, 0x42, 0xcd, 0xc2, 0x4e, 0x60, 0xb2, 0xd9, 0xb5, 0xb9, 0xa9, 0xd0, 0x0f, + 0x79, 0xee, 0x92, 0x31, 0x8b, 0xc6, 0x6c, 0x86, 0xf6, 0xef, 0x02, 0x9c, 0x0d, 0x63, 0x10, 0x55, + 0xf9, 0xc9, 0xb3, 0xfd, 0x22, 0x80, 0x45, 0x03, 0x23, 0x95, 0xf1, 0x35, 0x8b, 0x06, 0xdb, 0x3c, + 0xe9, 0x3f, 0x88, 0xd2, 0xb5, 0x38, 0x7f, 0x4b, 0x94, 0xc9, 0x89, 0x7c, 0xca, 0x9e, 0xe4, 0x18, + 0x8a, 0xbe, 0x03, 0x2d, 0x87, 0x98, 0x96, 0x31, 0x22, 0x9e, 0xc5, 0x81, 0xb5, 0xcc, 0x3a, 0xf3, + 0x5b, 0x32, 0x13, 0x1e, 0xf8, 0xf6, 0x78, 0x8c, 0xfd, 0xcd, 0x88, 0x57, 0x6f, 0x3a, 0xec, 0x10, + 0x2e, 0x7e, 0xd1, 0x65, 0x68, 0x52, 0x32, 0xf5, 0x47, 0x38, 0x5a, 0x28, 0xdf, 0x5c, 0x34, 0x38, + 0x71, 0x5b, 0x5e, 0xe0, 0x15, 0xc9, 0x3e, 0xea, 0xaf, 0x0a, 0x74, 0xc4, 0xb1, 0xec, 0xf4, 0xbe, + 0x9f, 0x87, 0x34, 0x51, 0xc2, 0x17, 0x0f, 0xd9, 0xe9, 0x97, 0x16, 0xd8, 0xe9, 0x97, 0x25, 0x87, + 0xb5, 0xf4, 0x66, 0x72, 0x39, 0xbb, 0x99, 0xd4, 0x1e, 0x40, 0x33, 0x06, 0x51, 0x56, 0xe1, 0x97, + 0xa1, 0xc9, 0xcd, 0x32, 0x42, 0x97, 0x62, 0x2b, 0x3a, 0xa9, 0x71, 0xe2, 0x5d, 0x46, 0x0b, 0xa5, + 0xc6, 0x20, 0xcd, 0x77, 0x16, 0x35, 0x3d, 0x41, 0xd1, 0x7e, 0x5f, 0x00, 0x35, 0xd9, 0x7e, 0x98, + 0xe4, 0x45, 0x8e, 0x80, 0xd7, 0xa0, 0x2d, 0x2e, 0x11, 0xe3, 0x1e, 0x20, 0x0e, 0x65, 0x4f, 0x92, + 0xe2, 0xfa, 0xe8, 0x7d, 0xe8, 0x70, 0xc6, 0x5c, 0xcf, 0xe0, 0x87, 0xb3, 0x73, 0x6c, 0x54, 0xcf, + 0x34, 0xfd, 0xf9, 0x3d, 0xb7, 0x74, 0x8a, 0x9e, 0x9b, 0xdf, 0x13, 0x94, 0x4f, 0xb6, 0x27, 0xd0, + 0xfe, 0x52, 0x84, 0xd6, 0xac, 0x42, 0x16, 0xf6, 0xda, 0x22, 0x97, 0x5b, 0xdb, 0xa0, 0xce, 0x4e, + 0x3f, 0x6c, 0xeb, 0x7b, 0x68, 0x91, 0x67, 0xcf, 0x3d, 0xed, 0x49, 0xe6, 0xb8, 0x78, 0x07, 0x9a, + 0xc2, 0xe7, 0xa2, 0xc5, 0x70, 0x0f, 0x7e, 0x55, 0x26, 0x2c, 0x95, 0x61, 0x7a, 0x23, 0xd1, 0xef, + 0x28, 0xfa, 0x00, 0x6a, 0xac, 0xee, 0x83, 0x83, 0x09, 0x16, 0x25, 0x7f, 0x41, 0x26, 0x23, 0xcc, + 0xbc, 0x07, 0x07, 0x13, 0xac, 0x57, 0x1d, 0xf1, 0x75, 0xda, 0x26, 0x79, 0x0b, 0x56, 0x7c, 0x5e, + 0xda, 0x96, 0x91, 0x72, 0x5f, 0x85, 0xb9, 0xef, 0x5c, 0x34, 0xb8, 0x93, 0x74, 0xe3, 0x9c, 0x93, + 0x6c, 0x75, 0xee, 0x49, 0xf6, 0x67, 0x05, 0xe8, 0x84, 0xb6, 0xdf, 0x36, 0x1d, 0xd3, 0x1b, 0xe1, + 0xc5, 0x0f, 0x65, 0xff, 0x9d, 0x66, 0x9a, 0x43, 0xc2, 0x92, 0x04, 0x09, 0xd3, 0x4d, 0xa1, 0x9c, + 0x6d, 0x0a, 0x6f, 0x42, 0x5d, 0xc8, 0xb0, 0x88, 0x87, 0x99, 0xb3, 0xab, 0x3a, 0x70, 0x52, 0x9f, + 0x78, 0xec, 0x18, 0x17, 0xce, 0x67, 0xa3, 0x15, 0x36, 0x5a, 0xb1, 0x68, 0xc0, 0x86, 0x2e, 0x02, + 0x3c, 0x35, 0x1d, 0xdb, 0x62, 0x49, 0xc2, 0xdc, 0x54, 0xd5, 0x6b, 0x8c, 0x12, 0xba, 0x40, 0xfb, + 0xa9, 0x02, 0x9d, 0x4f, 0x4c, 0xcf, 0x22, 0x7b, 0x7b, 0xa7, 0xc7, 0xd7, 0x4d, 0x88, 0x0e, 0x69, + 0x83, 0xe3, 0x9c, 0x78, 0x52, 0x93, 0xb4, 0x3f, 0x29, 0x80, 0x12, 0xf1, 0x3a, 0xb9, 0x35, 0x57, + 0xa0, 0x95, 0xf2, 0x7c, 0x7c, 0x87, 0x9f, 0x74, 0x3d, 0x0d, 0xfb, 0xde, 0x90, 0xab, 0x32, 0x7c, + 0x6c, 0x52, 0xe2, 0xb1, 0x30, 0x2e, 0xdc, 0xf7, 0x86, 0x91, 0x99, 0xe1, 0x54, 0xed, 0x5f, 0x0a, + 0x9c, 0x11, 0x4b, 0x0b, 0x2b, 0x6e, 0x8c, 0x23, 0x48, 0x27, 0x9e, 0x63, 0x7b, 0x71, 0x0e, 0x08, + 0x0c, 0xe1, 0x44, 0x11, 0xe4, 0x4f, 0xa0, 0x2d, 0x98, 0x62, 0x4c, 0x5c, 0xd0, 0x7f, 0x2d, 0x3e, + 0x2f, 0x46, 0xc3, 0x2b, 0xd0, 0x22, 0x7b, 0x7b, 0x49, 0x7d, 0x3c, 0x31, 0x9b, 0x82, 0x2a, 0x14, + 0x7e, 0x0a, 0x6a, 0xc4, 0x76, 0x5c, 0x14, 0x6e, 0x8b, 0x89, 0xf1, 0xa1, 0xe7, 0xc7, 0x0a, 0x74, + 0xd3, 0x98, 0x9c, 0x58, 0xfe, 0xf1, 0x43, 0xf7, 0xad, 0xf4, 0x99, 0xf9, 0xca, 0x21, 0xf6, 0xcc, + 0xf4, 0x88, 0x7d, 0xd0, 0xda, 0x0b, 0x68, 0xa5, 0xc1, 0x13, 0x35, 0xa0, 0xba, 0x4d, 0x82, 0x8f, + 0x9f, 0xdb, 0x34, 0x50, 0x97, 0x50, 0x0b, 0x60, 0x9b, 0x04, 0x3b, 0x3e, 0xa6, 0xd8, 0x0b, 0x54, + 0x05, 0x01, 0x2c, 0xdf, 0xf7, 0xfa, 0x36, 0x7d, 0xac, 0x16, 0xd0, 0x59, 0x71, 0x2d, 0x67, 0x3a, + 0x03, 0x81, 0x24, 0x6a, 0x31, 0x9c, 0x1e, 0xff, 0x95, 0x90, 0x0a, 0x8d, 0x98, 0x65, 0x6b, 0xe7, + 0x33, 0xb5, 0x8c, 0x6a, 0x50, 0xe6, 0x9f, 0xcb, 0x6b, 0xf7, 0x41, 0xcd, 0xa6, 0x08, 0xaa, 0x43, + 0x65, 0x9f, 0x57, 0x98, 0xba, 0x84, 0xda, 0x50, 0x77, 0x66, 0xc9, 0xad, 0x2a, 0x21, 0x61, 0xec, + 0x4f, 0x46, 0x22, 0xcd, 0xd5, 0x42, 0xa8, 0x2d, 0x8c, 0x5a, 0x9f, 0x3c, 0xf3, 0xd4, 0xe2, 0xda, + 0xa7, 0xd0, 0x48, 0xde, 0x82, 0xa0, 0x2a, 0x94, 0xb6, 0x89, 0x87, 0xd5, 0xa5, 0x50, 0xec, 0x96, + 0x4f, 0x9e, 0xd9, 0xde, 0x98, 0xaf, 0xe1, 0x8e, 0x4f, 0x5e, 0x60, 0x4f, 0x2d, 0x84, 0x03, 0x61, + 0x73, 0x0d, 0x07, 0x8a, 0xe1, 0x00, 0xef, 0xb4, 0x6a, 0x69, 0xed, 0x3d, 0xa8, 0x46, 0x20, 0x8e, + 0xce, 0x40, 0x33, 0x75, 0xa9, 0xaf, 0x2e, 0x21, 0xc4, 0x37, 0x80, 0x33, 0xb8, 0x56, 0x95, 0x8d, + 0x7f, 0x02, 0x00, 0xdf, 0x47, 0x10, 0xe2, 0x5b, 0x68, 0x02, 0x68, 0x0b, 0x07, 0x9b, 0xc4, 0x9d, + 0x10, 0x2f, 0x32, 0x89, 0xa2, 0x9b, 0x73, 0xda, 0x6c, 0x9e, 0x55, 0xac, 0xb2, 0x77, 0x75, 0xce, + 0x8c, 0x0c, 0xbb, 0xb6, 0x84, 0x5c, 0xa6, 0x31, 0x3c, 0x63, 0x3c, 0xb0, 0x47, 0x8f, 0xa3, 0x1b, + 0xe1, 0x43, 0x34, 0x66, 0x58, 0x23, 0x8d, 0x99, 0x1e, 0x2b, 0x7e, 0x76, 0x03, 0xdf, 0xf6, 0xc6, + 0xd1, 0xfd, 0x8e, 0xb6, 0x84, 0x9e, 0xc0, 0xb9, 0x2d, 0xcc, 0xb4, 0xdb, 0x34, 0xb0, 0x47, 0x34, + 0x52, 0xb8, 0x31, 0x5f, 0x61, 0x8e, 0xf9, 0x98, 0x2a, 0x1d, 0x68, 0x67, 0x5e, 0x2e, 0xd1, 0x9a, + 0x34, 0xdf, 0xa5, 0xaf, 0xac, 0xbd, 0x77, 0x16, 0xe2, 0x8d, 0xb5, 0xd9, 0xd0, 0x4a, 0xbf, 0xea, + 0xa1, 0xb7, 0xe7, 0x09, 0xc8, 0x3d, 0x83, 0xf4, 0xd6, 0x16, 0x61, 0x8d, 0x55, 0x3d, 0x84, 0x56, + 0xfa, 0xdd, 0x48, 0xae, 0x4a, 0xfa, 0xb6, 0xd4, 0x3b, 0xec, 0x6a, 0x4d, 0x5b, 0x42, 0x3f, 0x80, + 0x33, 0xb9, 0xc7, 0x1a, 0xf4, 0x35, 0x99, 0xf8, 0x79, 0x6f, 0x3a, 0x47, 0x69, 0x10, 0xd6, 0xcf, + 0xbc, 0x38, 0xdf, 0xfa, 0xdc, 0xab, 0xdd, 0xe2, 0xd6, 0x27, 0xc4, 0x1f, 0x66, 0xfd, 0xb1, 0x35, + 0x4c, 0x01, 0xe5, 0x9f, 0x6b, 0xd0, 0xbb, 0x32, 0x15, 0x73, 0x9f, 0x8c, 0x7a, 0xeb, 0x8b, 0xb2, + 0xc7, 0x21, 0x9f, 0xb2, 0x6a, 0xcd, 0x3e, 0x6c, 0x48, 0xd5, 0xce, 0x7d, 0xa9, 0x91, 0xab, 0x9d, + 0xff, 0x16, 0xc1, 0x93, 0x3a, 0x7d, 0x63, 0x2b, 0x8f, 0x95, 0xf4, 0x7d, 0x40, 0x9e, 0xd4, 0xf2, + 0x0b, 0x60, 0x6d, 0x09, 0x19, 0x00, 0x5b, 0x38, 0xb8, 0x87, 0x03, 0xdf, 0x1e, 0x51, 0x74, 0x55, + 0x5a, 0xe2, 0x33, 0x86, 0x48, 0xc7, 0xb5, 0x23, 0xf9, 0x22, 0x05, 0x1b, 0x7f, 0xae, 0x41, 0x8d, + 0x79, 0x37, 0xec, 0xd2, 0xff, 0x07, 0xdc, 0x97, 0x00, 0xb8, 0x8f, 0xa0, 0x9d, 0xb9, 0x58, 0x97, + 0x03, 0xae, 0xfc, 0xf6, 0xfd, 0xa8, 0xca, 0x1b, 0x02, 0xca, 0xdf, 0xfe, 0xca, 0x4b, 0x60, 0xee, + 0x2d, 0xf1, 0x51, 0x3a, 0x1e, 0x41, 0x3b, 0x73, 0xfb, 0x2a, 0x5f, 0x81, 0xfc, 0x8a, 0xf6, 0x28, + 0xe9, 0x9f, 0x43, 0x23, 0x79, 0xd5, 0x85, 0xae, 0xcd, 0xc3, 0xbd, 0xcc, 0x81, 0xe1, 0xd5, 0xa3, + 0xde, 0xcb, 0xef, 0x0a, 0x8f, 0xa0, 0x9d, 0xb9, 0x8d, 0x92, 0x7b, 0x5e, 0x7e, 0x65, 0x75, 0x94, + 0xf4, 0x2f, 0x11, 0x8e, 0xdd, 0x7e, 0xff, 0xe1, 0xc6, 0xd8, 0x0e, 0xf6, 0xa7, 0xc3, 0x70, 0x95, + 0x37, 0x38, 0xe7, 0xbb, 0x36, 0x11, 0x5f, 0x37, 0xa2, 0x82, 0xbe, 0xc1, 0x24, 0xdd, 0x60, 0xd6, + 0x4e, 0x86, 0xc3, 0x65, 0xf6, 0x7b, 0xeb, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x61, 0x1b, 0x06, + 0x95, 0xca, 0x26, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. diff --git a/internal/querycoord/meta.go b/internal/querycoord/meta.go index 6c02ba4772894..ec2529685b476 100644 --- a/internal/querycoord/meta.go +++ b/internal/querycoord/meta.go @@ -40,7 +40,7 @@ const ( ) type col2SegmentInfos = map[UniqueID][]*querypb.SegmentInfo -type col2SealedSegmentChangeInfos = map[UniqueID][]*querypb.SealedSegmentsChangeInfo +type col2SealedSegmentChangeInfos = map[UniqueID]*querypb.SealedSegmentsChangeInfo // Meta contains information about all loaded collections and partitions, including segment information and vchannel information type Meta interface { @@ -79,7 +79,7 @@ type Meta interface { //printMeta() saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2SealedSegmentChangeInfos, error) removeGlobalSealedSegInfos(collectionID UniqueID, partitionIDs []UniqueID) (col2SealedSegmentChangeInfos, error) - sendSealedSegmentChangeInfos(collectionID UniqueID, changeInfos []*querypb.SealedSegmentsChangeInfo) (*querypb.QueryChannelInfo, map[string][]mqclient.MessageID, error) + sendSealedSegmentChangeInfos(collectionID UniqueID, changeInfos *querypb.SealedSegmentsChangeInfo) (*querypb.QueryChannelInfo, map[string][]mqclient.MessageID, error) } // MetaReplica records the current load information on all querynodes @@ -369,15 +369,18 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal // generate segment change info according segment info to updated col2SegmentChangeInfos := make(col2SealedSegmentChangeInfos) - // get segmentInfos to save + // get segmentInfos to sav for collectionID, onlineInfos := range saves { + segmentsChangeInfo := &querypb.SealedSegmentsChangeInfo{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_SealedSegmentsChangeInfo, + }, + Infos: []*querypb.SegmentChangeInfo{}, + } for _, info := range onlineInfos { segmentID := info.SegmentID onlineNodeID := info.NodeID - segmentChangeInfo := querypb.SealedSegmentsChangeInfo{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_SealedSegmentsChangeInfo, - }, + changeInfo := &querypb.SegmentChangeInfo{ OnlineNodeID: onlineNodeID, OnlineSegments: []*querypb.SegmentInfo{info}, } @@ -386,16 +389,13 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal offlineNodeID := offlineInfo.NodeID // if the offline segment state is growing, it will not impact the global sealed segments if offlineInfo.SegmentState == querypb.SegmentState_sealed { - segmentChangeInfo.OfflineNodeID = offlineNodeID - segmentChangeInfo.OfflineSegments = []*querypb.SegmentInfo{offlineInfo} + changeInfo.OfflineNodeID = offlineNodeID + changeInfo.OfflineSegments = []*querypb.SegmentInfo{offlineInfo} } } - - if _, ok := col2SegmentChangeInfos[collectionID]; !ok { - col2SegmentChangeInfos[collectionID] = []*querypb.SealedSegmentsChangeInfo{} - } - col2SegmentChangeInfos[collectionID] = append(col2SegmentChangeInfos[collectionID], &segmentChangeInfo) + segmentsChangeInfo.Infos = append(segmentsChangeInfo.Infos, changeInfo) } + col2SegmentChangeInfos[collectionID] = segmentsChangeInfo } queryChannelInfosMap := make(map[UniqueID]*querypb.QueryChannelInfo) @@ -405,12 +405,12 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal if err != nil { return nil, err } + // len(messageIDs) == 1 messageIDs, ok := messageIDInfos[queryChannelInfo.QueryChannelID] if !ok || len(messageIDs) == 0 { return col2SegmentChangeInfos, errors.New("updateGlobalSealedSegmentInfos: send sealed segment change info failed") } - seekMessageID := messageIDs[len(messageIDs)-1] - queryChannelInfo.SeekPosition.MsgID = seekMessageID.Serialize() + queryChannelInfo.SeekPosition.MsgID = messageIDs[0].Serialize() // update segmentInfo, queryChannelInfo meta to cache and etcd seg2Info := make(map[UniqueID]*querypb.SegmentInfo) @@ -458,15 +458,13 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal // avoid the produce process success but save meta to etcd failed // then the msgID key will not exist, and changeIndo will be ignored by query node for _, changeInfos := range col2SegmentChangeInfos { - for _, info := range changeInfos { - changeInfoBytes, err := proto.Marshal(info) - if err != nil { - return col2SegmentChangeInfos, err - } - // TODO:: segmentChangeInfo clear in etcd with coord gc and queryNode watch the changeInfo meta to deal changeInfoMsg - changeInfoKey := fmt.Sprintf("%s/%d", sealedSegmentChangeInfoPrefix, info.Base.MsgID) - saveKvs[changeInfoKey] = string(changeInfoBytes) + changeInfoBytes, err := proto.Marshal(changeInfos) + if err != nil { + return col2SegmentChangeInfos, err } + // TODO:: segmentChangeInfo clear in etcd with coord gc and queryNode watch the changeInfo meta to deal changeInfoMsg + changeInfoKey := fmt.Sprintf("%s/%d", sealedSegmentChangeInfoPrefix, changeInfos.Base.MsgID) + saveKvs[changeInfoKey] = string(changeInfoBytes) } err := m.client.MultiSave(saveKvs) @@ -493,23 +491,25 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal } func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitionIDs []UniqueID) (col2SealedSegmentChangeInfos, error) { - segmentChangeInfos := make([]*querypb.SealedSegmentsChangeInfo, 0) removes := m.showSegmentInfos(collectionID, partitionIDs) if len(removes) == 0 { return nil, nil } // get segmentInfos to remove + segmentChangeInfos := &querypb.SealedSegmentsChangeInfo{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_SealedSegmentsChangeInfo, + }, + Infos: []*querypb.SegmentChangeInfo{}, + } for _, info := range removes { offlineNodeID := info.NodeID - changeInfo := querypb.SealedSegmentsChangeInfo{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_SealedSegmentsChangeInfo, - }, + changeInfo := &querypb.SegmentChangeInfo{ OfflineNodeID: offlineNodeID, OfflineSegments: []*querypb.SegmentInfo{info}, } - segmentChangeInfos = append(segmentChangeInfos, &changeInfo) + segmentChangeInfos.Infos = append(segmentChangeInfos.Infos, changeInfo) } // get msgStream to produce sealedSegmentChangeInfos to query channel @@ -517,12 +517,12 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio if err != nil { return nil, err } + // len(messageIDs) = 1 messageIDs, ok := messageIDInfos[queryChannelInfo.QueryChannelID] if !ok || len(messageIDs) == 0 { return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, errors.New("updateGlobalSealedSegmentInfos: send sealed segment change info failed") } - seekMessageID := messageIDs[len(messageIDs)-1] - queryChannelInfo.SeekPosition.MsgID = seekMessageID.Serialize() + queryChannelInfo.SeekPosition.MsgID = messageIDs[0].Serialize() // update segmentInfo, queryChannelInfo meta to cache and etcd seg2Info := make(map[UniqueID]*querypb.SegmentInfo) @@ -554,15 +554,13 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio // save segmentChangeInfo into etcd, query node will deal the changeInfo if the msgID key exist in etcd // avoid the produce process success but save meta to etcd failed // then the msgID key will not exist, and changeIndo will be ignored by query node - for _, info := range segmentChangeInfos { - changeInfoBytes, err := proto.Marshal(info) - if err != nil { - return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, err - } - // TODO:: segmentChangeInfo clear in etcd with coord gc and queryNode watch the changeInfo meta to deal changeInfoMsg - changeInfoKey := fmt.Sprintf("%s/%d", sealedSegmentChangeInfoPrefix, info.Base.MsgID) - saveKvs[changeInfoKey] = string(changeInfoBytes) + changeInfoBytes, err := proto.Marshal(segmentChangeInfos) + if err != nil { + return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, err } + // TODO:: segmentChangeInfo clear in etcd with coord gc and queryNode watch the changeInfo meta to deal changeInfoMsg + changeInfoKey := fmt.Sprintf("%s/%d", sealedSegmentChangeInfoPrefix, segmentChangeInfos.Base.MsgID) + saveKvs[changeInfoKey] = string(changeInfoBytes) removeKeys := make([]string, 0) for _, info := range removes { @@ -588,7 +586,7 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, nil } -func (m *MetaReplica) sendSealedSegmentChangeInfos(collectionID UniqueID, changeInfos []*querypb.SealedSegmentsChangeInfo) (*querypb.QueryChannelInfo, map[string][]mqclient.MessageID, error) { +func (m *MetaReplica) sendSealedSegmentChangeInfos(collectionID UniqueID, changeInfos *querypb.SealedSegmentsChangeInfo) (*querypb.QueryChannelInfo, map[string][]mqclient.MessageID, error) { // get msgStream to produce sealedSegmentChangeInfos to query channel queryChannelInfo, err := m.getQueryChannelInfoByID(collectionID) if err != nil { @@ -605,21 +603,19 @@ func (m *MetaReplica) sendSealedSegmentChangeInfos(collectionID UniqueID, change var msgPack = &msgstream.MsgPack{ Msgs: []msgstream.TsMsg{}, } - for _, changeInfo := range changeInfos { - id, err := m.idAllocator() - if err != nil { - log.Error("allocator trigger taskID failed", zap.Error(err)) - return nil, nil, err - } - changeInfo.Base.MsgID = id - segmentChangeMsg := &msgstream.SealedSegmentsChangeInfoMsg{ - BaseMsg: msgstream.BaseMsg{ - HashValues: []uint32{0}, - }, - SealedSegmentsChangeInfo: *changeInfo, - } - msgPack.Msgs = append(msgPack.Msgs, segmentChangeMsg) + id, err := m.idAllocator() + if err != nil { + log.Error("allocator trigger taskID failed", zap.Error(err)) + return nil, nil, err + } + changeInfos.Base.MsgID = id + segmentChangeMsg := &msgstream.SealedSegmentsChangeInfoMsg{ + BaseMsg: msgstream.BaseMsg{ + HashValues: []uint32{0}, + }, + SealedSegmentsChangeInfo: *changeInfos, } + msgPack.Msgs = append(msgPack.Msgs, segmentChangeMsg) messageIDInfos, err := queryStream.ProduceMark(msgPack) if err != nil { diff --git a/internal/querycoord/task_scheduler.go b/internal/querycoord/task_scheduler.go index 8cdbbe0ee7bf2..32998e5fba629 100644 --- a/internal/querycoord/task_scheduler.go +++ b/internal/querycoord/task_scheduler.go @@ -931,22 +931,25 @@ func updateSegmentInfoFromTask(ctx context.Context, triggerTask task, meta Meta) return nil } -func reverseSealedSegmentChangeInfo(changeInfosMap map[UniqueID][]*querypb.SealedSegmentsChangeInfo) map[UniqueID][]*querypb.SealedSegmentsChangeInfo { - result := make(map[UniqueID][]*querypb.SealedSegmentsChangeInfo) +func reverseSealedSegmentChangeInfo(changeInfosMap map[UniqueID]*querypb.SealedSegmentsChangeInfo) map[UniqueID]*querypb.SealedSegmentsChangeInfo { + result := make(map[UniqueID]*querypb.SealedSegmentsChangeInfo) for collectionID, changeInfos := range changeInfosMap { - result[collectionID] = []*querypb.SealedSegmentsChangeInfo{} - for _, info := range changeInfos { - segmentChangeInfo := &querypb.SealedSegmentsChangeInfo{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_SealedSegmentsChangeInfo, - }, + segmentChangeInfos := &querypb.SealedSegmentsChangeInfo{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_SealedSegmentsChangeInfo, + }, + Infos: []*querypb.SegmentChangeInfo{}, + } + for _, info := range changeInfos.Infos { + changeInfo := &querypb.SegmentChangeInfo{ OnlineNodeID: info.OfflineNodeID, OnlineSegments: info.OfflineSegments, OfflineNodeID: info.OnlineNodeID, OfflineSegments: info.OnlineSegments, } - result[collectionID] = append(result[collectionID], segmentChangeInfo) + segmentChangeInfos.Infos = append(segmentChangeInfos.Infos, changeInfo) } + result[collectionID] = segmentChangeInfos } return result diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go index be78ef0fd2ed6..c5fe018514392 100644 --- a/internal/querynode/mock_test.go +++ b/internal/querynode/mock_test.go @@ -1254,8 +1254,7 @@ func consumeSimpleRetrieveResult(stream msgstream.MsgStream) (*msgstream.Retriev } func genSimpleChangeInfo() *querypb.SealedSegmentsChangeInfo { - return &querypb.SealedSegmentsChangeInfo{ - Base: genCommonMsgBase(commonpb.MsgType_LoadBalanceSegments), + changeInfo := &querypb.SegmentChangeInfo{ OnlineNodeID: Params.QueryNodeID, OnlineSegments: []*querypb.SegmentInfo{ genSimpleSegmentInfo(), @@ -1265,6 +1264,11 @@ func genSimpleChangeInfo() *querypb.SealedSegmentsChangeInfo { genSimpleSegmentInfo(), }, } + + return &querypb.SealedSegmentsChangeInfo{ + Base: genCommonMsgBase(commonpb.MsgType_LoadBalanceSegments), + Infos: []*querypb.SegmentChangeInfo{changeInfo}, + } } func saveChangeInfo(key string, value string) error { diff --git a/internal/querynode/query_collection.go b/internal/querynode/query_collection.go index 4627279b20583..a91d17f02b893 100644 --- a/internal/querynode/query_collection.go +++ b/internal/querynode/query_collection.go @@ -305,35 +305,37 @@ func (q *queryCollection) consumeQuery() { } func (q *queryCollection) adjustByChangeInfo(msg *msgstream.SealedSegmentsChangeInfoMsg) error { - // for OnlineSegments: - for _, segment := range msg.OnlineSegments { - // 1. update global sealed segments - err := q.globalSegmentManager.addGlobalSegmentInfo(segment) - if err != nil { - return err - } - // 2. update excluded segment, cluster have been loaded sealed segments, - // so we need to avoid getting growing segment from flow graph. - q.streaming.replica.addExcludedSegments(segment.CollectionID, []*datapb.SegmentInfo{ - { - ID: segment.SegmentID, - CollectionID: segment.CollectionID, - PartitionID: segment.PartitionID, - InsertChannel: segment.ChannelID, - NumOfRows: segment.NumRows, - // TODO: add status, remove query pb segment status, use common pb segment status? - DmlPosition: &internalpb.MsgPosition{ - // use max timestamp to filter out dm messages - Timestamp: math.MaxInt64, + for _, info := range msg.Infos { + // for OnlineSegments: + for _, segment := range info.OnlineSegments { + // 1. update global sealed segments + err := q.globalSegmentManager.addGlobalSegmentInfo(segment) + if err != nil { + return err + } + // 2. update excluded segment, cluster have been loaded sealed segments, + // so we need to avoid getting growing segment from flow graph. + q.streaming.replica.addExcludedSegments(segment.CollectionID, []*datapb.SegmentInfo{ + { + ID: segment.SegmentID, + CollectionID: segment.CollectionID, + PartitionID: segment.PartitionID, + InsertChannel: segment.ChannelID, + NumOfRows: segment.NumRows, + // TODO: add status, remove query pb segment status, use common pb segment status? + DmlPosition: &internalpb.MsgPosition{ + // use max timestamp to filter out dm messages + Timestamp: typeutil.MaxTimestamp, + }, }, - }, - }) - } + }) + } - // for OfflineSegments: - for _, segment := range msg.OfflineSegments { - // update global sealed segments - q.globalSegmentManager.removeGlobalSegmentInfo(segment.SegmentID) + // for OfflineSegments: + for _, segment := range info.OfflineSegments { + // 1. update global sealed segments + q.globalSegmentManager.removeGlobalSegmentInfo(segment.SegmentID) + } } return nil } diff --git a/internal/querynode/query_collection_test.go b/internal/querynode/query_collection_test.go index e4bede5d23600..788b8e8c2a70a 100644 --- a/internal/querynode/query_collection_test.go +++ b/internal/querynode/query_collection_test.go @@ -70,13 +70,16 @@ func genSimpleSegmentInfo() *querypb.SegmentInfo { } func genSimpleSealedSegmentsChangeInfo() *querypb.SealedSegmentsChangeInfo { - return &querypb.SealedSegmentsChangeInfo{ - Base: genCommonMsgBase(commonpb.MsgType_SealedSegmentsChangeInfo), + changeInfo := &querypb.SegmentChangeInfo{ OnlineNodeID: Params.QueryNodeID, OnlineSegments: []*querypb.SegmentInfo{}, OfflineNodeID: Params.QueryNodeID, OfflineSegments: []*querypb.SegmentInfo{}, } + return &querypb.SealedSegmentsChangeInfo{ + Base: genCommonMsgBase(commonpb.MsgType_SealedSegmentsChangeInfo), + Infos: []*querypb.SegmentChangeInfo{changeInfo}, + } } func genSimpleSealedSegmentsChangeInfoMsg() *msgstream.SealedSegmentsChangeInfoMsg { @@ -287,7 +290,7 @@ func TestQueryCollection_consumeQuery(t *testing.T) { msg := genSimpleSealedSegmentsChangeInfoMsg() simpleInfo := genSimpleSegmentInfo() simpleInfo.CollectionID = 1000 - msg.OnlineSegments = append(msg.OnlineSegments, simpleInfo) + msg.Infos[0].OnlineSegments = append(msg.Infos[0].OnlineSegments, simpleInfo) runConsumeQuery(msg) }) @@ -624,19 +627,19 @@ func TestQueryCollection_adjustByChangeInfo(t *testing.T) { qc, err := genSimpleQueryCollection(ctx, cancel) assert.Nil(t, err) - info := genSimpleSealedSegmentsChangeInfoMsg() + segmentChangeInfos := genSimpleSealedSegmentsChangeInfoMsg() // test online - info.OnlineSegments = append(info.OnlineSegments, genSimpleSegmentInfo()) - err = qc.adjustByChangeInfo(info) + segmentChangeInfos.Infos[0].OnlineSegments = append(segmentChangeInfos.Infos[0].OnlineSegments, genSimpleSegmentInfo()) + err = qc.adjustByChangeInfo(segmentChangeInfos) assert.NoError(t, err) ids := qc.globalSegmentManager.getGlobalSegmentIDs() assert.Len(t, ids, 1) // test offline - info.OnlineSegments = make([]*querypb.SegmentInfo, 0) - info.OfflineSegments = append(info.OfflineSegments, genSimpleSegmentInfo()) - err = qc.adjustByChangeInfo(info) + segmentChangeInfos.Infos[0].OnlineSegments = make([]*querypb.SegmentInfo, 0) + segmentChangeInfos.Infos[0].OfflineSegments = append(segmentChangeInfos.Infos[0].OfflineSegments, genSimpleSegmentInfo()) + err = qc.adjustByChangeInfo(segmentChangeInfos) assert.NoError(t, err) ids = qc.globalSegmentManager.getGlobalSegmentIDs() assert.Len(t, ids, 0) @@ -646,13 +649,13 @@ func TestQueryCollection_adjustByChangeInfo(t *testing.T) { qc, err := genSimpleQueryCollection(ctx, cancel) assert.Nil(t, err) - info := genSimpleSealedSegmentsChangeInfoMsg() + segmentChangeInfos := genSimpleSealedSegmentsChangeInfoMsg() // test online simpleInfo := genSimpleSegmentInfo() simpleInfo.CollectionID = 1000 - info.OnlineSegments = append(info.OnlineSegments, simpleInfo) - err = qc.adjustByChangeInfo(info) + segmentChangeInfos.Infos[0].OnlineSegments = append(segmentChangeInfos.Infos[0].OnlineSegments, simpleInfo) + err = qc.adjustByChangeInfo(segmentChangeInfos) assert.Error(t, err) }) @@ -663,10 +666,10 @@ func TestQueryCollection_adjustByChangeInfo(t *testing.T) { err = qc.historical.replica.removeSegment(defaultSegmentID) assert.NoError(t, err) - info := genSimpleSealedSegmentsChangeInfoMsg() - info.OfflineSegments = append(info.OfflineSegments, genSimpleSegmentInfo()) + segmentChangeInfos := genSimpleSealedSegmentsChangeInfoMsg() + segmentChangeInfos.Infos[0].OfflineSegments = append(segmentChangeInfos.Infos[0].OfflineSegments, genSimpleSegmentInfo()) - err = qc.adjustByChangeInfo(info) + err = qc.adjustByChangeInfo(segmentChangeInfos) assert.Nil(t, err) }) } diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 13cbefd1ecf6b..828cd91b653f9 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -306,77 +306,83 @@ func (node *QueryNode) watchChangeInfo() { } } -func (node *QueryNode) waitChangeInfo(info *querypb.SealedSegmentsChangeInfo) error { +func (node *QueryNode) waitChangeInfo(segmentChangeInfos *querypb.SealedSegmentsChangeInfo) error { fn := func() error { - canDoLoadBalance := true - // Check online segments: - for _, segmentInfo := range info.OnlineSegments { - if node.queryService.hasQueryCollection(segmentInfo.CollectionID) { - qc, err := node.queryService.getQueryCollection(segmentInfo.CollectionID) - if err != nil { - canDoLoadBalance = false - break - } - if info.OnlineNodeID == Params.QueryNodeID && !qc.globalSegmentManager.hasGlobalSegment(segmentInfo.SegmentID) { - canDoLoadBalance = false - break + for _, info := range segmentChangeInfos.Infos { + canDoLoadBalance := true + // Check online segments: + for _, segmentInfo := range info.OnlineSegments { + if node.queryService.hasQueryCollection(segmentInfo.CollectionID) { + qc, err := node.queryService.getQueryCollection(segmentInfo.CollectionID) + if err != nil { + canDoLoadBalance = false + break + } + if info.OnlineNodeID == Params.QueryNodeID && !qc.globalSegmentManager.hasGlobalSegment(segmentInfo.SegmentID) { + canDoLoadBalance = false + break + } } } - } - // Check offline segments: - for _, segmentInfo := range info.OfflineSegments { - if node.queryService.hasQueryCollection(segmentInfo.CollectionID) { - qc, err := node.queryService.getQueryCollection(segmentInfo.CollectionID) - if err != nil { - canDoLoadBalance = false - break - } - if info.OfflineNodeID == Params.QueryNodeID && qc.globalSegmentManager.hasGlobalSegment(segmentInfo.SegmentID) { - canDoLoadBalance = false - break + // Check offline segments: + for _, segmentInfo := range info.OfflineSegments { + if node.queryService.hasQueryCollection(segmentInfo.CollectionID) { + qc, err := node.queryService.getQueryCollection(segmentInfo.CollectionID) + if err != nil { + canDoLoadBalance = false + break + } + if info.OfflineNodeID == Params.QueryNodeID && qc.globalSegmentManager.hasGlobalSegment(segmentInfo.SegmentID) { + canDoLoadBalance = false + break + } } } + if canDoLoadBalance { + return nil + } + return errors.New(fmt.Sprintln("waitChangeInfo failed, infoID = ", segmentChangeInfos.Base.GetMsgID())) } - if canDoLoadBalance { - return nil - } - return errors.New(fmt.Sprintln("waitChangeInfo failed, infoID = ", info.Base.GetMsgID())) + + return nil } return retry.Do(context.TODO(), fn, retry.Attempts(10)) } -func (node *QueryNode) adjustByChangeInfo(info *querypb.SealedSegmentsChangeInfo) error { - err := node.waitChangeInfo(info) +func (node *QueryNode) adjustByChangeInfo(segmentChangeInfos *querypb.SealedSegmentsChangeInfo) error { + err := node.waitChangeInfo(segmentChangeInfos) if err != nil { log.Error("waitChangeInfo failed", zap.Any("error", err.Error())) return err } - // For online segments: - for _, segmentInfo := range info.OnlineSegments { - // delete growing segment because these segments are loaded in historical. - hasGrowingSegment := node.streaming.replica.hasSegment(segmentInfo.SegmentID) - if hasGrowingSegment { - err := node.streaming.replica.removeSegment(segmentInfo.SegmentID) - if err != nil { - return err + for _, info := range segmentChangeInfos.Infos { + // For online segments: + for _, segmentInfo := range info.OnlineSegments { + // delete growing segment because these segments are loaded in historical. + hasGrowingSegment := node.streaming.replica.hasSegment(segmentInfo.SegmentID) + if hasGrowingSegment { + err := node.streaming.replica.removeSegment(segmentInfo.SegmentID) + if err != nil { + return err + } + log.Debug("remove growing segment in adjustByChangeInfo", + zap.Any("collectionID", segmentInfo.CollectionID), + zap.Any("segmentID", segmentInfo.SegmentID), + zap.Any("infoID", segmentChangeInfos.Base.GetMsgID()), + ) } - log.Debug("remove growing segment in adjustByChangeInfo", - zap.Any("collectionID", segmentInfo.CollectionID), - zap.Any("segmentID", segmentInfo.SegmentID), - zap.Any("infoID", info.Base.GetMsgID()), - ) } - } - // For offline segments: - for _, segment := range info.OfflineSegments { - // load balance or compaction, remove old sealed segments. - if info.OfflineNodeID == Params.QueryNodeID { - err := node.historical.replica.removeSegment(segment.SegmentID) - if err != nil { - return err + // For offline segments: + for _, segment := range info.OfflineSegments { + // load balance or compaction, remove old sealed segments. + if info.OfflineNodeID == Params.QueryNodeID { + err := node.historical.replica.removeSegment(segment.SegmentID) + if err != nil { + return err + } } } } diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index 9a099b0cf6224..b92889c82f8d5 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -327,15 +327,15 @@ func TestQueryNode_adjustByChangeInfo(t *testing.T) { err = node.historical.replica.removeSegment(defaultSegmentID) assert.NoError(t, err) - info := genSimpleChangeInfo() - info.OnlineSegments = nil - info.OfflineNodeID = Params.QueryNodeID + segmentChangeInfos := genSimpleChangeInfo() + segmentChangeInfos.Infos[0].OnlineSegments = nil + segmentChangeInfos.Infos[0].OfflineNodeID = Params.QueryNodeID qc, err := node.queryService.getQueryCollection(defaultCollectionID) assert.NoError(t, err) qc.globalSegmentManager.removeGlobalSegmentInfo(defaultSegmentID) - err = node.adjustByChangeInfo(info) + err = node.adjustByChangeInfo(segmentChangeInfos) assert.Error(t, err) }) } @@ -390,9 +390,9 @@ func TestQueryNode_watchChangeInfo(t *testing.T) { err = node.historical.replica.removeSegment(defaultSegmentID) assert.NoError(t, err) - info := genSimpleChangeInfo() - info.OnlineSegments = nil - info.OfflineNodeID = Params.QueryNodeID + segmentChangeInfos := genSimpleChangeInfo() + segmentChangeInfos.Infos[0].OnlineSegments = nil + segmentChangeInfos.Infos[0].OfflineNodeID = Params.QueryNodeID qc, err := node.queryService.getQueryCollection(defaultCollectionID) assert.NoError(t, err) @@ -400,7 +400,7 @@ func TestQueryNode_watchChangeInfo(t *testing.T) { go node.watchChangeInfo() - value, err := proto.Marshal(info) + value, err := proto.Marshal(segmentChangeInfos) assert.NoError(t, err) err = saveChangeInfo("0", string(value)) assert.NoError(t, err)