Skip to content

Commit

Permalink
enhance: cache collection schema attributes to reduce proxy cpu (#29668)
Browse files Browse the repository at this point in the history
See also #29113

The collection schema is crucial when performing search/query but some
of the information is calculated for every request.

This PR changes the schema field of the cached collection info into a
utility `schemaInfo` type that stores some stable results, such as the
pk field, partitionKeyEnabled, etc. It also provides a field name to ID
map for the search/query services.

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
  • Loading branch information
congqixia authored Jan 4, 2024
1 parent a988daf commit 4f8c540
Show file tree
Hide file tree
Showing 18 changed files with 185 additions and 118 deletions.
2 changes: 1 addition & 1 deletion internal/distributed/proxy/httpserver/handler_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (h *HandlersV1) checkDatabase(ctx context.Context, c *gin.Context, dbName s
func (h *HandlersV1) describeCollection(ctx context.Context, c *gin.Context, dbName string, collectionName string) (*schemapb.CollectionSchema, error) {
collSchema, err := proxy.GetCachedCollectionSchema(ctx, dbName, collectionName)
if err == nil {
return collSchema, nil
return collSchema.CollectionSchema, nil
}
req := milvuspb.DescribeCollectionRequest{
DbName: dbName,
Expand Down
56 changes: 50 additions & 6 deletions internal/proxy/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type Cache interface {
// GetPartitionsIndex returns a partition names in partition key indexed order.
GetPartitionsIndex(ctx context.Context, database, collectionName string) ([]string, error)
// GetCollectionSchema get collection's schema.
GetCollectionSchema(ctx context.Context, database, collectionName string) (*schemapb.CollectionSchema, error)
GetCollectionSchema(ctx context.Context, database, collectionName string) (*schemaInfo, error)
GetShards(ctx context.Context, withCache bool, database, collectionName string, collectionID int64) (map[string][]nodeInfo, error)
DeprecateShardCache(database, collectionName string)
expireShardLeaderCache(ctx context.Context)
Expand Down Expand Up @@ -99,9 +99,8 @@ type collectionBasicInfo struct {
}

type collectionInfo struct {
collID typeutil.UniqueID
schema *schemapb.CollectionSchema
// partInfo map[string]*partitionInfo
collID typeutil.UniqueID
schema *schemaInfo
partInfo *partitionInfos
leaderMutex sync.RWMutex
shardLeaders *shardLeaders
Expand All @@ -110,6 +109,51 @@ type collectionInfo struct {
consistencyLevel commonpb.ConsistencyLevel
}

// schemaInfo wraps *schemapb.CollectionSchema with pre-computed, stable
// attributes (field name to ID mapping, partition-key flag, primary key
// field) so they are derived once per cache entry instead of being
// recalculated for every search/query request.
type schemaInfo struct {
	*schemapb.CollectionSchema
	fieldMap             *typeutil.ConcurrentMap[string, int64] // field name to id mapping
	hasPartitionKeyField bool                                   // true when any field has IsPartitionKey set
	pkField              *schemapb.FieldSchema                  // primary key field; nil if the schema declares none
}

// newSchemaInfo builds a schemaInfo from the raw collection schema,
// computing the field name to ID map, the partition-key flag and the
// primary key field in a single pass over the schema fields.
func newSchemaInfo(schema *schemapb.CollectionSchema) *schemaInfo {
	fieldMap := typeutil.NewConcurrentMap[string, int64]()
	// renamed from hasPartitionkey for Go MixedCaps consistency
	hasPartitionKey := false
	var pkField *schemapb.FieldSchema
	for _, field := range schema.GetFields() {
		fieldMap.Insert(field.GetName(), field.GetFieldID())
		if field.GetIsPartitionKey() {
			hasPartitionKey = true
		}
		if field.GetIsPrimaryKey() {
			pkField = field
		}
	}
	return &schemaInfo{
		CollectionSchema:     schema,
		fieldMap:             fieldMap,
		hasPartitionKeyField: hasPartitionKey,
		pkField:              pkField,
	}
}

// MapFieldID returns the field ID for the given field name using the
// cached mapping. The second return value reports whether the name
// exists in the schema.
func (s *schemaInfo) MapFieldID(name string) (int64, bool) {
	return s.fieldMap.Get(name)
}

// IsPartitionKeyCollection reports whether the collection schema contains
// a partition key field (pre-computed in newSchemaInfo).
func (s *schemaInfo) IsPartitionKeyCollection() bool {
	return s.hasPartitionKeyField
}

// GetPkField returns the cached primary key field of the collection.
// It returns an internal service error when the schema has no primary
// key field recorded.
func (s *schemaInfo) GetPkField() (*schemapb.FieldSchema, error) {
	if pk := s.pkField; pk != nil {
		return pk, nil
	}
	return nil, merr.WrapErrServiceInternal("pk field not found")
}

// partitionInfos contains the cached collection partition informations.
type partitionInfos struct {
partitionInfos []*partitionInfo
Expand Down Expand Up @@ -396,7 +440,7 @@ func (m *MetaCache) getFullCollectionInfo(ctx context.Context, database, collect
return collInfo, nil
}

func (m *MetaCache) GetCollectionSchema(ctx context.Context, database, collectionName string) (*schemapb.CollectionSchema, error) {
func (m *MetaCache) GetCollectionSchema(ctx context.Context, database, collectionName string) (*schemaInfo, error) {
m.mu.RLock()
var collInfo *collectionInfo
var ok bool
Expand Down Expand Up @@ -445,7 +489,7 @@ func (m *MetaCache) updateCollection(coll *milvuspb.DescribeCollectionResponse,
if !ok {
m.collInfo[database][collectionName] = &collectionInfo{}
}
m.collInfo[database][collectionName].schema = coll.Schema
m.collInfo[database][collectionName].schema = newSchemaInfo(coll.Schema)
m.collInfo[database][collectionName].collID = coll.CollectionID
m.collInfo[database][collectionName].createdTimestamp = coll.CreatedTimestamp
m.collInfo[database][collectionName].createdUtcTimestamp = coll.CreatedUtcTimestamp
Expand Down
18 changes: 9 additions & 9 deletions internal/proxy/meta_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func TestMetaCache_GetCollection(t *testing.T) {
schema, err := globalMetaCache.GetCollectionSchema(ctx, dbName, "collection1")
assert.Equal(t, rootCoord.GetAccessCount(), 1)
assert.NoError(t, err)
assert.Equal(t, schema, &schemapb.CollectionSchema{
assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection1",
Expand All @@ -220,7 +220,7 @@ func TestMetaCache_GetCollection(t *testing.T) {
schema, err = globalMetaCache.GetCollectionSchema(ctx, dbName, "collection2")
assert.Equal(t, rootCoord.GetAccessCount(), 2)
assert.NoError(t, err)
assert.Equal(t, schema, &schemapb.CollectionSchema{
assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection2",
Expand All @@ -234,7 +234,7 @@ func TestMetaCache_GetCollection(t *testing.T) {
schema, err = globalMetaCache.GetCollectionSchema(ctx, dbName, "collection1")
assert.Equal(t, rootCoord.GetAccessCount(), 2)
assert.NoError(t, err)
assert.Equal(t, schema, &schemapb.CollectionSchema{
assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection1",
Expand Down Expand Up @@ -290,7 +290,7 @@ func TestMetaCache_GetCollectionName(t *testing.T) {
schema, err := globalMetaCache.GetCollectionSchema(ctx, dbName, "collection1")
assert.Equal(t, rootCoord.GetAccessCount(), 1)
assert.NoError(t, err)
assert.Equal(t, schema, &schemapb.CollectionSchema{
assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection1",
Expand All @@ -302,7 +302,7 @@ func TestMetaCache_GetCollectionName(t *testing.T) {
schema, err = globalMetaCache.GetCollectionSchema(ctx, dbName, "collection2")
assert.Equal(t, rootCoord.GetAccessCount(), 2)
assert.NoError(t, err)
assert.Equal(t, schema, &schemapb.CollectionSchema{
assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection2",
Expand All @@ -316,7 +316,7 @@ func TestMetaCache_GetCollectionName(t *testing.T) {
schema, err = globalMetaCache.GetCollectionSchema(ctx, dbName, "collection1")
assert.Equal(t, rootCoord.GetAccessCount(), 2)
assert.NoError(t, err)
assert.Equal(t, schema, &schemapb.CollectionSchema{
assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection1",
Expand All @@ -340,7 +340,7 @@ func TestMetaCache_GetCollectionFailure(t *testing.T) {

schema, err = globalMetaCache.GetCollectionSchema(ctx, dbName, "collection1")
assert.NoError(t, err)
assert.Equal(t, schema, &schemapb.CollectionSchema{
assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection1",
Expand All @@ -349,7 +349,7 @@ func TestMetaCache_GetCollectionFailure(t *testing.T) {
rootCoord.Error = true
// should be cached with no error
assert.NoError(t, err)
assert.Equal(t, schema, &schemapb.CollectionSchema{
assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection1",
Expand Down Expand Up @@ -410,7 +410,7 @@ func TestMetaCache_ConcurrentTest1(t *testing.T) {
// GetCollectionSchema will never fail
schema, err := globalMetaCache.GetCollectionSchema(ctx, dbName, "collection1")
assert.NoError(t, err)
assert.Equal(t, schema, &schemapb.CollectionSchema{
assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection1",
Expand Down
16 changes: 7 additions & 9 deletions internal/proxy/mock_cache.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions internal/proxy/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1487,7 +1487,7 @@ func (t *loadCollectionTask) Execute(ctx context.Context) (err error) {
),
DbID: 0,
CollectionID: collID,
Schema: collSchema,
Schema: collSchema.CollectionSchema,
ReplicaNumber: t.ReplicaNumber,
FieldIndexID: fieldIndexIDs,
Refresh: t.Refresh,
Expand Down Expand Up @@ -1738,7 +1738,7 @@ func (t *loadPartitionsTask) Execute(ctx context.Context) error {
DbID: 0,
CollectionID: collID,
PartitionIDs: partitionIDs,
Schema: collSchema,
Schema: collSchema.CollectionSchema,
ReplicaNumber: t.ReplicaNumber,
FieldIndexID: fieldIndexIDs,
Refresh: t.Refresh,
Expand Down
12 changes: 6 additions & 6 deletions internal/proxy/task_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ type deleteRunner struct {
tsoAllocatorIns tsoAllocator

// delete info
schema *schemapb.CollectionSchema
schema *schemaInfo
collectionID UniqueID
partitionID UniqueID
partitionKeyMode bool
Expand Down Expand Up @@ -264,8 +264,8 @@ func (dr *deleteRunner) Init(ctx context.Context) error {
return ErrWithLog(log, "Failed to get collection schema", err)
}

dr.partitionKeyMode = hasParitionKeyModeField(dr.schema)
// get prititionIDs of delete
dr.partitionKeyMode = dr.schema.IsPartitionKeyCollection()
// get partitionIDs of delete
dr.partitionID = common.InvalidPartitionID
if len(dr.req.PartitionName) > 0 {
if dr.partitionKeyMode {
Expand Down Expand Up @@ -300,12 +300,12 @@ func (dr *deleteRunner) Init(ctx context.Context) error {
}

func (dr *deleteRunner) Run(ctx context.Context) error {
plan, err := planparserv2.CreateRetrievePlan(dr.schema, dr.req.Expr)
plan, err := planparserv2.CreateRetrievePlan(dr.schema.CollectionSchema, dr.req.Expr)
if err != nil {
return fmt.Errorf("failed to create expr plan, expr = %s", dr.req.GetExpr())
}

isSimple, pk, numRow := getPrimaryKeysFromPlan(dr.schema, plan)
isSimple, pk, numRow := getPrimaryKeysFromPlan(dr.schema.CollectionSchema, plan)
if isSimple {
// if could get delete.primaryKeys from delete expr
err := dr.simpleDelete(ctx, pk, numRow)
Expand Down Expand Up @@ -379,7 +379,7 @@ func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) exe
zap.Int64("nodeID", nodeID))

// set plan
_, outputFieldIDs := translatePkOutputFields(dr.schema)
_, outputFieldIDs := translatePkOutputFields(dr.schema.CollectionSchema)
outputFieldIDs = append(outputFieldIDs, common.TimeStampField)
plan.OutputFieldIds = outputFieldIDs

Expand Down
21 changes: 13 additions & 8 deletions internal/proxy/task_delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func TestDeleteRunner_Init(t *testing.T) {
// channels := []string{"test_channel"}
dbName := "test_1"

schema := &schemapb.CollectionSchema{
collSchema := &schemapb.CollectionSchema{
Name: collectionName,
Description: "",
AutoID: false,
Expand All @@ -253,6 +253,7 @@ func TestDeleteRunner_Init(t *testing.T) {
},
},
}
schema := newSchemaInfo(collSchema)

t.Run("empty collection name", func(t *testing.T) {
dr := deleteRunner{}
Expand Down Expand Up @@ -312,7 +313,7 @@ func TestDeleteRunner_Init(t *testing.T) {
mock.Anything, // context.Context
mock.AnythingOfType("string"),
mock.AnythingOfType("string"),
).Return(&schemapb.CollectionSchema{
).Return(newSchemaInfo(&schemapb.CollectionSchema{
Name: collectionName,
Description: "",
AutoID: false,
Expand All @@ -325,7 +326,7 @@ func TestDeleteRunner_Init(t *testing.T) {
IsPartitionKey: true,
},
},
}, nil)
}), nil)

globalMetaCache = cache
assert.Error(t, dr.Init(context.Background()))
Expand Down Expand Up @@ -440,7 +441,7 @@ func TestDeleteRunner_Run(t *testing.T) {
queue.Start()
defer queue.Close()

schema := &schemapb.CollectionSchema{
collSchema := &schemapb.CollectionSchema{
Name: collectionName,
Description: "",
AutoID: false,
Expand All @@ -459,6 +460,7 @@ func TestDeleteRunner_Run(t *testing.T) {
},
},
}
schema := newSchemaInfo(collSchema)

metaCache := NewMockCache(t)
metaCache.EXPECT().GetCollectionID(mock.Anything, dbName, collectionName).Return(collectionID, nil).Maybe()
Expand All @@ -474,6 +476,7 @@ func TestDeleteRunner_Run(t *testing.T) {
req: &milvuspb.DeleteRequest{
Expr: "????",
},
schema: schema,
}
assert.Error(t, dr.Run(context.Background()))
})
Expand Down Expand Up @@ -838,7 +841,7 @@ func TestDeleteRunner_StreamingQueryAndDelteFunc(t *testing.T) {
queue.Start()
defer queue.Close()

schema := &schemapb.CollectionSchema{
collSchema := &schemapb.CollectionSchema{
Name: "test_delete",
Description: "",
AutoID: false,
Expand All @@ -859,7 +862,9 @@ func TestDeleteRunner_StreamingQueryAndDelteFunc(t *testing.T) {
}

// test partitionKey mode
schema.Fields[1].IsPartitionKey = true
collSchema.Fields[1].IsPartitionKey = true

schema := newSchemaInfo(collSchema)
partitionMaps := make(map[string]int64)
partitionMaps["test_0"] = 1
partitionMaps["test_1"] = 2
Expand Down Expand Up @@ -930,7 +935,7 @@ func TestDeleteRunner_StreamingQueryAndDelteFunc(t *testing.T) {
globalMetaCache = mockCache
defer func() { globalMetaCache = nil }()

plan, err := planparserv2.CreateRetrievePlan(dr.schema, dr.req.Expr)
plan, err := planparserv2.CreateRetrievePlan(dr.schema.CollectionSchema, dr.req.Expr)
assert.NoError(t, err)
queryFunc := dr.getStreamingQueryAndDelteFunc(plan)
assert.Error(t, queryFunc(ctx, 1, qn))
Expand Down Expand Up @@ -973,7 +978,7 @@ func TestDeleteRunner_StreamingQueryAndDelteFunc(t *testing.T) {
globalMetaCache = mockCache
defer func() { globalMetaCache = nil }()

plan, err := planparserv2.CreateRetrievePlan(dr.schema, dr.req.Expr)
plan, err := planparserv2.CreateRetrievePlan(dr.schema.CollectionSchema, dr.req.Expr)
assert.NoError(t, err)
queryFunc := dr.getStreamingQueryAndDelteFunc(plan)
assert.Error(t, queryFunc(ctx, 1, qn))
Expand Down
Loading

0 comments on commit 4f8c540

Please sign in to comment.