Skip to content

Commit

Permalink
fix: Maintain load idempotency even when building new indexes (milvus…
Browse files Browse the repository at this point in the history
…-io#35178)

issue: milvus-io#34404

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
  • Loading branch information
xiaocai2333 authored Aug 5, 2024
1 parent d8668fe commit d905713
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 141 deletions.
10 changes: 0 additions & 10 deletions internal/querycoordv2/job/job_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,6 @@ func (job *LoadCollectionJob) PreExecute() error {
)
log.Warn(msg)
return merr.WrapErrParameterInvalid(collection.GetReplicaNumber(), req.GetReplicaNumber(), "can't change the replica number for loaded collection")
} else if !typeutil.MapEqual(collection.GetFieldIndexID(), req.GetFieldIndexID()) {
msg := fmt.Sprintf("collection with different index %v existed, release this collection first before changing its index",
collection.GetFieldIndexID())
log.Warn(msg)
return merr.WrapErrParameterInvalid(collection.GetFieldIndexID(), req.GetFieldIndexID(), "can't change the index for loaded collection")
}

return nil
Expand Down Expand Up @@ -291,11 +286,6 @@ func (job *LoadPartitionJob) PreExecute() error {
msg := "collection with different replica number existed, release this collection first before changing its replica number"
log.Warn(msg)
return merr.WrapErrParameterInvalid(collection.GetReplicaNumber(), req.GetReplicaNumber(), "can't change the replica number for loaded partitions")
} else if !typeutil.MapEqual(collection.GetFieldIndexID(), req.GetFieldIndexID()) {
msg := fmt.Sprintf("collection with different index %v existed, release this collection first before changing its index",
job.meta.GetFieldIndex(req.GetCollectionID()))
log.Warn(msg)
return merr.WrapErrParameterInvalid(collection.GetFieldIndexID(), req.GetFieldIndexID(), "can't change the index for loaded partitions")
}

return nil
Expand Down
131 changes: 0 additions & 131 deletions internal/querycoordv2/job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,70 +418,6 @@ func (suite *JobSuite) TestLoadCollectionWithReplicas() {
}
}

func (suite *JobSuite) TestLoadCollectionWithDiffIndex() {
ctx := context.Background()

// Test load collection
for _, collection := range suite.collections {
if suite.loadTypes[collection] != querypb.LoadType_LoadCollection {
continue
}
// Load with 1 replica
req := &querypb.LoadCollectionRequest{
CollectionID: collection,
FieldIndexID: map[int64]int64{
defaultVecFieldID: defaultIndexID,
},
}
job := NewLoadCollectionJob(
ctx,
req,
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
err := job.Wait()
suite.NoError(err)
suite.EqualValues(1, suite.meta.GetReplicaNumber(collection))
suite.targetMgr.UpdateCollectionCurrentTarget(collection)
suite.assertCollectionLoaded(collection)
}

// Test load with different index
for _, collection := range suite.collections {
if suite.loadTypes[collection] != querypb.LoadType_LoadCollection {
continue
}
req := &querypb.LoadCollectionRequest{
CollectionID: collection,
FieldIndexID: map[int64]int64{
defaultVecFieldID: -defaultIndexID,
},
}
job := NewLoadCollectionJob(
ctx,
req,
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
err := job.Wait()
suite.ErrorIs(err, merr.ErrParameterInvalid)
}
}

func (suite *JobSuite) TestLoadPartition() {
ctx := context.Background()

Expand Down Expand Up @@ -834,73 +770,6 @@ func (suite *JobSuite) TestLoadPartitionWithReplicas() {
}
}

func (suite *JobSuite) TestLoadPartitionWithDiffIndex() {
ctx := context.Background()

// Test load partition
for _, collection := range suite.collections {
if suite.loadTypes[collection] != querypb.LoadType_LoadPartition {
continue
}
// Load with 1 replica
req := &querypb.LoadPartitionsRequest{
CollectionID: collection,
PartitionIDs: suite.partitions[collection],
FieldIndexID: map[int64]int64{
defaultVecFieldID: defaultIndexID,
},
}
job := NewLoadPartitionJob(
ctx,
req,
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
err := job.Wait()
suite.NoError(err)
suite.EqualValues(1, suite.meta.GetReplicaNumber(collection))
suite.targetMgr.UpdateCollectionCurrentTarget(collection)
suite.assertCollectionLoaded(collection)
}

// Test load partition with different index
for _, collection := range suite.collections {
if suite.loadTypes[collection] != querypb.LoadType_LoadPartition {
continue
}
// Load with 1 replica
req := &querypb.LoadPartitionsRequest{
CollectionID: collection,
PartitionIDs: suite.partitions[collection],
FieldIndexID: map[int64]int64{
defaultVecFieldID: -defaultIndexID,
},
}
job := NewLoadPartitionJob(
ctx,
req,
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
err := job.Wait()
suite.ErrorIs(err, merr.ErrParameterInvalid)
}
}

func (suite *JobSuite) TestReleaseCollection() {
ctx := context.Background()

Expand Down

0 comments on commit d905713

Please sign in to comment.