Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix collection is compacting logic #34855

Merged
merged 2 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 22 additions & 33 deletions internal/datacoord/compaction_policy_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,13 @@
)

type clusteringCompactionPolicy struct {
meta *meta
view *FullViews
allocator allocator
compactionHandler compactionPlanContext
handler Handler
meta *meta
allocator allocator
handler Handler
}

func newClusteringCompactionPolicy(meta *meta, view *FullViews, allocator allocator, compactionHandler compactionPlanContext, handler Handler) *clusteringCompactionPolicy {
return &clusteringCompactionPolicy{meta: meta, view: view, allocator: allocator, compactionHandler: compactionHandler, handler: handler}
func newClusteringCompactionPolicy(meta *meta, allocator allocator, handler Handler) *clusteringCompactionPolicy {
return &clusteringCompactionPolicy{meta: meta, allocator: allocator, handler: handler}
}

func (policy *clusteringCompactionPolicy) Enable() bool {
Expand All @@ -56,26 +54,22 @@
log.Info("start trigger clusteringCompactionPolicy...")
ctx := context.Background()
collections := policy.meta.GetCollections()
ts, err := policy.allocator.allocTimestamp(ctx)
if err != nil {
log.Warn("allocate ts failed, skip to handle compaction")
return make(map[CompactionTriggerType][]CompactionView, 0), err
}

events := make(map[CompactionTriggerType][]CompactionView, 0)
views := make([]CompactionView, 0)
for _, collection := range collections {
collectionViews, _, err := policy.triggerOneCollection(ctx, collection.ID, ts, false)
collectionViews, _, err := policy.triggerOneCollection(ctx, collection.ID, false)

Check warning on line 61 in internal/datacoord/compaction_policy_clustering.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/compaction_policy_clustering.go#L61

Added line #L61 was not covered by tests
if err != nil {
log.Warn("fail to trigger collection clustering compaction", zap.Int64("collectionID", collection.ID))
return make(map[CompactionTriggerType][]CompactionView, 0), err
// do not propagate this error: a single collection's failure should not abort the whole trigger
log.Warn("fail to trigger collection clustering compaction", zap.Int64("collectionID", collection.ID), zap.Error(err))

Check warning on line 64 in internal/datacoord/compaction_policy_clustering.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/compaction_policy_clustering.go#L64

Added line #L64 was not covered by tests
}
views = append(views, collectionViews...)
}
events[TriggerTypeClustering] = views
return events, nil
}

// todo: remove this check after support partial clustering compaction
func (policy *clusteringCompactionPolicy) checkAllL2SegmentsContains(ctx context.Context, collectionID, partitionID int64, channel string) bool {
getCompactingL2Segment := func(segment *SegmentInfo) bool {
return segment.CollectionID == collectionID &&
Expand All @@ -97,30 +91,24 @@
return true
}

func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Context, collectionID int64, ts Timestamp, manual bool) ([]CompactionView, int64, error) {
func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Context, collectionID int64, manual bool) ([]CompactionView, int64, error) {
log := log.With(zap.Int64("collectionID", collectionID))
log.Info("trigger collection clustering compaction")
log.Info("start trigger collection clustering compaction")
collection, err := policy.handler.GetCollection(ctx, collectionID)
if err != nil {
log.Warn("fail to get collection")
log.Warn("fail to get collection from handler")
return nil, 0, err
}
if collection == nil {
log.Warn("collection not exist")
return nil, 0, nil
}
clusteringKeyField := clustering.GetClusteringKeyField(collection.Schema)
if clusteringKeyField == nil {
log.Info("the collection has no clustering key, skip tigger clustering compaction")
return nil, 0, nil
}

// if not pass, alloc a new one
if ts == 0 {
tsNew, err := policy.allocator.allocTimestamp(ctx)
if err != nil {
log.Warn("allocate ts failed, skip to handle compaction")
return nil, 0, err
}
ts = tsNew
}

compacting, triggerID := policy.collectionIsClusteringCompacting(collection.ID)
if compacting {
log.Info("collection is clustering compacting", zap.Int64("triggerID", triggerID))
Expand All @@ -146,14 +134,15 @@
// partSegments is list of chanPartSegments, which is channel-partition organized segments
for _, group := range partSegments {
log := log.With(zap.Int64("partitionID", group.partitionID), zap.String("channel", group.channelName))

if !policy.checkAllL2SegmentsContains(ctx, group.collectionID, group.partitionID, group.channelName) {
log.Warn("clustering compaction cannot be done, otherwise the performance will fall back")
continue
}

ct, err := getCompactTime(ts, collection)
collectionTTL, err := getCollectionTTL(collection.Properties)
if err != nil {
log.Warn("get compact time failed, skip to handle compaction")
log.Warn("get collection ttl failed, skip to handle compaction")

Check warning on line 145 in internal/datacoord/compaction_policy_clustering.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/compaction_policy_clustering.go#L145

Added line #L145 was not covered by tests
return make([]CompactionView, 0), 0, err
}

Expand All @@ -178,7 +167,7 @@
label: segmentViews[0].label,
segments: segmentViews,
clusteringKeyField: clusteringKeyField,
compactionTime: ct,
collectionTTL: collectionTTL,
triggerID: newTriggerID,
}
views = append(views, view)
Expand All @@ -195,7 +184,7 @@
}
var latestTriggerID int64 = 0
for triggerID := range triggers {
if latestTriggerID > triggerID {
if triggerID > latestTriggerID {
latestTriggerID = triggerID
}
}
Expand Down Expand Up @@ -278,7 +267,7 @@
label *CompactionGroupLabel
segments []*SegmentView
clusteringKeyField *schemapb.FieldSchema
compactionTime *compactTime
collectionTTL time.Duration
triggerID int64
}

Expand Down
184 changes: 184 additions & 0 deletions internal/datacoord/compaction_policy_clustering_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datacoord

import (
"context"
"testing"

"github.com/cockroachdb/errors"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus/internal/proto/datapb"
)

// TestClusteringCompactionPolicySuite runs every test registered on
// ClusteringCompactionPolicySuite through testify's suite runner.
func TestClusteringCompactionPolicySuite(t *testing.T) {
	suite.Run(t, &ClusteringCompactionPolicySuite{})
}

// ClusteringCompactionPolicySuite is the testify suite covering
// clusteringCompactionPolicy behavior.
type ClusteringCompactionPolicySuite struct {
	suite.Suite

	mockAlloc          *NMockAllocator            // generated mock for the allocator dependency
	mockTriggerManager *MockTriggerManager        // generated mock trigger manager (not wired in SetupTest — TODO confirm it is still needed)
	testLabel          *CompactionGroupLabel      // collection/partition/channel label shared by the generated segments
	handler            *NMockHandler              // generated mock Handler, used to stub GetCollection
	mockPlanContext    *MockCompactionPlanContext // generated mock plan context (not wired in SetupTest — TODO confirm it is still needed)

	clusteringCompactionPolicy *clusteringCompactionPolicy // the policy instance under test, rebuilt for every test
}

// SetupTest runs before every test: it builds a meta populated with the
// generated segments for testLabel and wires a fresh policy instance with a
// mock allocator and a mock handler.
func (s *ClusteringCompactionPolicySuite) SetupTest() {
	s.testLabel = &CompactionGroupLabel{
		CollectionID: 1,
		PartitionID:  10,
		Channel:      "ch-1",
	}

	testMeta := &meta{segments: NewSegmentsInfo()}
	for segID, segInfo := range genSegmentsForMeta(s.testLabel) {
		testMeta.segments.SetSegment(segID, segInfo)
	}

	s.handler = NewNMockHandler(s.T())
	s.clusteringCompactionPolicy = newClusteringCompactionPolicy(testMeta, newMockAllocator(), s.handler)
}

// TestTrigger checks that triggering against a meta with no eligible
// collections produces a (present but empty) clustering view list and no error.
func (s *ClusteringCompactionPolicySuite) TestTrigger() {
	events, err := s.clusteringCompactionPolicy.Trigger()
	s.NoError(err)

	views, ok := events[TriggerTypeClustering]
	s.True(ok)
	s.NotNil(views)
	s.Len(views, 0)
}

// TestTriggerOneCollectionAbnormal covers the abnormal paths of
// triggerOneCollection: the handler returning an error, and the handler
// reporting that the collection does not exist.
func (s *ClusteringCompactionPolicySuite) TestTriggerOneCollectionAbnormal() {
	ctx := context.TODO()

	// handler.GetCollection fails: the error must be surfaced to the caller.
	s.handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{}, errors.New("mock Error")).Once()
	views, triggerID, err := s.clusteringCompactionPolicy.triggerOneCollection(ctx, 1, false)
	s.Error(err)
	s.Nil(views)
	s.Equal(int64(0), triggerID)

	// handler.GetCollection reports a missing collection: no error, no views.
	s.handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(nil, nil).Once()
	views, triggerID, err = s.clusteringCompactionPolicy.triggerOneCollection(ctx, 1, false)
	s.NoError(err)
	s.Nil(views)
	s.Equal(int64(0), triggerID)
}

// TestTriggerOneCollectionNoClusteringKeySchema verifies that a collection
// whose schema declares no clustering key is skipped silently: no views, a
// zero trigger ID, and no error — even while another compaction task exists.
func (s *ClusteringCompactionPolicySuite) TestTriggerOneCollectionNoClusteringKeySchema() {
	s.handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{
		ID:     100,
		Schema: newTestSchema(), // schema without a clustering key field
	}, nil)

	taskMeta := newTestCompactionTaskMeta(s.T())
	taskMeta.SaveCompactionTask(&datapb.CompactionTask{
		TriggerID:    1,
		PlanID:       10,
		CollectionID: 100,
		State:        datapb.CompactionTaskState_executing,
	})
	s.clusteringCompactionPolicy.meta = &meta{compactionTaskMeta: taskMeta}

	views, triggerID, err := s.clusteringCompactionPolicy.triggerOneCollection(context.TODO(), 100, false)
	s.NoError(err)
	s.Nil(views)
	s.Equal(int64(0), triggerID)
}

// TestTriggerOneCollectionCompacting verifies that a collection with a
// clustering key that already has a clustering compaction in flight is not
// re-triggered: the existing trigger ID is returned with no views and no error.
func (s *ClusteringCompactionPolicySuite) TestTriggerOneCollectionCompacting() {
	s.handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{
		ID:     100,
		Schema: newTestScalarClusteringKeySchema(),
	}, nil)

	taskMeta := newTestCompactionTaskMeta(s.T())
	taskMeta.SaveCompactionTask(&datapb.CompactionTask{
		TriggerID:    1,
		PlanID:       10,
		CollectionID: 100,
		State:        datapb.CompactionTaskState_executing, // in-flight task marks the collection as compacting
	})
	s.clusteringCompactionPolicy.meta = &meta{compactionTaskMeta: taskMeta}

	views, triggerID, err := s.clusteringCompactionPolicy.triggerOneCollection(context.TODO(), 100, false)
	s.NoError(err)
	s.Nil(views)
	s.Equal(int64(1), triggerID)
}

// TestCollectionIsClusteringCompacting verifies collectionIsClusteringCompacting
// for an empty compaction-task meta and for every compaction task state.
//
// Fix: the first subtest previously referenced `collID` without declaring it,
// implicitly depending on a package-level identifier; declare it locally once
// so both subtests share the same self-contained, deterministic ID.
func (s *ClusteringCompactionPolicySuite) TestCollectionIsClusteringCompacting() {
	const collID = int64(19530)

	s.Run("no collection is compacting", func() {
		// Empty task meta: no collection can be reported as compacting.
		s.clusteringCompactionPolicy.meta = &meta{
			compactionTaskMeta: newTestCompactionTaskMeta(s.T()),
		}

		compacting, triggerID := s.clusteringCompactionPolicy.collectionIsClusteringCompacting(collID)
		s.False(compacting)
		s.Equal(int64(0), triggerID)
	})

	s.Run("collection is compacting, different state", func() {
		tests := []struct {
			state        datapb.CompactionTaskState
			isCompacting bool  // whether the state counts as "still compacting"
			triggerID    int64 // trigger ID expected back from the latest trigger
		}{
			{datapb.CompactionTaskState_pipelining, true, 1},
			{datapb.CompactionTaskState_executing, true, 1},
			{datapb.CompactionTaskState_completed, false, 1},
			{datapb.CompactionTaskState_failed, false, 1},
			{datapb.CompactionTaskState_timeout, false, 1},
			{datapb.CompactionTaskState_analyzing, true, 1},
			{datapb.CompactionTaskState_indexing, true, 1},
			{datapb.CompactionTaskState_cleaned, false, 1},
			{datapb.CompactionTaskState_meta_saved, true, 1},
		}

		for _, test := range tests {
			s.Run(test.state.String(), func() {
				// Fresh task meta per state so subtests do not interfere.
				compactionTaskMeta := newTestCompactionTaskMeta(s.T())
				s.clusteringCompactionPolicy.meta = &meta{
					compactionTaskMeta: compactionTaskMeta,
				}
				compactionTaskMeta.SaveCompactionTask(&datapb.CompactionTask{
					TriggerID:    1,
					PlanID:       10,
					CollectionID: collID,
					State:        test.state,
				})

				compacting, triggerID := s.clusteringCompactionPolicy.collectionIsClusteringCompacting(collID)
				s.Equal(test.isCompacting, compacting)
				s.Equal(test.triggerID, triggerID)
			})
		}
	})
}
Loading
Loading