Commit
enhance: enable manual compaction for collections without indexes (milvus-io#36581)

issue: milvus-io#36576
pr: milvus-io#36577

Signed-off-by: jaime <yun.zhang@zilliz.com>
jaime0815 authored Oct 8, 2024
1 parent bf67178 commit da2d3fb
Showing 8 changed files with 146 additions and 20 deletions.
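In short: this commit adds a skipNoIndexCollection flag to FilterInIndexedSegments. The force-compaction path threads signal.isForce through it, so a collection with no index no longer has every segment filtered away and manual compaction can proceed; all other callers pass false and keep the old behavior. The Go sketch below illustrates that control flow only — it is a simplified stand-in, not the actual Milvus code, which consults segment index metadata rather than a per-segment boolean.

package main

import "fmt"

// segment is a simplified stand-in for datacoord's *SegmentInfo.
type segment struct {
    id, collectionID int64
    indexed          bool // stand-in for "every vector field has a built index"
}

// filterInIndexedSegments sketches the new behavior: when skip is true and a
// collection has no live index, its segments bypass the index filter entirely.
func filterInIndexedSegments(hasIndex func(collID int64) bool, skip bool, segments ...segment) []segment {
    byColl := map[int64][]segment{}
    for _, s := range segments {
        byColl[s.collectionID] = append(byColl[s.collectionID], s)
    }
    var ret []segment
    for coll, list := range byColl {
        if skip && !hasIndex(coll) {
            ret = append(ret, list...) // nothing to filter against: keep them all
            continue
        }
        for _, s := range list {
            if s.indexed {
                ret = append(ret, s)
            }
        }
    }
    return ret
}

func main() {
    hasIndex := func(coll int64) bool { return coll == 1 } // collection 2 has no index
    segs := []segment{{1, 1, true}, {2, 1, false}, {3, 2, false}}
    fmt.Println(len(filterInIndexedSegments(hasIndex, false, segs...))) // 1: the old default drops collection 2's segment
    fmt.Println(len(filterInIndexedSegments(hasIndex, true, segs...)))  // 2: the force path keeps it, so it can be compacted
}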
2 changes: 1 addition & 1 deletion internal/datacoord/compaction_policy_single.go
@@ -99,7 +99,7 @@ func (policy *singleCompactionPolicy) triggerOneCollection(ctx context.Context,
    views := make([]CompactionView, 0)
    for _, group := range partSegments {
        if Params.DataCoordCfg.IndexBasedCompaction.GetAsBool() {
-           group.segments = FilterInIndexedSegments(policy.handler, policy.meta, group.segments...)
+           group.segments = FilterInIndexedSegments(policy.handler, policy.meta, false, group.segments...)
        }

        collectionTTL, err := getCollectionTTL(collection.Properties)
4 changes: 2 additions & 2 deletions internal/datacoord/compaction_trigger.go
@@ -334,7 +334,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
        }

        if Params.DataCoordCfg.IndexBasedCompaction.GetAsBool() {
-           group.segments = FilterInIndexedSegments(t.handler, t.meta, group.segments...)
+           group.segments = FilterInIndexedSegments(t.handler, t.meta, signal.isForce, group.segments...)
        }

        coll, err := t.getCollection(group.collectionID)
@@ -684,7 +684,7 @@ func reverseGreedySelect(candidates []*SegmentInfo, free int64, maxSegment int)
func (t *compactionTrigger) getCandidateSegments(channel string, partitionID UniqueID) []*SegmentInfo {
    segments := t.meta.GetSegmentsByChannel(channel)
    if Params.DataCoordCfg.IndexBasedCompaction.GetAsBool() {
-       segments = FilterInIndexedSegments(t.handler, t.meta, segments...)
+       segments = FilterInIndexedSegments(t.handler, t.meta, false, segments...)
    }

    var res []*SegmentInfo
98 changes: 97 additions & 1 deletion internal/datacoord/compaction_trigger_test.go
@@ -91,6 +91,99 @@ func newMockVersionManager() IndexEngineVersionManager {

var _ compactionPlanContext = (*spyCompactionHandler)(nil)

+func Test_compactionTrigger_force_without_index(t *testing.T) {
+    catalog := mocks.NewDataCoordCatalog(t)
+    catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(nil).Maybe()
+
+    collectionID := int64(11)
+    binlogs := []*datapb.FieldBinlog{
+        {
+            Binlogs: []*datapb.Binlog{
+                {EntriesNum: 5, LogID: 1},
+            },
+        },
+    }
+    deltaLogs := []*datapb.FieldBinlog{
+        {
+            Binlogs: []*datapb.Binlog{
+                {EntriesNum: 5, LogID: 1},
+            },
+        },
+    }
+
+    schema := &schemapb.CollectionSchema{
+        Fields: []*schemapb.FieldSchema{
+            {
+                FieldID:  101,
+                DataType: schemapb.DataType_FloatVector,
+                TypeParams: []*commonpb.KeyValuePair{
+                    {
+                        Key:   common.DimKey,
+                        Value: "128",
+                    },
+                },
+            },
+        },
+    }
+
+    m := &meta{
+        catalog:    catalog,
+        channelCPs: newChannelCps(),
+        segments: &SegmentsInfo{
+            segments: map[int64]*SegmentInfo{
+                1: {
+                    SegmentInfo: &datapb.SegmentInfo{
+                        ID:             1,
+                        CollectionID:   collectionID,
+                        PartitionID:    1,
+                        LastExpireTime: 100,
+                        NumOfRows:      100,
+                        MaxRowNum:      300,
+                        InsertChannel:  "ch1",
+                        State:          commonpb.SegmentState_Flushed,
+                        Binlogs:        binlogs,
+                        Deltalogs:      deltaLogs,
+                    },
+                },
+            },
+        },
+        indexMeta: &indexMeta{
+            segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{},
+            indexes:        map[UniqueID]map[UniqueID]*model.Index{},
+        },
+        collections: map[int64]*collectionInfo{
+            collectionID: {
+                ID:     collectionID,
+                Schema: schema,
+            },
+        },
+    }
+
+    compactionHandler := &spyCompactionHandler{spyChan: make(chan *datapb.CompactionPlan, 1), meta: m}
+    tr := &compactionTrigger{
+        meta:              m,
+        handler:           newMockHandlerWithMeta(m),
+        allocator:         &MockAllocator0{},
+        signals:           nil,
+        compactionHandler: compactionHandler,
+        globalTrigger:     nil,
+        closeCh:           lifetime.NewSafeChan(),
+        testingOnly:       true,
+    }
+
+    _, err := tr.triggerManualCompaction(collectionID)
+    assert.NoError(t, err)
+
+    select {
+    case val := <-compactionHandler.spyChan:
+        assert.Equal(t, 1, len(val.SegmentBinlogs))
+        return
+    case <-time.After(3 * time.Second):
+        assert.Fail(t, "failed to get plan")
+        return
+    }
+}
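The new test drives the fix end to end: the collection's indexMeta contains no indexes at all, yet triggerManualCompaction still produces a plan covering the flushed segment (one SegmentBinlogs entry arrives on the spy channel within three seconds) — presumably the scenario behind issue milvus-io#36576.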

func Test_compactionTrigger_force(t *testing.T) {
    paramtable.Init()
    type fields struct {
@@ -830,6 +923,9 @@ func Test_compactionTrigger_noplan(t *testing.T) {
    }
    Params.DataCoordCfg.MinSegmentToMerge.DefaultValue = "4"
    vecFieldID := int64(201)
+    im := newSegmentIndexMeta(nil)
+    im.indexes[2] = make(map[UniqueID]*model.Index)

    tests := []struct {
        name   string
        fields fields
@@ -841,7 +937,7 @@ func Test_compactionTrigger_noplan(t *testing.T) {
"test no plan",
fields{
&meta{
indexMeta: newSegmentIndexMeta(nil),
indexMeta: im,
// 4 segment
channelCPs: newChannelCps(),

2 changes: 1 addition & 1 deletion internal/datacoord/garbage_collector.go
@@ -439,7 +439,7 @@ func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context) {
            droppedCompactTo[to] = struct{}{}
        }
    }
-   indexedSegments := FilterInIndexedSegments(gc.handler, gc.meta, lo.Keys(droppedCompactTo)...)
+   indexedSegments := FilterInIndexedSegments(gc.handler, gc.meta, false, lo.Keys(droppedCompactTo)...)
    indexedSet := make(typeutil.UniqueSet)
    for _, segment := range indexedSegments {
        indexedSet.Insert(segment.GetID())
2 changes: 1 addition & 1 deletion internal/datacoord/handler.go
@@ -122,7 +122,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
    segments := h.s.meta.GetRealSegmentsForChannel(channel.GetName())

    segmentInfos := make(map[int64]*SegmentInfo)
-   indexedSegments := FilterInIndexedSegments(h, h.s.meta, segments...)
+   indexedSegments := FilterInIndexedSegments(h, h.s.meta, false, segments...)
    indexed := make(typeutil.UniqueSet)
    for _, segment := range indexedSegments {
        indexed.Insert(segment.GetID())
14 changes: 14 additions & 0 deletions internal/datacoord/index_meta.go
@@ -965,3 +965,17 @@ func (m *indexMeta) AreAllDiskIndex(collectionID int64, schema *schemapb.Collect
    allDiskIndex := len(vectorFields) == len(vectorFieldsWithDiskIndex)
    return allDiskIndex
}

+func (m *indexMeta) HasIndex(collectionID int64) bool {
+    m.RLock()
+    defer m.RUnlock()
+    indexes, ok := m.indexes[collectionID]
+    if ok {
+        for _, index := range indexes {
+            if !index.IsDeleted {
+                return true
+            }
+        }
+    }
+    return false
+}
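HasIndex counts a collection as indexed only if at least one of its indexes is not marked deleted, read under the meta's read lock. A minimal stand-alone analogue of those semantics, using placeholder types since indexMeta and model.Index are internal:

package main

import "fmt"

type index struct{ isDeleted bool } // placeholder for model.Index

// hasIndex mirrors indexMeta.HasIndex above, with locking omitted; ranging
// over a missing collection's nil map simply yields no iterations.
func hasIndex(indexes map[int64]map[int64]*index, collectionID int64) bool {
    for _, idx := range indexes[collectionID] {
        if !idx.isDeleted {
            return true
        }
    }
    return false
}

func main() {
    indexes := map[int64]map[int64]*index{
        1: {100: {isDeleted: true}},  // only a dropped index
        2: {200: {isDeleted: false}}, // one live index
    }
    fmt.Println(hasIndex(indexes, 1)) // false: deleted indexes do not count
    fmt.Println(hasIndex(indexes, 2)) // true
    fmt.Println(hasIndex(indexes, 3)) // false: unknown collection
}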
8 changes: 7 additions & 1 deletion internal/datacoord/util.go
@@ -71,7 +71,7 @@ func VerifyResponse(response interface{}, err error) error {
    }
}

-func FilterInIndexedSegments(handler Handler, mt *meta, segments ...*SegmentInfo) []*SegmentInfo {
+func FilterInIndexedSegments(handler Handler, mt *meta, skipNoIndexCollection bool, segments ...*SegmentInfo) []*SegmentInfo {
    if len(segments) == 0 {
        return nil
    }
@@ -82,6 +82,12 @@ func FilterInIndexedSegments(handler Handler, mt *meta, segments ...*SegmentInfo
    ret := make([]*SegmentInfo, 0)
    for collection, segmentList := range collectionSegments {
+        // No segments will be filtered if there are no indices in the collection.
+        if skipNoIndexCollection && !mt.indexMeta.HasIndex(collection) {
+            ret = append(ret, segmentList...)
+            continue
+        }

        ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
        coll, err := handler.GetCollection(ctx, collection)
        cancel()
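Call-site summary for the new flag, as visible in the hunks above: only handleGlobalSignal passes signal.isForce, so the relaxed filtering applies to manual (force) compaction alone; the single-compaction policy, getCandidateSegments, the garbage collector, and GetQueryVChanPositions all pass false and retain the previous index-only behavior.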
36 changes: 23 additions & 13 deletions tests/integration/compaction/mix_compaction_test.go
@@ -34,6 +34,7 @@ import (
    "github.com/milvus-io/milvus/pkg/util/funcutil"
    "github.com/milvus-io/milvus/pkg/util/merr"
    "github.com/milvus-io/milvus/pkg/util/metric"
+   "github.com/milvus-io/milvus/pkg/util/paramtable"
    "github.com/milvus-io/milvus/tests/integration"
)

@@ -71,6 +72,17 @@ func (s *CompactionSuite) TestMixCompaction() {
    s.NoError(err)
    log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))

+    // create index
+    createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
+        CollectionName: collectionName,
+        FieldName:      integration.FloatVecField,
+        IndexName:      "_default",
+        ExtraParams:    integration.ConstructIndexParam(dim, indexType, metricType),
+    })
+    err = merr.CheckRPCCall(createIndexStatus, err)
+    s.NoError(err)
+    s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)

    // show collection
    showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
    err = merr.CheckRPCCall(showCollectionsResp, err)
@@ -110,21 +122,14 @@ func (s *CompactionSuite) TestMixCompaction() {
log.Info("insert done", zap.Int("i", i))
}

// create index
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
CollectionName: collectionName,
FieldName: integration.FloatVecField,
IndexName: "_default",
ExtraParams: integration.ConstructIndexParam(dim, indexType, metricType),
})
err = merr.CheckRPCCall(createIndexStatus, err)
s.NoError(err)
s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)

segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
s.Equal(rowNum/batch, len(segments))

// The stats task of segments will create a new segment, potentially triggering compaction simultaneously,
// which may lead to an increase or decrease in the number of segments.
s.True(len(segments) > 0)

for _, segment := range segments {
log.Info("show segment result", zap.String("segment", segment.String()))
}
Expand All @@ -134,16 +139,21 @@ func (s *CompactionSuite) TestMixCompaction() {
        segments, err = c.MetaWatcher.ShowSegments()
        s.NoError(err)
        s.NotEmpty(segments)

        compactFromSegments := lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool {
            return segment.GetState() == commonpb.SegmentState_Dropped
        })
        compactToSegments := lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool {
            return segment.GetState() == commonpb.SegmentState_Flushed
        })

log.Info("ShowSegments result", zap.Int("len(compactFromSegments)", len(compactFromSegments)),
zap.Int("len(compactToSegments)", len(compactToSegments)))
return len(compactToSegments) == 1

// The small segments can be merged based on dataCoord.compaction.min.segment
return len(compactToSegments) <= paramtable.Get().DataCoordCfg.MinSegmentToMerge.GetAsInt()
}

    for !showSegments() {
        select {
        case <-ctx.Done():
