Skip to content

Commit

Permalink
enhance: [2.4] Invalidate collection cache when release collection (#…
Browse files Browse the repository at this point in the history
…37577) (#37628)

Cherry-pick from master
pr: #37577
Related to #37395

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
  • Loading branch information
congqixia authored Nov 13, 2024
1 parent d073f32 commit 8801322
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 1 deletion.
2 changes: 1 addition & 1 deletion internal/proxy/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p

if globalMetaCache != nil {
switch msgType {
case commonpb.MsgType_DropCollection, commonpb.MsgType_RenameCollection, commonpb.MsgType_DropAlias, commonpb.MsgType_AlterAlias, commonpb.MsgType_LoadCollection:
case commonpb.MsgType_DropCollection, commonpb.MsgType_RenameCollection, commonpb.MsgType_DropAlias, commonpb.MsgType_AlterAlias, commonpb.MsgType_LoadCollection, commonpb.MsgType_ReleaseCollection:
if collectionName != "" {
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached
globalMetaCache.DeprecateShardCache(request.GetDbName(), collectionName)
Expand Down
26 changes: 26 additions & 0 deletions internal/querycoordv2/job/job_release.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ import (
"github.com/samber/lo"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/checkers"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/observers"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
)
Expand All @@ -42,6 +45,7 @@ type ReleaseCollectionJob struct {
targetMgr *meta.TargetManager
targetObserver *observers.TargetObserver
checkerController *checkers.CheckerController
proxyManager proxyutil.ProxyClientManagerInterface
}

func NewReleaseCollectionJob(ctx context.Context,
Expand All @@ -53,6 +57,7 @@ func NewReleaseCollectionJob(ctx context.Context,
targetMgr *meta.TargetManager,
targetObserver *observers.TargetObserver,
checkerController *checkers.CheckerController,
proxyManager proxyutil.ProxyClientManagerInterface,
) *ReleaseCollectionJob {
return &ReleaseCollectionJob{
BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
Expand All @@ -64,6 +69,7 @@ func NewReleaseCollectionJob(ctx context.Context,
targetMgr: targetMgr,
targetObserver: targetObserver,
checkerController: checkerController,
proxyManager: proxyManager,
}
}

Expand Down Expand Up @@ -98,6 +104,15 @@ func (job *ReleaseCollectionJob) Execute() error {
}

job.targetObserver.ReleaseCollection(req.GetCollectionID())

// try best discard cache
// shall not affect releasing if failed
job.proxyManager.InvalidateCollectionMetaCache(job.ctx,
&proxypb.InvalidateCollMetaCacheRequest{
CollectionID: req.GetCollectionID(),
},
proxyutil.SetMsgType(commonpb.MsgType_ReleaseCollection))

waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID())
metrics.QueryCoordNumCollections.WithLabelValues().Dec()
metrics.QueryCoordNumPartitions.WithLabelValues().Sub(float64(len(toRelease)))
Expand All @@ -118,6 +133,7 @@ type ReleasePartitionJob struct {
targetMgr *meta.TargetManager
targetObserver *observers.TargetObserver
checkerController *checkers.CheckerController
proxyManager proxyutil.ProxyClientManagerInterface
}

func NewReleasePartitionJob(ctx context.Context,
Expand All @@ -129,6 +145,7 @@ func NewReleasePartitionJob(ctx context.Context,
targetMgr *meta.TargetManager,
targetObserver *observers.TargetObserver,
checkerController *checkers.CheckerController,
proxyManager proxyutil.ProxyClientManagerInterface,
) *ReleasePartitionJob {
return &ReleasePartitionJob{
BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
Expand All @@ -140,6 +157,7 @@ func NewReleasePartitionJob(ctx context.Context,
targetMgr: targetMgr,
targetObserver: targetObserver,
checkerController: checkerController,
proxyManager: proxyManager,
}
}

Expand Down Expand Up @@ -181,6 +199,14 @@ func (job *ReleasePartitionJob) Execute() error {
}
job.targetObserver.ReleaseCollection(req.GetCollectionID())
metrics.QueryCoordNumCollections.WithLabelValues().Dec()
// try best discard cache
// shall not affect releasing if failed
job.proxyManager.InvalidateCollectionMetaCache(job.ctx,
&proxypb.InvalidateCollMetaCacheRequest{
CollectionID: req.GetCollectionID(),
},
proxyutil.SetMsgType(commonpb.MsgType_ReleaseCollection))

waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID())
} else {
err := job.meta.CollectionManager.RemovePartition(req.GetCollectionID(), toRelease...)
Expand Down
10 changes: 10 additions & 0 deletions internal/querycoordv2/job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1058,6 +1058,7 @@ func (suite *JobSuite) TestReleaseCollection() {
suite.targetObserver,

suite.checkerController,
suite.proxyManager,
)
suite.scheduler.Add(job)
err := job.Wait()
Expand All @@ -1080,6 +1081,7 @@ func (suite *JobSuite) TestReleaseCollection() {
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
suite.scheduler.Add(job)
err := job.Wait()
Expand Down Expand Up @@ -1109,6 +1111,7 @@ func (suite *JobSuite) TestReleasePartition() {
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
suite.scheduler.Add(job)
err := job.Wait()
Expand All @@ -1132,6 +1135,7 @@ func (suite *JobSuite) TestReleasePartition() {
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
suite.scheduler.Add(job)
err := job.Wait()
Expand All @@ -1157,6 +1161,7 @@ func (suite *JobSuite) TestReleasePartition() {
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
suite.scheduler.Add(job)
err := job.Wait()
Expand Down Expand Up @@ -1190,6 +1195,7 @@ func (suite *JobSuite) TestDynamicRelease() {
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
return job
}
Expand All @@ -1207,6 +1213,7 @@ func (suite *JobSuite) TestDynamicRelease() {
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
return job
}
Expand Down Expand Up @@ -1505,6 +1512,7 @@ func (suite *JobSuite) TestCallReleasePartitionFailed() {
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
suite.scheduler.Add(releaseCollectionJob)
err := releaseCollectionJob.Wait()
Expand All @@ -1524,6 +1532,7 @@ func (suite *JobSuite) TestCallReleasePartitionFailed() {
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
suite.scheduler.Add(releasePartitionJob)
err = releasePartitionJob.Wait()
Expand Down Expand Up @@ -1665,6 +1674,7 @@ func (suite *JobSuite) releaseAll() {
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
suite.scheduler.Add(job)
err := job.Wait()
Expand Down
2 changes: 2 additions & 0 deletions internal/querycoordv2/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseColl
s.targetMgr,
s.targetObserver,
s.checkerController,
s.proxyClientManager,
)
s.jobScheduler.Add(releaseJob)
err := releaseJob.Wait()
Expand Down Expand Up @@ -454,6 +455,7 @@ func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePart
s.targetMgr,
s.targetObserver,
s.checkerController,
s.proxyClientManager,
)
s.jobScheduler.Add(releaseJob)
err := releaseJob.Wait()
Expand Down
1 change: 1 addition & 0 deletions internal/querycoordv2/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func (suite *ServiceSuite) SetupTest() {
getBalancerFunc: func() balance.Balance { return suite.balancer },
distController: suite.distController,
ctx: context.Background(),
proxyClientManager: suite.proxyManager,
}

suite.server.UpdateStateCode(commonpb.StateCode_Healthy)
Expand Down

0 comments on commit 8801322

Please sign in to comment.