Skip to content

Commit

Permalink
feat: Support create collection with functions (#35973)
Browse files Browse the repository at this point in the history
relate: #35853
Support creating collections with functions, in preparation for BM25
function support.

---------

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
  • Loading branch information
aoiasd authored Sep 12, 2024
1 parent 08e6811 commit da227ff
Show file tree
Hide file tree
Showing 25 changed files with 765 additions and 140 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.7
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240822040249-4bbc8f623cbb
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240909041258-8f8ca67816cd
github.com/minio/minio-go/v7 v7.0.61
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/prometheus/client_golang v1.14.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -602,8 +602,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240822040249-4bbc8f623cbb h1:S3QIkNv9N1Vd1UKtdaQ4yVDPFAwFiPSAjN07axzbR70=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240822040249-4bbc8f623cbb/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240909041258-8f8ca67816cd h1:x0b0+foTe23sKcVFseR1DE8+BB08EH6ViiRHaz8PEik=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240909041258-8f8ca67816cd/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
Expand Down
50 changes: 47 additions & 3 deletions internal/metastore/kv/rootcoord/kv_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ func BuildFieldKey(collectionID typeutil.UniqueID, fieldID int64) string {
return fmt.Sprintf("%s/%d", BuildFieldPrefix(collectionID), fieldID)
}

// BuildFunctionPrefix returns the key prefix under which the function entries
// of the given collection are stored: FunctionMetaPrefix, a slash, and the
// collection ID.
func BuildFunctionPrefix(collectionID typeutil.UniqueID) string {
	prefix := fmt.Sprintf("%s/%d", FunctionMetaPrefix, collectionID)
	return prefix
}

// BuildFunctionKey returns the key of a single function entry: the
// collection's function prefix, a slash, and the function ID.
func BuildFunctionKey(collectionID typeutil.UniqueID, functionID int64) string {
	prefix := BuildFunctionPrefix(collectionID)
	return fmt.Sprintf("%s/%d", prefix, functionID)
}

// BuildAliasKey210 returns the key for the given alias under
// CollectionAliasMetaPrefix210: the prefix, a slash, and the alias name.
func BuildAliasKey210(alias string) string {
	key := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix210, alias)
	return key
}
Expand Down Expand Up @@ -166,7 +174,7 @@ func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection,

kvs := map[string]string{}

// save partition info to newly path.
// save partition info to new path.
for _, partition := range coll.Partitions {
k := BuildPartitionKey(coll.CollectionID, partition.PartitionID)
partitionInfo := model.MarshalPartitionModel(partition)
Expand All @@ -178,8 +186,7 @@ func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection,
}

// no default aliases will be created.

// save fields info to newly path.
// save fields info to new path.
for _, field := range coll.Fields {
k := BuildFieldKey(coll.CollectionID, field.FieldID)
fieldInfo := model.MarshalFieldModel(field)
Expand All @@ -190,6 +197,17 @@ func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection,
kvs[k] = string(v)
}

// save functions info to new path.
for _, function := range coll.Functions {
k := BuildFunctionKey(coll.CollectionID, function.ID)
functionInfo := model.MarshalFunctionModel(function)
v, err := proto.Marshal(functionInfo)
if err != nil {
return err
}
kvs[k] = string(v)
}

// Though batchSave is not atomic enough, we can promise the atomicity outside.
// Recovering from failure, if we found collection is creating, we should remove all these related meta.
// since SnapshotKV may save both snapshot key and the original key if the original key is newest
Expand Down Expand Up @@ -358,6 +376,24 @@ func (kc *Catalog) listFieldsAfter210(ctx context.Context, collectionID typeutil
return fields, nil
}

// listFunctions loads every value stored under the collection's function
// prefix from the snapshot at timestamp ts and decodes each one into a
// model.Function. The first load or decode error is returned as-is, with a
// nil slice.
func (kc *Catalog) listFunctions(collectionID typeutil.UniqueID, ts typeutil.Timestamp) ([]*model.Function, error) {
	// Keys are not needed; only the serialized schemas are decoded.
	_, rawValues, err := kc.Snapshot.LoadWithPrefix(BuildFunctionPrefix(collectionID), ts)
	if err != nil {
		return nil, err
	}

	result := make([]*model.Function, 0, len(rawValues))
	for i := range rawValues {
		schema := new(schemapb.FunctionSchema)
		if err := proto.Unmarshal([]byte(rawValues[i]), schema); err != nil {
			return nil, err
		}
		result = append(result, model.UnmarshalFunctionModel(schema))
	}
	return result, nil
}

func (kc *Catalog) appendPartitionAndFieldsInfo(ctx context.Context, collMeta *pb.CollectionInfo,
ts typeutil.Timestamp,
) (*model.Collection, error) {
Expand All @@ -379,6 +415,11 @@ func (kc *Catalog) appendPartitionAndFieldsInfo(ctx context.Context, collMeta *p
}
collection.Fields = fields

functions, err := kc.listFunctions(collection.CollectionID, ts)
if err != nil {
return nil, err
}
collection.Functions = functions
return collection, nil
}

Expand Down Expand Up @@ -441,6 +482,9 @@ func (kc *Catalog) DropCollection(ctx context.Context, collectionInfo *model.Col
for _, field := range collectionInfo.Fields {
delMetakeysSnap = append(delMetakeysSnap, BuildFieldKey(collectionInfo.CollectionID, field.FieldID))
}
for _, function := range collectionInfo.Functions {
delMetakeysSnap = append(delMetakeysSnap, BuildFunctionKey(collectionInfo.CollectionID, function.ID))
}
// delMetakeysSnap = append(delMetakeysSnap, buildPartitionPrefix(collectionInfo.CollectionID))
// delMetakeysSnap = append(delMetakeysSnap, buildFieldPrefix(collectionInfo.CollectionID))

Expand Down
65 changes: 64 additions & 1 deletion internal/metastore/kv/rootcoord/kv_catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,17 @@ func TestCatalog_ListCollections(t *testing.T) {
return strings.HasPrefix(prefix, FieldMetaPrefix)
}), ts).
Return([]string{"key"}, []string{string(fm)}, nil)
kc := Catalog{Snapshot: kv}

functionMeta := &schemapb.FunctionSchema{}
fcm, err := proto.Marshal(functionMeta)
assert.NoError(t, err)
kv.On("LoadWithPrefix", mock.MatchedBy(
func(prefix string) bool {
return strings.HasPrefix(prefix, FunctionMetaPrefix)
}), ts).
Return([]string{"key"}, []string{string(fcm)}, nil)

kc := Catalog{Snapshot: kv}
ret, err := kc.ListCollections(ctx, testDb, ts)
assert.NoError(t, err)
assert.NotNil(t, ret)
Expand Down Expand Up @@ -248,6 +257,16 @@ func TestCatalog_ListCollections(t *testing.T) {
return strings.HasPrefix(prefix, FieldMetaPrefix)
}), ts).
Return([]string{"key"}, []string{string(fm)}, nil)

functionMeta := &schemapb.FunctionSchema{}
fcm, err := proto.Marshal(functionMeta)
assert.NoError(t, err)
kv.On("LoadWithPrefix", mock.MatchedBy(
func(prefix string) bool {
return strings.HasPrefix(prefix, FunctionMetaPrefix)
}), ts).
Return([]string{"key"}, []string{string(fcm)}, nil)

kv.On("MultiSaveAndRemove", mock.Anything, mock.Anything, ts).Return(nil)
kc := Catalog{Snapshot: kv}

Expand Down Expand Up @@ -1215,6 +1234,22 @@ func TestCatalog_CreateCollection(t *testing.T) {
err := kc.CreateCollection(ctx, coll, 100)
assert.NoError(t, err)
})

t.Run("create collection with function", func(t *testing.T) {
mockSnapshot := newMockSnapshot(t, withMockSave(nil), withMockMultiSave(nil))
kc := &Catalog{Snapshot: mockSnapshot}
ctx := context.Background()
coll := &model.Collection{
Partitions: []*model.Partition{
{PartitionName: "test"},
},
Fields: []*model.Field{{Name: "text", DataType: schemapb.DataType_VarChar}, {Name: "sparse", DataType: schemapb.DataType_SparseFloatVector}},
Functions: []*model.Function{{Name: "test", Type: schemapb.FunctionType_BM25, InputFieldNames: []string{"text"}, OutputFieldNames: []string{"sparse"}}},
State: pb.CollectionState_CollectionCreating,
}
err := kc.CreateCollection(ctx, coll, 100)
assert.NoError(t, err)
})
}

func TestCatalog_DropCollection(t *testing.T) {
Expand Down Expand Up @@ -1281,6 +1316,22 @@ func TestCatalog_DropCollection(t *testing.T) {
err := kc.DropCollection(ctx, coll, 100)
assert.NoError(t, err)
})

t.Run("drop collection with function", func(t *testing.T) {
mockSnapshot := newMockSnapshot(t, withMockMultiSaveAndRemove(nil))
kc := &Catalog{Snapshot: mockSnapshot}
ctx := context.Background()
coll := &model.Collection{
Partitions: []*model.Partition{
{PartitionName: "test"},
},
Fields: []*model.Field{{Name: "text", DataType: schemapb.DataType_VarChar}, {Name: "sparse", DataType: schemapb.DataType_SparseFloatVector}},
Functions: []*model.Function{{Name: "test", Type: schemapb.FunctionType_BM25, InputFieldNames: []string{"text"}, OutputFieldNames: []string{"sparse"}}},
State: pb.CollectionState_CollectionDropping,
}
err := kc.DropCollection(ctx, coll, 100)
assert.NoError(t, err)
})
}

func getUserInfoMetaString(username string) string {
Expand Down Expand Up @@ -2779,3 +2830,15 @@ func TestCatalog_AlterDatabase(t *testing.T) {
err = c.AlterDatabase(ctx, newDB, typeutil.ZeroTimestamp)
assert.ErrorIs(t, err, mockErr)
}

// TestCatalog_listFunctionError checks that listFunctions reports an error
// both when the snapshot load fails and when a stored value cannot be decoded
// as a function schema.
func TestCatalog_listFunctionError(t *testing.T) {
	mockSnapshot := newMockSnapshot(t)
	kc := &Catalog{Snapshot: mockSnapshot}

	// Case 1: the snapshot load itself fails.
	// Once() retires this expectation after a single call. Without it the mock
	// keeps answering every LoadWithPrefix call with this error, so the second
	// case below would never reach the decode path it is meant to cover.
	mockSnapshot.EXPECT().LoadWithPrefix(mock.Anything, mock.Anything).Return(nil, nil, fmt.Errorf("mock error")).Once()
	_, err := kc.listFunctions(1, 1)
	assert.Error(t, err)

	// Case 2: the load succeeds but the stored bytes are not a valid schema.
	mockSnapshot.EXPECT().LoadWithPrefix(mock.Anything, mock.Anything).Return([]string{"test-key"}, []string{"invalid bytes"}, nil).Once()
	_, err = kc.listFunctions(1, 1)
	assert.Error(t, err)
}
1 change: 1 addition & 0 deletions internal/metastore/kv/rootcoord/rootcoord_constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
PartitionMetaPrefix = ComponentPrefix + "/partitions"
AliasMetaPrefix = ComponentPrefix + "/aliases"
FieldMetaPrefix = ComponentPrefix + "/fields"
FunctionMetaPrefix = ComponentPrefix + "/functions"

// CollectionAliasMetaPrefix210 prefix for collection alias meta
CollectionAliasMetaPrefix210 = ComponentPrefix + "/collection-alias"
Expand Down
2 changes: 2 additions & 0 deletions internal/metastore/model/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Collection struct {
Description string
AutoID bool
Fields []*Field
Functions []*Function
VirtualChannelNames []string
PhysicalChannelNames []string
ShardsNum int32
Expand Down Expand Up @@ -54,6 +55,7 @@ func (c *Collection) Clone() *Collection {
Properties: common.CloneKeyValuePairs(c.Properties),
State: c.State,
EnableDynamicField: c.EnableDynamicField,
Functions: CloneFunctions(c.Functions),
}
}

Expand Down
18 changes: 10 additions & 8 deletions internal/metastore/model/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ import (
)

var (
colID int64 = 1
colName = "c"
fieldID int64 = 101
fieldName = "field110"
partID int64 = 20
partName = "testPart"
tenantID = "tenant-1"
typeParams = []*commonpb.KeyValuePair{
colID int64 = 1
colName = "c"
fieldID int64 = 101
fieldName = "field110"
partID int64 = 20
partName = "testPart"
tenantID = "tenant-1"
functionID int64 = 1
functionName = "test-bm25"
typeParams = []*commonpb.KeyValuePair{
{
Key: "field110-k1",
Value: "field110-v1",
Expand Down
Loading

0 comments on commit da227ff

Please sign in to comment.