Skip to content

Commit

Permalink
Compitable logic of default database (milvus-io#9)
Browse files Browse the repository at this point in the history
Signed-off-by: longjiquan <jiquan.long@zilliz.com>
  • Loading branch information
longjiquan authored Apr 28, 2023
1 parent b36f745 commit 67b0304
Show file tree
Hide file tree
Showing 19 changed files with 270 additions and 213 deletions.
54 changes: 45 additions & 9 deletions internal/metastore/kv/rootcoord/kv_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,24 +175,39 @@ func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection,
}

// Though batchSave is not atomic enough, we can promise the atomicity outside.
// Recovering from failure, if we found collection is creating, we should removing all these related meta.
// Recovering from failure, if we found collection is creating, we should remove all these related meta.
return etcd.SaveByBatchWithLimit(kvs, maxTxnNum/2, func(partialKvs map[string]string) error {
return kc.Snapshot.MultiSave(partialKvs, ts)
})
}

func (kc *Catalog) loadCollection(ctx context.Context, dbName string, collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*pb.CollectionInfo, error) {
func (kc *Catalog) loadCollectionFromDb(ctx context.Context, dbName string, collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*pb.CollectionInfo, error) {
collKey := BuildCollectionKey(dbName, collectionID)
collVal, err := kc.Snapshot.Load(collKey, ts)
if err != nil {
return nil, common.NewCollectionNotExistError(fmt.Sprintf("collection not found: %d", collectionID))
return nil, common.NewCollectionNotExistError(fmt.Sprintf("collection not found: %d, error: %s", collectionID, err.Error()))
}

collMeta := &pb.CollectionInfo{}
err = proto.Unmarshal([]byte(collVal), collMeta)
return collMeta, err
}

func (kc *Catalog) loadCollectionFromDefaultDb(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*pb.CollectionInfo, error) {
if info, err := kc.loadCollectionFromDb(ctx, "default", collectionID, ts); err == nil {
return info, nil
}
// get collection from older version.
return kc.loadCollectionFromDb(ctx, "", collectionID, ts)
}

func (kc *Catalog) loadCollection(ctx context.Context, dbName string, collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*pb.CollectionInfo, error) {
if dbName == "default" {
return kc.loadCollectionFromDefaultDb(ctx, collectionID, ts)
}
return kc.loadCollectionFromDb(ctx, dbName, collectionID, ts)
}

func partitionVersionAfter210(collMeta *pb.CollectionInfo) bool {
return len(collMeta.GetPartitionIDs()) <= 0 &&
len(collMeta.GetPartitionNames()) <= 0 &&
Expand Down Expand Up @@ -237,7 +252,8 @@ func (kc *Catalog) CreatePartition(ctx context.Context, dbName string, partition
collMeta.PartitionNames = append(collMeta.PartitionNames, partition.PartitionName)
collMeta.PartitionCreatedTimestamps = append(collMeta.PartitionCreatedTimestamps, partition.PartitionCreatedTimestamp)

k := BuildCollectionKey(dbName, partition.CollectionID)
// this partition exists in older version, should be also changed in place.
k := BuildCollectionKey("", partition.CollectionID)
v, err := proto.Marshal(collMeta)
if err != nil {
return err
Expand Down Expand Up @@ -379,13 +395,21 @@ func (kc *Catalog) AlterAlias(ctx context.Context, alias *model.Alias, ts typeut
}

func (kc *Catalog) DropCollection(ctx context.Context, collectionInfo *model.Collection, ts typeutil.Timestamp) error {
collectionKey := BuildCollectionKey(collectionInfo.DBName, collectionInfo.CollectionID)
collectionKeys := []string{BuildCollectionKey(collectionInfo.DBName, collectionInfo.CollectionID)}
if collectionInfo.DBName == "default" {
collectionKeys = append(collectionKeys, BuildCollectionKey("", collectionInfo.CollectionID))
}

var delMetakeysSnap []string
for _, alias := range collectionInfo.Aliases {
delMetakeysSnap = append(delMetakeysSnap,
BuildAliasKey210(alias),
BuildAliasKey(alias),
BuildAliasKeyWithDb(collectionInfo.DBName, alias),
)
if collectionInfo.DBName == "default" {
delMetakeysSnap = append(delMetakeysSnap, BuildAliasKeyWithDb("", alias))
}
}
// Snapshot will list all (k, v) pairs and then use Txn.MultiSave to save tombstone for these keys when it prepares
// to remove a prefix, so though we have very few prefixes, the final operations may exceed the max txn number.
Expand All @@ -407,7 +431,7 @@ func (kc *Catalog) DropCollection(ctx context.Context, collectionInfo *model.Col
}

// if we found collection dropping, we should try removing related resources.
return kc.Snapshot.MultiSaveAndRemoveWithPrefix(nil, []string{collectionKey}, ts)
return kc.Snapshot.MultiSaveAndRemoveWithPrefix(nil, collectionKeys, ts)
}

func (kc *Catalog) alterModifyCollection(oldColl *model.Collection, newColl *model.Collection, ts typeutil.Timestamp) error {
Expand All @@ -431,6 +455,10 @@ func (kc *Catalog) alterModifyCollection(oldColl *model.Collection, newColl *mod
if err != nil {
return err
}
if newColl.DBName == "default" {
removal := BuildCollectionKey("", newColl.CollectionID)
return kc.Snapshot.MultiSaveAndRemoveWithPrefix(map[string]string{key: string(value)}, []string{removal}, ts)
}
return kc.Snapshot.Save(key, string(value), ts)
}

Expand Down Expand Up @@ -634,21 +662,29 @@ func (kc *Catalog) listAliasesAfter210WithDb(ctx context.Context, dbName string,
return aliases, nil
}

func (kc *Catalog) listAliasesWithDb(ctx context.Context, dbName string, ts typeutil.Timestamp) ([]*model.Alias, error) {
func (kc *Catalog) listAliasesInDefaultDb(ctx context.Context, ts typeutil.Timestamp) ([]*model.Alias, error) {
aliases1, err := kc.listAliasesBefore210(ctx, ts)
if err != nil {
return nil, err
}
aliases2, err := kc.listAliasesAfter210WithDb(ctx, dbName, ts)
aliases2, err := kc.listAliasesAfter210WithDb(ctx, "default", ts)
if err != nil {
return nil, err
}
aliases3, err := kc.listAliasesAfter210WithDb(ctx, "", ts)
if err != nil {
return nil, err
}
aliases := append(aliases1, aliases2...)
aliases = append(aliases, aliases3...)
return aliases, nil
}

func (kc *Catalog) ListAliases(ctx context.Context, dbName string, ts typeutil.Timestamp) ([]*model.Alias, error) {
return kc.listAliasesWithDb(ctx, dbName, ts)
if dbName != "default" {
return kc.listAliasesAfter210WithDb(ctx, dbName, ts)
}
return kc.listAliasesInDefaultDb(ctx, ts)
}

func (kc *Catalog) ListCredentials(ctx context.Context) ([]string, error) {
Expand Down
106 changes: 49 additions & 57 deletions internal/proxy/database_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,170 +19,162 @@ func DatabaseInterceptor() grpc.UnaryServerInterceptor {
func fillDatabase(ctx context.Context, req interface{}) (context.Context, interface{}) {
switch r := req.(type) {
case *milvuspb.CreateCollectionRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.DropCollectionRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.HasCollectionRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.LoadCollectionRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.ReleaseCollectionRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.DescribeCollectionRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.GetStatisticsRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.GetCollectionStatisticsRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.ShowCollectionsRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.AlterCollectionRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.CreatePartitionRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.DropPartitionRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.HasPartitionRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.LoadPartitionsRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.ReleasePartitionsRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.GetPartitionStatisticsRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.ShowPartitionsRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.GetLoadingProgressRequest:
// TODO
// r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.GetLoadStateRequest:
// TODO
// r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.CreateIndexRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.DescribeIndexRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.DropIndexRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.GetIndexBuildProgressRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.GetIndexStateRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.InsertRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.DeleteRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.SearchRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.FlushRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.QueryRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.CreateAliasRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.DropAliasRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.AlterAliasRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.CalcDistanceRequest:
// TODO
// r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
// r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.FlushAllRequest:
// TODO
// r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
// r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.GetPersistentSegmentInfoRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.GetQuerySegmentInfoRequest:
r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.DummyRequest:
// TODO
// r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
// r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.GetMetricsRequest:
return ctx, r
case *milvuspb.LoadBalanceRequest:
// TODO
// r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.GetReplicasRequest:
// TODO
// r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.GetCompactionStateRequest:
// TODO
// r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
// r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.ManualCompactionRequest:
// TODO
// r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
// r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.GetCompactionPlansRequest:
// TODO
// r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
// r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.GetFlushStateRequest:
// TODO
// r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
// r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.GetFlushAllStateRequest:
// TODO
// r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
// r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.ImportRequest:
// TODO
// r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.GetImportStateRequest:
// TODO
// r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
// r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.ListImportTasksRequest:
// TODO
// r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.RenameCollectionRequest:
// TODO
// r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
case *milvuspb.TransferReplicaRequest:
// TODO
// r.DbName = GetCurDatabaseFromContextOrEmpty(ctx)
r.DbName = GetCurDatabaseFromContextOrDefault(ctx)
return ctx, r
default:
return ctx, req
Expand Down
Loading

0 comments on commit 67b0304

Please sign in to comment.