Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: [2.4] Invalidate collection cache when release collection (#37577) #37628

Merged
merged 1 commit into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/proxy/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p

if globalMetaCache != nil {
switch msgType {
case commonpb.MsgType_DropCollection, commonpb.MsgType_RenameCollection, commonpb.MsgType_DropAlias, commonpb.MsgType_AlterAlias, commonpb.MsgType_LoadCollection:
case commonpb.MsgType_DropCollection, commonpb.MsgType_RenameCollection, commonpb.MsgType_DropAlias, commonpb.MsgType_AlterAlias, commonpb.MsgType_LoadCollection, commonpb.MsgType_ReleaseCollection:
if collectionName != "" {
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached
globalMetaCache.DeprecateShardCache(request.GetDbName(), collectionName)
Expand Down
26 changes: 26 additions & 0 deletions internal/querycoordv2/job/job_release.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ import (
"github.com/samber/lo"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/checkers"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/observers"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
)
Expand All @@ -42,6 +45,7 @@ type ReleaseCollectionJob struct {
targetMgr *meta.TargetManager
targetObserver *observers.TargetObserver
checkerController *checkers.CheckerController
proxyManager proxyutil.ProxyClientManagerInterface
}

func NewReleaseCollectionJob(ctx context.Context,
Expand All @@ -53,6 +57,7 @@ func NewReleaseCollectionJob(ctx context.Context,
targetMgr *meta.TargetManager,
targetObserver *observers.TargetObserver,
checkerController *checkers.CheckerController,
proxyManager proxyutil.ProxyClientManagerInterface,
) *ReleaseCollectionJob {
return &ReleaseCollectionJob{
BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
Expand All @@ -64,6 +69,7 @@ func NewReleaseCollectionJob(ctx context.Context,
targetMgr: targetMgr,
targetObserver: targetObserver,
checkerController: checkerController,
proxyManager: proxyManager,
}
}

Expand Down Expand Up @@ -98,6 +104,15 @@ func (job *ReleaseCollectionJob) Execute() error {
}

job.targetObserver.ReleaseCollection(req.GetCollectionID())

// try best discard cache
// shall not affect releasing if failed
job.proxyManager.InvalidateCollectionMetaCache(job.ctx,
&proxypb.InvalidateCollMetaCacheRequest{
CollectionID: req.GetCollectionID(),
},
proxyutil.SetMsgType(commonpb.MsgType_ReleaseCollection))

waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID())
metrics.QueryCoordNumCollections.WithLabelValues().Dec()
metrics.QueryCoordNumPartitions.WithLabelValues().Sub(float64(len(toRelease)))
Expand All @@ -118,6 +133,7 @@ type ReleasePartitionJob struct {
targetMgr *meta.TargetManager
targetObserver *observers.TargetObserver
checkerController *checkers.CheckerController
proxyManager proxyutil.ProxyClientManagerInterface
}

func NewReleasePartitionJob(ctx context.Context,
Expand All @@ -129,6 +145,7 @@ func NewReleasePartitionJob(ctx context.Context,
targetMgr *meta.TargetManager,
targetObserver *observers.TargetObserver,
checkerController *checkers.CheckerController,
proxyManager proxyutil.ProxyClientManagerInterface,
) *ReleasePartitionJob {
return &ReleasePartitionJob{
BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
Expand All @@ -140,6 +157,7 @@ func NewReleasePartitionJob(ctx context.Context,
targetMgr: targetMgr,
targetObserver: targetObserver,
checkerController: checkerController,
proxyManager: proxyManager,
}
}

Expand Down Expand Up @@ -181,6 +199,14 @@ func (job *ReleasePartitionJob) Execute() error {
}
job.targetObserver.ReleaseCollection(req.GetCollectionID())
metrics.QueryCoordNumCollections.WithLabelValues().Dec()
// try best discard cache
// shall not affect releasing if failed
job.proxyManager.InvalidateCollectionMetaCache(job.ctx,
&proxypb.InvalidateCollMetaCacheRequest{
CollectionID: req.GetCollectionID(),
},
proxyutil.SetMsgType(commonpb.MsgType_ReleaseCollection))

waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID())
} else {
err := job.meta.CollectionManager.RemovePartition(req.GetCollectionID(), toRelease...)
Expand Down
10 changes: 10 additions & 0 deletions internal/querycoordv2/job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1058,6 +1058,7 @@ func (suite *JobSuite) TestReleaseCollection() {
suite.targetObserver,

suite.checkerController,
suite.proxyManager,
)
suite.scheduler.Add(job)
err := job.Wait()
Expand All @@ -1080,6 +1081,7 @@ func (suite *JobSuite) TestReleaseCollection() {
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
suite.scheduler.Add(job)
err := job.Wait()
Expand Down Expand Up @@ -1109,6 +1111,7 @@ func (suite *JobSuite) TestReleasePartition() {
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
suite.scheduler.Add(job)
err := job.Wait()
Expand All @@ -1132,6 +1135,7 @@ func (suite *JobSuite) TestReleasePartition() {
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
suite.scheduler.Add(job)
err := job.Wait()
Expand All @@ -1157,6 +1161,7 @@ func (suite *JobSuite) TestReleasePartition() {
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
suite.scheduler.Add(job)
err := job.Wait()
Expand Down Expand Up @@ -1190,6 +1195,7 @@ func (suite *JobSuite) TestDynamicRelease() {
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
return job
}
Expand All @@ -1207,6 +1213,7 @@ func (suite *JobSuite) TestDynamicRelease() {
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
return job
}
Expand Down Expand Up @@ -1505,6 +1512,7 @@ func (suite *JobSuite) TestCallReleasePartitionFailed() {
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
suite.scheduler.Add(releaseCollectionJob)
err := releaseCollectionJob.Wait()
Expand All @@ -1524,6 +1532,7 @@ func (suite *JobSuite) TestCallReleasePartitionFailed() {
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
suite.scheduler.Add(releasePartitionJob)
err = releasePartitionJob.Wait()
Expand Down Expand Up @@ -1665,6 +1674,7 @@ func (suite *JobSuite) releaseAll() {
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
suite.scheduler.Add(job)
err := job.Wait()
Expand Down
2 changes: 2 additions & 0 deletions internal/querycoordv2/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseColl
s.targetMgr,
s.targetObserver,
s.checkerController,
s.proxyClientManager,
)
s.jobScheduler.Add(releaseJob)
err := releaseJob.Wait()
Expand Down Expand Up @@ -454,6 +455,7 @@ func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePart
s.targetMgr,
s.targetObserver,
s.checkerController,
s.proxyClientManager,
)
s.jobScheduler.Add(releaseJob)
err := releaseJob.Wait()
Expand Down
1 change: 1 addition & 0 deletions internal/querycoordv2/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func (suite *ServiceSuite) SetupTest() {
getBalancerFunc: func() balance.Balance { return suite.balancer },
distController: suite.distController,
ctx: context.Background(),
proxyClientManager: suite.proxyManager,
}

suite.server.UpdateStateCode(commonpb.StateCode_Healthy)
Expand Down
Loading