From 8801322371f7bd1abb1693eb4a72963acbdea371 Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 13 Nov 2024 14:00:31 +0800 Subject: [PATCH] enhance: [2.4] Invalidate collection cache when release collection (#37577) (#37628) Cherry-pick from master pr: #37577 Related to #37395 --------- Signed-off-by: Congqi Xia --- internal/proxy/impl.go | 2 +- internal/querycoordv2/job/job_release.go | 26 ++++++++++++++++++++++++ internal/querycoordv2/job/job_test.go | 10 +++++++++ internal/querycoordv2/services.go | 2 ++ internal/querycoordv2/services_test.go | 1 + 5 files changed, 40 insertions(+), 1 deletion(-) diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 008a7ce98d83c..87c5f1fa09f75 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -123,7 +123,7 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p if globalMetaCache != nil { switch msgType { - case commonpb.MsgType_DropCollection, commonpb.MsgType_RenameCollection, commonpb.MsgType_DropAlias, commonpb.MsgType_AlterAlias, commonpb.MsgType_LoadCollection: + case commonpb.MsgType_DropCollection, commonpb.MsgType_RenameCollection, commonpb.MsgType_DropAlias, commonpb.MsgType_AlterAlias, commonpb.MsgType_LoadCollection, commonpb.MsgType_ReleaseCollection: if collectionName != "" { globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached globalMetaCache.DeprecateShardCache(request.GetDbName(), collectionName) diff --git a/internal/querycoordv2/job/job_release.go b/internal/querycoordv2/job/job_release.go index 99811082a4e1e..ea6289ba8a8a9 100644 --- a/internal/querycoordv2/job/job_release.go +++ b/internal/querycoordv2/job/job_release.go @@ -23,11 +23,14 @@ import ( "github.com/samber/lo" "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/checkers" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/observers" "github.com/milvus-io/milvus/internal/querycoordv2/session" + "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" ) @@ -42,6 +45,7 @@ type ReleaseCollectionJob struct { targetMgr *meta.TargetManager targetObserver *observers.TargetObserver checkerController *checkers.CheckerController + proxyManager proxyutil.ProxyClientManagerInterface } func NewReleaseCollectionJob(ctx context.Context, @@ -53,6 +57,7 @@ func NewReleaseCollectionJob(ctx context.Context, targetMgr *meta.TargetManager, targetObserver *observers.TargetObserver, checkerController *checkers.CheckerController, + proxyManager proxyutil.ProxyClientManagerInterface, ) *ReleaseCollectionJob { return &ReleaseCollectionJob{ BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()), @@ -64,6 +69,7 @@ func NewReleaseCollectionJob(ctx context.Context, targetMgr: targetMgr, targetObserver: targetObserver, checkerController: checkerController, + proxyManager: proxyManager, } } @@ -98,6 +104,15 @@ func (job *ReleaseCollectionJob) Execute() error { } job.targetObserver.ReleaseCollection(req.GetCollectionID()) + + // try best discard cache + // shall not affect releasing if failed + job.proxyManager.InvalidateCollectionMetaCache(job.ctx, + &proxypb.InvalidateCollMetaCacheRequest{ + CollectionID: req.GetCollectionID(), + }, + proxyutil.SetMsgType(commonpb.MsgType_ReleaseCollection)) + waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID()) metrics.QueryCoordNumCollections.WithLabelValues().Dec() metrics.QueryCoordNumPartitions.WithLabelValues().Sub(float64(len(toRelease))) @@ -118,6 +133,7 @@ type ReleasePartitionJob struct { targetMgr *meta.TargetManager targetObserver *observers.TargetObserver checkerController *checkers.CheckerController + proxyManager proxyutil.ProxyClientManagerInterface } func NewReleasePartitionJob(ctx context.Context, @@ -129,6 +145,7 @@ func NewReleasePartitionJob(ctx context.Context, targetMgr *meta.TargetManager, targetObserver *observers.TargetObserver, checkerController *checkers.CheckerController, + proxyManager proxyutil.ProxyClientManagerInterface, ) *ReleasePartitionJob { return &ReleasePartitionJob{ BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()), @@ -140,6 +157,7 @@ func NewReleasePartitionJob(ctx context.Context, targetMgr: targetMgr, targetObserver: targetObserver, checkerController: checkerController, + proxyManager: proxyManager, } } @@ -181,6 +199,14 @@ func (job *ReleasePartitionJob) Execute() error { } job.targetObserver.ReleaseCollection(req.GetCollectionID()) metrics.QueryCoordNumCollections.WithLabelValues().Dec() + // try best discard cache + // shall not affect releasing if failed + job.proxyManager.InvalidateCollectionMetaCache(job.ctx, + &proxypb.InvalidateCollMetaCacheRequest{ + CollectionID: req.GetCollectionID(), + }, + proxyutil.SetMsgType(commonpb.MsgType_ReleaseCollection)) + waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID()) } else { err := job.meta.CollectionManager.RemovePartition(req.GetCollectionID(), toRelease...) diff --git a/internal/querycoordv2/job/job_test.go b/internal/querycoordv2/job/job_test.go index ff10f5003bc00..0fc786dbbc763 100644 --- a/internal/querycoordv2/job/job_test.go +++ b/internal/querycoordv2/job/job_test.go @@ -1058,6 +1058,7 @@ func (suite *JobSuite) TestReleaseCollection() { suite.targetObserver, suite.checkerController, + suite.proxyManager, ) suite.scheduler.Add(job) err := job.Wait() @@ -1080,6 +1081,7 @@ func (suite *JobSuite) TestReleaseCollection() { suite.targetMgr, suite.targetObserver, suite.checkerController, + suite.proxyManager, ) suite.scheduler.Add(job) err := job.Wait() @@ -1109,6 +1111,7 @@ func (suite *JobSuite) TestReleasePartition() { suite.targetMgr, suite.targetObserver, suite.checkerController, + suite.proxyManager, ) suite.scheduler.Add(job) err := job.Wait() @@ -1132,6 +1135,7 @@ func (suite *JobSuite) TestReleasePartition() { suite.targetMgr, suite.targetObserver, suite.checkerController, + suite.proxyManager, ) suite.scheduler.Add(job) err := job.Wait() @@ -1157,6 +1161,7 @@ func (suite *JobSuite) TestReleasePartition() { suite.targetMgr, suite.targetObserver, suite.checkerController, + suite.proxyManager, ) suite.scheduler.Add(job) err := job.Wait() @@ -1190,6 +1195,7 @@ func (suite *JobSuite) TestDynamicRelease() { suite.targetMgr, suite.targetObserver, suite.checkerController, + suite.proxyManager, ) return job } @@ -1207,6 +1213,7 @@ func (suite *JobSuite) TestDynamicRelease() { suite.targetMgr, suite.targetObserver, suite.checkerController, + suite.proxyManager, ) return job } @@ -1505,6 +1512,7 @@ func (suite *JobSuite) TestCallReleasePartitionFailed() { suite.targetMgr, suite.targetObserver, suite.checkerController, + suite.proxyManager, ) suite.scheduler.Add(releaseCollectionJob) err := releaseCollectionJob.Wait() @@ -1524,6 +1532,7 @@ func (suite *JobSuite) TestCallReleasePartitionFailed() { suite.targetMgr, suite.targetObserver, suite.checkerController, + suite.proxyManager, ) suite.scheduler.Add(releasePartitionJob) err = releasePartitionJob.Wait() @@ -1665,6 +1674,7 @@ func (suite *JobSuite) releaseAll() { suite.targetMgr, suite.targetObserver, suite.checkerController, + suite.proxyManager, ) suite.scheduler.Add(job) err := job.Wait() diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index f762d0681137b..fea328ae81980 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -332,6 +332,7 @@ func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseColl s.targetMgr, s.targetObserver, s.checkerController, + s.proxyClientManager, ) s.jobScheduler.Add(releaseJob) err := releaseJob.Wait() @@ -454,6 +455,7 @@ func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePart s.targetMgr, s.targetObserver, s.checkerController, + s.proxyClientManager, ) s.jobScheduler.Add(releaseJob) err := releaseJob.Wait() diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index d5be0f45323a9..3e3adc7592b53 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -214,6 +214,7 @@ func (suite *ServiceSuite) SetupTest() { getBalancerFunc: func() balance.Balance { return suite.balancer }, distController: suite.distController, ctx: context.Background(), + proxyClientManager: suite.proxyManager, } suite.server.UpdateStateCode(commonpb.StateCode_Healthy)