diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index 3734cd07e6a3d..0d1e6e271d1e7 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -2220,7 +2220,7 @@ func (s *CompactionTriggerSuite) TestHandleSignal() { defer s.SetupTest() tr := s.tr s.compactionHandler.EXPECT().isFull().Return(false) - // s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) + s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(nil, errors.New("mocked")) tr.handleSignal(&compactionSignal{ segmentID: 1, @@ -2237,7 +2237,7 @@ func (s *CompactionTriggerSuite) TestHandleSignal() { defer s.SetupTest() tr := s.tr s.compactionHandler.EXPECT().isFull().Return(false) - // s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) + s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{ Properties: map[string]string{ common.CollectionAutoCompactionKey: "bad_value", diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index 1a0f532576f3c..4d58bef892cf6 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -24,6 +24,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/prometheus/client_golang/prometheus" + "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -271,77 +272,73 @@ func (m *meta) GetIndexIDByName(collID int64, indexName string) map[int64]uint64 return indexID2CreateTs } -type IndexState struct { - state commonpb.IndexState - failReason string -} - -func (m *meta) GetSegmentIndexState(collID, segmentID UniqueID) IndexState { +func (m *meta) GetSegmentIndexState(collID, segmentID UniqueID, indexID UniqueID) *indexpb.SegmentIndexState { m.RLock() defer m.RUnlock() - state := IndexState{ - state: commonpb.IndexState_IndexStateNone, - failReason: "", + state := &indexpb.SegmentIndexState{ + SegmentID: segmentID, + State: commonpb.IndexState_IndexStateNone, + FailReason: "", } fieldIndexes, ok := m.indexes[collID] if !ok { - state.failReason = fmt.Sprintf("collection not exist with ID: %d", collID) + state.FailReason = fmt.Sprintf("collection not exist with ID: %d", collID) return state } segment := m.segments.GetSegment(segmentID) - if segment != nil { - for indexID, index := range fieldIndexes { - if !index.IsDeleted { - if segIdx, ok := segment.segmentIndexes[indexID]; ok { - if segIdx.IndexState != commonpb.IndexState_Finished { - state.state = segIdx.IndexState - state.failReason = segIdx.FailReason - break - } - state.state = commonpb.IndexState_Finished - continue - } - state.state = commonpb.IndexState_Unissued - break - } + if segment == nil { + state.FailReason = fmt.Sprintf("segment is not exist with ID: %d", segmentID) + return state + } + + if index, ok := fieldIndexes[indexID]; ok && !index.IsDeleted { + if segIdx, ok := segment.segmentIndexes[indexID]; ok { + state.IndexName = index.IndexName + state.State = segIdx.IndexState + state.FailReason = segIdx.FailReason + return state } + state.State = commonpb.IndexState_Unissued return state } - state.failReason = fmt.Sprintf("segment is not exist with ID: %d", segmentID) + + state.FailReason = fmt.Sprintf("there is no index on indexID: %d", indexID) return state } -func (m *meta) GetSegmentIndexStateOnField(collID, segmentID, fieldID UniqueID) IndexState { +func (m *meta) GetSegmentIndexStateOnField(collID, segmentID, fieldID UniqueID) *indexpb.SegmentIndexState { m.RLock() defer m.RUnlock() - state := IndexState{ - state: commonpb.IndexState_IndexStateNone, - failReason: "", + state := &indexpb.SegmentIndexState{ + SegmentID: segmentID, + State: commonpb.IndexState_IndexStateNone, + FailReason: "", } fieldIndexes, ok := m.indexes[collID] if !ok { - state.failReason = fmt.Sprintf("collection not exist with ID: %d", collID) + state.FailReason = fmt.Sprintf("collection not exist with ID: %d", collID) return state } segment := m.segments.GetSegment(segmentID) - if segment != nil { - for indexID, index := range fieldIndexes { - if index.FieldID == fieldID && !index.IsDeleted { - if segIdx, ok := segment.segmentIndexes[indexID]; ok { - state.state = segIdx.IndexState - state.failReason = segIdx.FailReason - return state - } - state.state = commonpb.IndexState_Unissued + if segment == nil { + state.FailReason = fmt.Sprintf("segment is not exist with ID: %d", segmentID) + return state + } + for indexID, index := range fieldIndexes { + if index.FieldID == fieldID && !index.IsDeleted { + if segIdx, ok := segment.segmentIndexes[indexID]; ok { + state.IndexName = index.IndexName + state.State = segIdx.IndexState + state.FailReason = segIdx.FailReason return state } + state.State = commonpb.IndexState_Unissued + return state } - state.failReason = fmt.Sprintf("there is no index on fieldID: %d", fieldID) - return state } - state.failReason = fmt.Sprintf("segment is not exist with ID: %d", segmentID) + state.FailReason = fmt.Sprintf("there is no index on fieldID: %d", fieldID) return state } @@ -716,7 +713,7 @@ func (m *meta) GetHasUnindexTaskSegments() []*SegmentInfo { m.RLock() defer m.RUnlock() segments := m.segments.GetSegments() - var ret []*SegmentInfo + unindexedSegments := make(map[int64]*SegmentInfo) for _, segment := range segments { if !isFlush(segment) { continue @@ -724,12 +721,13 @@ func (m *meta) GetHasUnindexTaskSegments() []*SegmentInfo { if fieldIndexes, ok := m.indexes[segment.CollectionID]; ok { for _, index := range fieldIndexes { if _, ok := segment.segmentIndexes[index.IndexID]; !index.IsDeleted && !ok { - ret = append(ret, segment) + unindexedSegments[segment.GetID()] = segment } } } } - return ret + + return lo.MapToSlice(unindexedSegments, func(_ int64, segment *SegmentInfo) *SegmentInfo { return segment }) } func (m *meta) GetMetasByNodeID(nodeID UniqueID) []*model.SegmentIndex { diff --git a/internal/datacoord/index_meta_test.go b/internal/datacoord/index_meta_test.go index 6c33f53705a36..798ab3a79bc19 100644 --- a/internal/datacoord/index_meta_test.go +++ b/internal/datacoord/index_meta_test.go @@ -475,9 +475,10 @@ func TestMeta_GetSegmentIndexState(t *testing.T) { }, } - t.Run("segment has no index", func(t *testing.T) { - state := m.GetSegmentIndexState(collID, segID) - assert.Equal(t, commonpb.IndexState_IndexStateNone, state.state) + t.Run("collection has no index", func(t *testing.T) { + state := m.GetSegmentIndexState(collID, segID, indexID) + assert.Equal(t, commonpb.IndexState_IndexStateNone, state.GetState()) + assert.Contains(t, state.GetFailReason(), "collection not exist with ID") }) t.Run("meta not saved yet", func(t *testing.T) { @@ -496,13 +497,14 @@ func TestMeta_GetSegmentIndexState(t *testing.T) { UserIndexParams: indexParams, }, } - state := m.GetSegmentIndexState(collID, segID) - assert.Equal(t, commonpb.IndexState_Unissued, state.state) + state := m.GetSegmentIndexState(collID, segID, indexID) + assert.Equal(t, commonpb.IndexState_Unissued, state.GetState()) }) t.Run("segment not exist", func(t *testing.T) { - state := m.GetSegmentIndexState(collID, segID+1) - assert.Equal(t, commonpb.IndexState_IndexStateNone, state.state) + state := m.GetSegmentIndexState(collID, segID+1, indexID) + assert.Equal(t, commonpb.IndexState_IndexStateNone, state.GetState()) + assert.Contains(t, state.FailReason, "segment is not exist with ID") }) t.Run("unissued", func(t *testing.T) { @@ -523,8 +525,8 @@ func TestMeta_GetSegmentIndexState(t *testing.T) { IndexSize: 0, }) - state := m.GetSegmentIndexState(collID, segID) - assert.Equal(t, commonpb.IndexState_Unissued, state.state) + state := m.GetSegmentIndexState(collID, segID, indexID) + assert.Equal(t, commonpb.IndexState_Unissued, state.GetState()) }) t.Run("finish", func(t *testing.T) { @@ -545,8 +547,8 @@ func TestMeta_GetSegmentIndexState(t *testing.T) { IndexSize: 0, }) - state := m.GetSegmentIndexState(collID, segID) - assert.Equal(t, commonpb.IndexState_Finished, state.state) + state := m.GetSegmentIndexState(collID, segID, indexID) + assert.Equal(t, commonpb.IndexState_Finished, state.GetState()) }) } @@ -643,22 +645,22 @@ func TestMeta_GetSegmentIndexStateOnField(t *testing.T) { t.Run("success", func(t *testing.T) { state := m.GetSegmentIndexStateOnField(collID, segID, fieldID) - assert.Equal(t, commonpb.IndexState_Finished, state.state) + assert.Equal(t, commonpb.IndexState_Finished, state.GetState()) }) t.Run("no index on field", func(t *testing.T) { state := m.GetSegmentIndexStateOnField(collID, segID, fieldID+1) - assert.Equal(t, commonpb.IndexState_IndexStateNone, state.state) + assert.Equal(t, commonpb.IndexState_IndexStateNone, state.GetState()) }) t.Run("no index", func(t *testing.T) { state := m.GetSegmentIndexStateOnField(collID+1, segID, fieldID+1) - assert.Equal(t, commonpb.IndexState_IndexStateNone, state.state) + assert.Equal(t, commonpb.IndexState_IndexStateNone, state.GetState()) }) t.Run("segment not exist", func(t *testing.T) { state := m.GetSegmentIndexStateOnField(collID, segID+1, fieldID) - assert.Equal(t, commonpb.IndexState_IndexStateNone, state.state) + assert.Equal(t, commonpb.IndexState_IndexStateNone, state.GetState()) }) } @@ -1230,6 +1232,19 @@ func TestMeta_GetHasUnindexTaskSegments(t *testing.T) { IsAutoIndex: false, UserIndexParams: nil, }, + indexID + 1: { + TenantID: "", + CollectionID: collID, + FieldID: fieldID + 1, + IndexID: indexID + 1, + IndexName: indexName + "_1", + IsDeleted: false, + CreateTime: 0, + TypeParams: nil, + IndexParams: nil, + IsAutoIndex: false, + UserIndexParams: nil, + }, }, }, } @@ -1239,6 +1254,33 @@ func TestMeta_GetHasUnindexTaskSegments(t *testing.T) { assert.Equal(t, 1, len(segments)) assert.Equal(t, segID, segments[0].ID) }) + + t.Run("segment partial field with index", func(t *testing.T) { + m.segments.segments[segID].segmentIndexes = map[UniqueID]*model.SegmentIndex{ + indexID: { + CollectionID: collID, + SegmentID: segID, + IndexID: indexID, + IndexState: commonpb.IndexState_Finished, + }, + } + + segments := m.GetHasUnindexTaskSegments() + assert.Equal(t, 1, len(segments)) + assert.Equal(t, segID, segments[0].ID) + }) + + t.Run("segment all vector field with index", func(t *testing.T) { + m.segments.segments[segID].segmentIndexes[indexID+1] = &model.SegmentIndex{ + CollectionID: collID, + SegmentID: segID, + IndexID: indexID + 1, + IndexState: commonpb.IndexState_Finished, + } + + segments := m.GetHasUnindexTaskSegments() + assert.Equal(t, 0, len(segments)) + }) } // see also: https://github.com/milvus-io/milvus/issues/21660 diff --git a/internal/datacoord/index_service.go b/internal/datacoord/index_service.go index c5f3995ef5626..d849f9906161d 100644 --- a/internal/datacoord/index_service.go +++ b/internal/datacoord/index_service.go @@ -337,7 +337,7 @@ func (s *Server) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegme ) log.Info("receive GetSegmentIndexState", zap.String("IndexName", req.GetIndexName()), - zap.Int64s("fieldID", req.GetSegmentIDs()), + zap.Int64s("segmentIDs", req.GetSegmentIDs()), ) if err := merr.CheckHealthy(s.GetStateCode()); err != nil { @@ -360,12 +360,10 @@ func (s *Server) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegme }, nil } for _, segID := range req.GetSegmentIDs() { - state := s.meta.GetSegmentIndexState(req.GetCollectionID(), segID) - ret.States = append(ret.States, &indexpb.SegmentIndexState{ - SegmentID: segID, - State: state.state, - FailReason: state.failReason, - }) + for indexID := range indexID2CreateTs { + state := s.meta.GetSegmentIndexState(req.GetCollectionID(), segID, indexID) + ret.States = append(ret.States, state) + } } log.Info("GetSegmentIndexState successfully", zap.String("indexName", req.GetIndexName())) return ret, nil diff --git a/internal/datacoord/index_service_test.go b/internal/datacoord/index_service_test.go index 8eac0e71f8e0d..59d0fef4c8ffb 100644 --- a/internal/datacoord/index_service_test.go +++ b/internal/datacoord/index_service_test.go @@ -755,7 +755,7 @@ func TestServer_GetIndexState(t *testing.T) { }}, } - t.Run("index state is node", func(t *testing.T) { + t.Run("index state is none", func(t *testing.T) { resp, err := s.GetIndexState(ctx, req) assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) @@ -766,7 +766,6 @@ func TestServer_GetIndexState(t *testing.T) { s.meta.indexes[collID][indexID+1] = &model.Index{ TenantID: "", CollectionID: collID, - FieldID: fieldID, IndexID: indexID + 1, IndexName: "default_idx_1", IsDeleted: false, @@ -1833,7 +1832,7 @@ func TestServer_DropIndex(t *testing.T) { indexID + 3: { TenantID: "", CollectionID: collID, - FieldID: fieldID + 3, + FieldID: fieldID, IndexID: indexID + 3, IndexName: indexName + "_3", IsDeleted: false, @@ -1847,7 +1846,7 @@ func TestServer_DropIndex(t *testing.T) { indexID + 4: { TenantID: "", CollectionID: collID, - FieldID: fieldID + 4, + FieldID: fieldID, IndexID: indexID + 4, IndexName: indexName + "_4", IsDeleted: false, diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index 3bacf49b97906..68ad20329de70 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -25,12 +25,12 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) // Response response interface for verification @@ -71,9 +71,8 @@ func FilterInIndexedSegments(handler Handler, mt *meta, segments ...*SegmentInfo segmentMap := make(map[int64]*SegmentInfo) collectionSegments := make(map[int64][]int64) - // TODO(yah01): This can't handle the case of multiple vector fields exist, - // modify it if we support multiple vector fields. - vecFieldID := make(map[int64]int64) + + vecFieldIDs := make(map[int64][]int64) for _, segment := range segments { collectionID := segment.GetCollectionID() segmentMap[segment.GetID()] = segment @@ -88,11 +87,8 @@ func FilterInIndexedSegments(handler Handler, mt *meta, segments ...*SegmentInfo continue } for _, field := range coll.Schema.GetFields() { - if field.GetDataType() == schemapb.DataType_BinaryVector || - field.GetDataType() == schemapb.DataType_FloatVector || - field.GetDataType() == schemapb.DataType_Float16Vector { - vecFieldID[collection] = field.GetFieldID() - break + if typeutil.IsVectorType(field.GetDataType()) { + vecFieldIDs[collection] = append(vecFieldIDs[collection], field.GetFieldID()) } } } @@ -102,8 +98,15 @@ func FilterInIndexedSegments(handler Handler, mt *meta, segments ...*SegmentInfo if !isFlushState(segment.GetState()) && segment.GetState() != commonpb.SegmentState_Dropped { continue } - segmentState := mt.GetSegmentIndexStateOnField(segment.GetCollectionID(), segment.GetID(), vecFieldID[segment.GetCollectionID()]) - if segmentState.state == commonpb.IndexState_Finished { + + hasUnindexedVecField := false + for _, fieldID := range vecFieldIDs[segment.GetCollectionID()] { + segmentIndexState := mt.GetSegmentIndexStateOnField(segment.GetCollectionID(), segment.GetID(), fieldID) + if segmentIndexState.State != commonpb.IndexState_Finished { + hasUnindexedVecField = true + } + } + if !hasUnindexedVecField { indexedSegments = append(indexedSegments, segment) } } diff --git a/internal/proto/index_coord.proto b/internal/proto/index_coord.proto index 0d0fc1348dd00..2dce6870b4b3a 100644 --- a/internal/proto/index_coord.proto +++ b/internal/proto/index_coord.proto @@ -139,6 +139,7 @@ message SegmentIndexState { int64 segmentID = 1; common.IndexState state = 2; string fail_reason = 3; + string index_name = 4; } message GetSegmentIndexStateResponse { diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index bbe049fefac61..bbe299702bfc3 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -1210,6 +1210,21 @@ func TestProxy(t *testing.T) { assert.NoError(t, err) }) + wg.Add(1) + t.Run("describe index with indexName", func(t *testing.T) { + defer wg.Done() + resp, err := proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + FieldName: floatVecField, + IndexName: indexName, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) + indexName = resp.IndexDescriptions[0].IndexName + }) + wg.Add(1) t.Run("get index statistics", func(t *testing.T) { defer wg.Done()