enhance: cache collection schema attributes to reduce proxy cpu #29668

Merged
enhance: cache collection schema attributes to reduce proxy cpu
See also #29113

The collection schema is crucial when performing search/query, but some
of the information derived from it is recalculated for every request.

This PR changes the schema field of the cached collection info to a
utility `schemaInfo` type that stores stable, precomputed results (the
pk field, partitionKeyEnabled, etc.) and provides a field name-to-ID
map for the search/query services.

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia committed Jan 4, 2024
commit 19037a9494c155847a67e8b570ebf0d7853fb1ff
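
The idea in one line: compute the derived schema attributes once, when the cache entry is built, and let every subsequent search/query request read the precomputed result. Below is a minimal, self-contained sketch of that pattern in plain Go; `FieldSchema` and this `schemaInfo` are illustrative stand-ins, not the real `schemapb` types or the exact code in this PR:

```go
package main

import (
	"errors"
	"fmt"
)

// FieldSchema is a stand-in for schemapb.FieldSchema.
type FieldSchema struct {
	FieldID        int64
	Name           string
	IsPrimaryKey   bool
	IsPartitionKey bool
}

// schemaInfo precomputes the attributes that the proxy would
// otherwise recompute on every search/query request.
type schemaInfo struct {
	fieldMap        map[string]int64 // field name -> field ID
	hasPartitionKey bool
	pkField         *FieldSchema
}

// newSchemaInfo scans the schema exactly once, at cache-fill time.
func newSchemaInfo(fields []*FieldSchema) *schemaInfo {
	info := &schemaInfo{fieldMap: make(map[string]int64, len(fields))}
	for _, f := range fields {
		info.fieldMap[f.Name] = f.FieldID
		if f.IsPartitionKey {
			info.hasPartitionKey = true
		}
		if f.IsPrimaryKey {
			info.pkField = f
		}
	}
	return info
}

// MapFieldID resolves a field name without rescanning the schema.
func (s *schemaInfo) MapFieldID(name string) (int64, bool) {
	id, ok := s.fieldMap[name]
	return id, ok
}

func (s *schemaInfo) GetPkField() (*FieldSchema, error) {
	if s.pkField == nil {
		return nil, errors.New("pk field not found")
	}
	return s.pkField, nil
}

func main() {
	// Built once when the cache entry is (re)filled...
	info := newSchemaInfo([]*FieldSchema{
		{FieldID: 100, Name: "id", IsPrimaryKey: true},
		{FieldID: 101, Name: "vector"},
	})
	// ...then reused by every request that needs these attributes.
	if id, ok := info.MapFieldID("vector"); ok {
		fmt.Println("vector field id:", id) // 101
	}
	if pk, err := info.GetPkField(); err == nil {
		fmt.Println("pk field:", pk.Name) // id
	}
}
```

The plain map in this sketch is written once at construction and only read afterwards, so concurrent readers are safe; the PR itself uses `typeutil.ConcurrentMap`, which additionally tolerates concurrent insertion.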
56 changes: 50 additions & 6 deletions internal/proxy/meta_cache.go
@@ -69,7 +69,7 @@ type Cache interface {
// GetPartitionsIndex returns a partition names in partition key indexed order.
GetPartitionsIndex(ctx context.Context, database, collectionName string) ([]string, error)
// GetCollectionSchema get collection's schema.
GetCollectionSchema(ctx context.Context, database, collectionName string) (*schemapb.CollectionSchema, error)
GetCollectionSchema(ctx context.Context, database, collectionName string) (*schemaInfo, error)
GetShards(ctx context.Context, withCache bool, database, collectionName string, collectionID int64) (map[string][]nodeInfo, error)
DeprecateShardCache(database, collectionName string)
expireShardLeaderCache(ctx context.Context)
@@ -99,9 +99,8 @@ type collectionBasicInfo struct {
}

type collectionInfo struct {
collID typeutil.UniqueID
schema *schemapb.CollectionSchema
// partInfo map[string]*partitionInfo
collID typeutil.UniqueID
schema *schemaInfo
partInfo *partitionInfos
leaderMutex sync.RWMutex
shardLeaders *shardLeaders
@@ -110,6 +109,51 @@ type collectionInfo struct {
consistencyLevel commonpb.ConsistencyLevel
}

// schemaInfo is a utility type that wraps *schemapb.CollectionSchema
// with an extra field name-to-ID mapping and helper methods.
type schemaInfo struct {
*schemapb.CollectionSchema
fieldMap *typeutil.ConcurrentMap[string, int64] // field name to id mapping
hasPartitionKeyField bool
pkField *schemapb.FieldSchema
}

func newSchemaInfo(schema *schemapb.CollectionSchema) *schemaInfo {
fieldMap := typeutil.NewConcurrentMap[string, int64]()
hasPartitionKey := false
var pkField *schemapb.FieldSchema
for _, field := range schema.GetFields() {
fieldMap.Insert(field.GetName(), field.GetFieldID())
if field.GetIsPartitionKey() {
hasPartitionKey = true
}
if field.GetIsPrimaryKey() {
pkField = field
}
}
return &schemaInfo{
CollectionSchema: schema,
fieldMap: fieldMap,
hasPartitionKeyField: hasPartitionKey,
pkField: pkField,
}
}

func (s *schemaInfo) MapFieldID(name string) (int64, bool) {
return s.fieldMap.Get(name)
}

func (s *schemaInfo) IsPartitionKeyCollection() bool {
return s.hasPartitionKeyField
}

func (s *schemaInfo) GetPkField() (*schemapb.FieldSchema, error) {
if s.pkField == nil {
return nil, merr.WrapErrServiceInternal("pk field not found")
}
return s.pkField, nil
}

// partitionInfos contains the cached collection partition information.
type partitionInfos struct {
partitionInfos []*partitionInfo
@@ -396,7 +440,7 @@ func (m *MetaCache) getFullCollectionInfo(ctx context.Context, database, collect
return collInfo, nil
}

func (m *MetaCache) GetCollectionSchema(ctx context.Context, database, collectionName string) (*schemapb.CollectionSchema, error) {
func (m *MetaCache) GetCollectionSchema(ctx context.Context, database, collectionName string) (*schemaInfo, error) {
m.mu.RLock()
var collInfo *collectionInfo
var ok bool
@@ -445,7 +489,7 @@ func (m *MetaCache) updateCollection(coll *milvuspb.DescribeCollectionResponse,
if !ok {
m.collInfo[database][collectionName] = &collectionInfo{}
}
m.collInfo[database][collectionName].schema = coll.Schema
m.collInfo[database][collectionName].schema = newSchemaInfo(coll.Schema)
m.collInfo[database][collectionName].collID = coll.CollectionID
m.collInfo[database][collectionName].createdTimestamp = coll.CreatedTimestamp
m.collInfo[database][collectionName].createdUtcTimestamp = coll.CreatedUtcTimestamp
18 changes: 9 additions & 9 deletions internal/proxy/meta_cache_test.go
Original file line number Diff line number Diff line change
@@ -208,7 +208,7 @@ func TestMetaCache_GetCollection(t *testing.T) {
schema, err := globalMetaCache.GetCollectionSchema(ctx, dbName, "collection1")
assert.Equal(t, rootCoord.GetAccessCount(), 1)
assert.NoError(t, err)
assert.Equal(t, schema, &schemapb.CollectionSchema{
assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection1",
@@ -220,7 +220,7 @@ func TestMetaCache_GetCollection(t *testing.T) {
schema, err = globalMetaCache.GetCollectionSchema(ctx, dbName, "collection2")
assert.Equal(t, rootCoord.GetAccessCount(), 2)
assert.NoError(t, err)
assert.Equal(t, schema, &schemapb.CollectionSchema{
assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection2",
@@ -234,7 +234,7 @@ func TestMetaCache_GetCollection(t *testing.T) {
schema, err = globalMetaCache.GetCollectionSchema(ctx, dbName, "collection1")
assert.Equal(t, rootCoord.GetAccessCount(), 2)
assert.NoError(t, err)
assert.Equal(t, schema, &schemapb.CollectionSchema{
assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection1",
@@ -290,7 +290,7 @@ func TestMetaCache_GetCollectionName(t *testing.T) {
schema, err := globalMetaCache.GetCollectionSchema(ctx, dbName, "collection1")
assert.Equal(t, rootCoord.GetAccessCount(), 1)
assert.NoError(t, err)
assert.Equal(t, schema, &schemapb.CollectionSchema{
assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection1",
@@ -302,7 +302,7 @@ func TestMetaCache_GetCollectionName(t *testing.T) {
schema, err = globalMetaCache.GetCollectionSchema(ctx, dbName, "collection2")
assert.Equal(t, rootCoord.GetAccessCount(), 2)
assert.NoError(t, err)
assert.Equal(t, schema, &schemapb.CollectionSchema{
assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection2",
@@ -316,7 +316,7 @@ func TestMetaCache_GetCollectionName(t *testing.T) {
schema, err = globalMetaCache.GetCollectionSchema(ctx, dbName, "collection1")
assert.Equal(t, rootCoord.GetAccessCount(), 2)
assert.NoError(t, err)
assert.Equal(t, schema, &schemapb.CollectionSchema{
assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection1",
@@ -340,7 +340,7 @@ func TestMetaCache_GetCollectionFailure(t *testing.T) {

schema, err = globalMetaCache.GetCollectionSchema(ctx, dbName, "collection1")
assert.NoError(t, err)
assert.Equal(t, schema, &schemapb.CollectionSchema{
assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection1",
@@ -349,7 +349,7 @@ func TestMetaCache_GetCollectionFailure(t *testing.T) {
rootCoord.Error = true
// should be cached with no error
assert.NoError(t, err)
assert.Equal(t, schema, &schemapb.CollectionSchema{
assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection1",
@@ -410,7 +410,7 @@ func TestMetaCache_ConcurrentTest1(t *testing.T) {
// GetCollectionSchema will never fail
schema, err := globalMetaCache.GetCollectionSchema(ctx, dbName, "collection1")
assert.NoError(t, err)
assert.Equal(t, schema, &schemapb.CollectionSchema{
assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection1",
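
A note on why the test changes above are so mechanical (only `schema` becomes `schema.CollectionSchema`): `schemaInfo` embeds the proto pointer, so promoted fields and getters keep compiling at existing call sites, while the raw message stays reachable wherever the unwrapped proto is needed (these assertions, or the `Schema:` request fields in task.go below). A small stand-alone illustration of the embedding pattern, again with stand-in types rather than the real ones:

```go
package main

import "fmt"

// CollectionSchema is a stand-in for schemapb.CollectionSchema.
type CollectionSchema struct {
	Name   string
	AutoID bool
}

func (c *CollectionSchema) GetName() string { return c.Name }

// schemaInfo embeds the raw schema: old call sites use the promoted
// getters unchanged, while new helpers live alongside them.
type schemaInfo struct {
	*CollectionSchema
	fieldMap map[string]int64
}

func main() {
	info := &schemaInfo{
		CollectionSchema: &CollectionSchema{Name: "collection1", AutoID: true},
		fieldMap:         map[string]int64{"id": 100},
	}
	fmt.Println(info.GetName())               // promoted getter: existing callers compile unchanged
	fmt.Println(info.CollectionSchema.AutoID) // unwrapped proto where the raw message is required
}
```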
16 changes: 7 additions & 9 deletions internal/proxy/mock_cache.go

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions internal/proxy/task.go
@@ -1487,7 +1487,7 @@ func (t *loadCollectionTask) Execute(ctx context.Context) (err error) {
),
DbID: 0,
CollectionID: collID,
Schema: collSchema,
Schema: collSchema.CollectionSchema,
ReplicaNumber: t.ReplicaNumber,
FieldIndexID: fieldIndexIDs,
Refresh: t.Refresh,
@@ -1738,7 +1738,7 @@ func (t *loadPartitionsTask) Execute(ctx context.Context) error {
DbID: 0,
CollectionID: collID,
PartitionIDs: partitionIDs,
Schema: collSchema,
Schema: collSchema.CollectionSchema,
ReplicaNumber: t.ReplicaNumber,
FieldIndexID: fieldIndexIDs,
Refresh: t.Refresh,
12 changes: 6 additions & 6 deletions internal/proxy/task_delete.go
@@ -230,7 +230,7 @@ type deleteRunner struct {
tsoAllocatorIns tsoAllocator

// delete info
schema *schemapb.CollectionSchema
schema *schemaInfo
collectionID UniqueID
partitionID UniqueID
partitionKeyMode bool
@@ -264,8 +264,8 @@ func (dr *deleteRunner) Init(ctx context.Context) error {
return ErrWithLog(log, "Failed to get collection schema", err)
}

dr.partitionKeyMode = hasParitionKeyModeField(dr.schema)
// get prititionIDs of delete
dr.partitionKeyMode = dr.schema.IsPartitionKeyCollection()
// get partitionIDs of delete
dr.partitionID = common.InvalidPartitionID
if len(dr.req.PartitionName) > 0 {
if dr.partitionKeyMode {
@@ -300,12 +300,12 @@ func (dr *deleteRunner) Init(ctx context.Context) error {
}

func (dr *deleteRunner) Run(ctx context.Context) error {
plan, err := planparserv2.CreateRetrievePlan(dr.schema, dr.req.Expr)
plan, err := planparserv2.CreateRetrievePlan(dr.schema.CollectionSchema, dr.req.Expr)
if err != nil {
return fmt.Errorf("failed to create expr plan, expr = %s", dr.req.GetExpr())
}

isSimple, pk, numRow := getPrimaryKeysFromPlan(dr.schema, plan)
isSimple, pk, numRow := getPrimaryKeysFromPlan(dr.schema.CollectionSchema, plan)
if isSimple {
// if could get delete.primaryKeys from delete expr
err := dr.simpleDelete(ctx, pk, numRow)
@@ -379,7 +379,7 @@ func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) exe
zap.Int64("nodeID", nodeID))

// set plan
_, outputFieldIDs := translatePkOutputFields(dr.schema)
_, outputFieldIDs := translatePkOutputFields(dr.schema.CollectionSchema)
outputFieldIDs = append(outputFieldIDs, common.TimeStampField)
plan.OutputFieldIds = outputFieldIDs

21 changes: 13 additions & 8 deletions internal/proxy/task_delete_test.go
@@ -234,7 +234,7 @@ func TestDeleteRunner_Init(t *testing.T) {
// channels := []string{"test_channel"}
dbName := "test_1"

schema := &schemapb.CollectionSchema{
collSchema := &schemapb.CollectionSchema{
Name: collectionName,
Description: "",
AutoID: false,
@@ -253,6 +253,7 @@
},
},
}
schema := newSchemaInfo(collSchema)

t.Run("empty collection name", func(t *testing.T) {
dr := deleteRunner{}
@@ -312,7 +313,7 @@ func TestDeleteRunner_Init(t *testing.T) {
mock.Anything, // context.Context
mock.AnythingOfType("string"),
mock.AnythingOfType("string"),
).Return(&schemapb.CollectionSchema{
).Return(newSchemaInfo(&schemapb.CollectionSchema{
Name: collectionName,
Description: "",
AutoID: false,
@@ -325,7 +326,7 @@
IsPartitionKey: true,
},
},
}, nil)
}), nil)

globalMetaCache = cache
assert.Error(t, dr.Init(context.Background()))
@@ -440,7 +441,7 @@ func TestDeleteRunner_Run(t *testing.T) {
queue.Start()
defer queue.Close()

schema := &schemapb.CollectionSchema{
collSchema := &schemapb.CollectionSchema{
Name: collectionName,
Description: "",
AutoID: false,
@@ -459,6 +460,7 @@
},
},
}
schema := newSchemaInfo(collSchema)

metaCache := NewMockCache(t)
metaCache.EXPECT().GetCollectionID(mock.Anything, dbName, collectionName).Return(collectionID, nil).Maybe()
@@ -474,6 +476,7 @@
req: &milvuspb.DeleteRequest{
Expr: "????",
},
schema: schema,
}
assert.Error(t, dr.Run(context.Background()))
})
@@ -838,7 +841,7 @@ func TestDeleteRunner_StreamingQueryAndDelteFunc(t *testing.T) {
queue.Start()
defer queue.Close()

schema := &schemapb.CollectionSchema{
collSchema := &schemapb.CollectionSchema{
Name: "test_delete",
Description: "",
AutoID: false,
@@ -859,7 +862,9 @@
}

// test partitionKey mode
schema.Fields[1].IsPartitionKey = true
collSchema.Fields[1].IsPartitionKey = true

schema := newSchemaInfo(collSchema)
partitionMaps := make(map[string]int64)
partitionMaps["test_0"] = 1
partitionMaps["test_1"] = 2
@@ -930,7 +935,7 @@
globalMetaCache = mockCache
defer func() { globalMetaCache = nil }()

plan, err := planparserv2.CreateRetrievePlan(dr.schema, dr.req.Expr)
plan, err := planparserv2.CreateRetrievePlan(dr.schema.CollectionSchema, dr.req.Expr)
assert.NoError(t, err)
queryFunc := dr.getStreamingQueryAndDelteFunc(plan)
assert.Error(t, queryFunc(ctx, 1, qn))
@@ -973,7 +978,7 @@
globalMetaCache = mockCache
defer func() { globalMetaCache = nil }()

plan, err := planparserv2.CreateRetrievePlan(dr.schema, dr.req.Expr)
plan, err := planparserv2.CreateRetrievePlan(dr.schema.CollectionSchema, dr.req.Expr)
assert.NoError(t, err)
queryFunc := dr.getStreamingQueryAndDelteFunc(plan)
assert.Error(t, queryFunc(ctx, 1, qn))