Skip to content

Commit

Permalink
Support return pending index rows when describe index (milvus-io#24588)
Browse files Browse the repository at this point in the history
Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
  • Loading branch information
xiaocai2333 authored Jun 1, 2023
1 parent b0ce4cf commit 93ea9c4
Show file tree
Hide file tree
Showing 9 changed files with 324 additions and 159 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/golang/protobuf v1.5.3
github.com/klauspost/compress v1.14.4
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230529034923-4579ee9d5723
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230531124827-410c849303a9
github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000
github.com/minio/minio-go/v7 v7.0.17
github.com/panjf2000/ants/v2 v2.7.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,8 @@ github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZz
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230529034923-4579ee9d5723 h1:VWwQdHN1JuM/Q+9QK1bOyTpEqPHTdAKw5qOK0Lgua/c=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230529034923-4579ee9d5723/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230531124827-410c849303a9 h1:l4UDSKK29zXAg5+oqa4eAZaAfRHsyFsij3QPxu1tqvk=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230531124827-410c849303a9/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
Expand Down
91 changes: 77 additions & 14 deletions internal/datacoord/index_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// serverID return the session serverID
Expand Down Expand Up @@ -324,19 +325,67 @@ func (s *Server) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegme
return ret, nil
}

func (s *Server) countIndexedRows(indexInfo *indexpb.IndexInfo, segments []*SegmentInfo) int64 {
unIndexed, indexed := typeutil.NewSet[int64](), typeutil.NewSet[int64]()
for _, seg := range segments {
segIdx, ok := seg.segmentIndexes[indexInfo.IndexID]
if !ok {
unIndexed.Insert(seg.GetID())
continue
}
switch segIdx.IndexState {
case commonpb.IndexState_Finished:
indexed.Insert(seg.GetID())
default:
unIndexed.Insert(seg.GetID())
}
}
retrieveContinue := len(unIndexed) != 0
for retrieveContinue {
for segID := range unIndexed {
unIndexed.Remove(segID)
segment := s.meta.GetSegment(segID)
if segment == nil || len(segment.CompactionFrom) == 0 {
continue
}
for _, fromID := range segment.CompactionFrom {
fromSeg := s.meta.GetSegment(fromID)
if fromSeg == nil {
continue
}
if segIndex, ok := fromSeg.segmentIndexes[indexInfo.IndexID]; ok && segIndex.IndexState == commonpb.IndexState_Finished {
indexed.Insert(fromID)
continue
}
unIndexed.Insert(fromID)
}
}
retrieveContinue = len(unIndexed) != 0
}
indexedRows := int64(0)
for segID := range indexed {
segment := s.meta.GetSegment(segID)
if segment != nil {
indexedRows += segment.GetNumOfRows()
}
}
return indexedRows
}

// completeIndexInfo get the index row count and index task state
// if realTime, calculate current statistics
// if not realTime, which means get info of the prior `CreateIndex` action, skip segments created after index's create time
func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.Index, segments []*SegmentInfo, realTime bool) {
var (
cntNone = 0
cntUnissued = 0
cntInProgress = 0
cntFinished = 0
cntFailed = 0
failReason string
totalRows = int64(0)
indexedRows = int64(0)
cntNone = 0
cntUnissued = 0
cntInProgress = 0
cntFinished = 0
cntFailed = 0
failReason string
totalRows = int64(0)
indexedRows = int64(0)
pendingIndexRows = int64(0)
)

for _, seg := range segments {
Expand All @@ -347,8 +396,12 @@ func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.In
if seg.GetStartPosition().GetTimestamp() <= index.CreateTime {
cntUnissued++
}
pendingIndexRows += seg.GetNumOfRows()
continue
}
if segIdx.IndexState != commonpb.IndexState_Finished {
pendingIndexRows += seg.GetNumOfRows()
}

// if realTime, calculate current statistics
// if not realTime, skip segments created after index create
Expand All @@ -374,8 +427,13 @@ func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.In
}
}

if realTime {
indexInfo.IndexedRows = indexedRows
} else {
indexInfo.IndexedRows = s.countIndexedRows(indexInfo, segments)
}
indexInfo.TotalRows = totalRows
indexInfo.IndexedRows = indexedRows
indexInfo.PendingIndexRows = pendingIndexRows
switch {
case cntFailed > 0:
indexInfo.State = commonpb.IndexState_Failed
Expand All @@ -390,6 +448,7 @@ func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.In

log.Info("completeIndexInfo success", zap.Int64("collID", index.CollectionID), zap.Int64("indexID", index.IndexID),
zap.Int64("totalRows", indexInfo.TotalRows), zap.Int64("indexRows", indexInfo.IndexedRows),
zap.Int64("pendingIndexRows", indexInfo.PendingIndexRows),
zap.String("state", indexInfo.State.String()), zap.String("failReason", indexInfo.IndexStateFailReason))
}

Expand Down Expand Up @@ -434,9 +493,12 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde
}, nil
}
indexInfo := &indexpb.IndexInfo{
IndexedRows: 0,
TotalRows: 0,
State: 0,
CollectionID: req.CollectionID,
IndexID: indexes[0].IndexID,
IndexedRows: 0,
TotalRows: 0,
PendingIndexRows: 0,
State: 0,
}
s.completeIndexInfo(indexInfo, indexes[0], s.meta.SelectSegments(func(info *SegmentInfo) bool {
return isFlush(info) && info.CollectionID == req.GetCollectionID()
Expand All @@ -447,8 +509,9 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
IndexedRows: indexInfo.IndexedRows,
TotalRows: indexInfo.TotalRows,
IndexedRows: indexInfo.IndexedRows,
TotalRows: indexInfo.TotalRows,
PendingIndexRows: indexInfo.PendingIndexRows,
}, nil
}

Expand Down
81 changes: 80 additions & 1 deletion internal/datacoord/index_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ func TestServer_DescribeIndex(t *testing.T) {
segments: &SegmentsInfo{map[UniqueID]*SegmentInfo{
invalidSegID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
ID: invalidSegID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
Expand All @@ -815,6 +815,8 @@ func TestServer_DescribeIndex(t *testing.T) {
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS,
},
CreatedByCompaction: true,
CompactionFrom: []int64{segID - 1},
},
segmentIndexes: map[UniqueID]*model.SegmentIndex{
indexID: {
Expand Down Expand Up @@ -904,6 +906,83 @@ func TestServer_DescribeIndex(t *testing.T) {
},
},
},
segID - 1: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS,
},
},
segmentIndexes: map[UniqueID]*model.SegmentIndex{
indexID: {
SegmentID: segID - 1,
CollectionID: collID,
PartitionID: partID,
NumRows: 10000,
IndexID: indexID,
BuildID: buildID,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
CreateTime: createTS,
},
indexID + 1: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 10000,
IndexID: indexID + 1,
BuildID: buildID + 1,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
CreateTime: createTS,
},
indexID + 3: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 10000,
IndexID: indexID + 3,
BuildID: buildID + 3,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_InProgress,
CreateTime: createTS,
},
indexID + 4: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 10000,
IndexID: indexID + 4,
BuildID: buildID + 4,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Failed,
FailReason: "mock failed",
CreateTime: createTS,
},
indexID + 5: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 10000,
IndexID: indexID + 5,
BuildID: buildID + 5,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
CreateTime: createTS,
},
},
},
}},
},
allocator: newMockAllocator(),
Expand Down
2 changes: 2 additions & 0 deletions internal/proto/index_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ message IndexInfo {
string index_state_fail_reason = 10;
bool is_auto_index = 11;
repeated common.KeyValuePair user_index_params = 12;
int64 pending_index_rows = 13;
}

message FieldIndex {
Expand Down Expand Up @@ -191,6 +192,7 @@ message GetIndexBuildProgressResponse {
common.Status status = 1;
int64 indexed_rows = 2;
int64 total_rows = 3;
int64 pending_index_rows = 4;
}

message StorageConfig {
Expand Down
Loading

0 comments on commit 93ea9c4

Please sign in to comment.