Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add collection level balancer switch #30411

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: add collection level balance checker switch
Signed-off-by: sunby <sunbingyi1992@gmail.com>
  • Loading branch information
sunby committed Jan 31, 2024
commit 487b3526505ce2fea75660747cc024930b189ac8
3 changes: 3 additions & 0 deletions internal/proto/query_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -650,11 +650,13 @@ message DeleteRequest {
message ActivateCheckerRequest {
common.MsgBase base = 1;
int32 checkerID = 2;
repeated int64 collections = 3;
}

message DeactivateCheckerRequest {
common.MsgBase base = 1;
int32 checkerID = 2;
repeated int64 collections = 3;
}

message ListCheckersRequest {
Expand All @@ -672,5 +674,6 @@ message CheckerInfo {
string desc = 2;
bool activated = 3;
bool found = 4;
repeated int64 inactiveCollections = 5;
}

5 changes: 3 additions & 2 deletions internal/querycoordv2/checkers/balance_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,10 @@ func (b *BalanceChecker) replicasToBalance() []int64 {
return loadedCollections[i] < loadedCollections[j]
})

activeCollections := lo.Filter(loadedCollections, func(collectionID int64, _ int) bool { return b.checkerActivation.IsCollectionActive(collectionID) })
// balance collections influenced by stopping nodes
stoppingReplicas := make([]int64, 0)
for _, cid := range loadedCollections {
for _, cid := range activeCollections {
replicas := b.meta.ReplicaManager.GetByCollection(cid)
for _, replica := range replicas {
for _, nodeID := range replica.GetNodes() {
Expand Down Expand Up @@ -107,7 +108,7 @@ func (b *BalanceChecker) replicasToBalance() []int64 {
// iterator one normal collection in one round
normalReplicasToBalance := make([]int64, 0)
hasUnbalancedCollection := false
for _, cid := range loadedCollections {
for _, cid := range activeCollections {
if b.normalBalanceCollectionsCurrentRound.Contain(cid) {
log.Debug("ScoreBasedBalancer has balanced collection, skip balancing in this round",
zap.Int64("collectionID", cid))
Expand Down
14 changes: 14 additions & 0 deletions internal/querycoordv2/checkers/balance_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,20 @@ func (suite *BalanceCheckerTestSuite) TestStoppingBalance() {
suite.Len(tasks, 2)
}

func (suite *BalanceCheckerTestSuite) TestCollectionActivation() {
suite.scheduler.EXPECT().GetSegmentTaskNum().Return(0)
collection1 := utils.CreateTestCollection(int64(1), int32(1))
collection1.Status = querypb.LoadStatus_Loaded
suite.NoError(suite.checker.meta.PutCollection(collection1))
replica1 := utils.CreateTestReplica(1, 1, []int64{1})
suite.NoError(suite.meta.ReplicaManager.Put(replica1))
suite.ElementsMatch([]int64{1}, suite.checker.replicasToBalance())
suite.checker.DeactivateCollection(1)
suite.Empty(suite.checker.replicasToBalance())
suite.checker.ActivateCollection(1)
suite.ElementsMatch([]int64{1}, suite.checker.replicasToBalance())
}

func TestBalanceCheckerSuite(t *testing.T) {
suite.Run(t, new(BalanceCheckerTestSuite))
}
28 changes: 26 additions & 2 deletions internal/querycoordv2/checkers/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type Checker interface {
Expand All @@ -31,10 +32,15 @@ type Checker interface {
IsActive() bool
Activate()
Deactivate()
DeactivateCollection(collectionID int64)
ActivateCollection(collectionID int64)
IsCollectionActive(collectionID int64) bool
GetInactiveCollections() []int64
}

type checkerActivation struct {
active atomic.Bool
active atomic.Bool
inactiveCollections *typeutil.ConcurrentSet[int64]
}

func (c *checkerActivation) IsActive() bool {
Expand All @@ -49,8 +55,26 @@ func (c *checkerActivation) Deactivate() {
c.active.Store(false)
}

func (c *checkerActivation) DeactivateCollection(collectionID int64) {
c.inactiveCollections.Insert(collectionID)
}

func (c *checkerActivation) ActivateCollection(collectionID int64) {
c.inactiveCollections.Remove(collectionID)
}

func (c *checkerActivation) IsCollectionActive(collectionID int64) bool {
return !c.inactiveCollections.Contain(collectionID)
}

func (c *checkerActivation) GetInactiveCollections() []int64 {
return c.inactiveCollections.Collect()
}

func newCheckerActivation() *checkerActivation {
c := &checkerActivation{}
c := &checkerActivation{
inactiveCollections: typeutil.NewConcurrentSet[int64](),
}
c.Activate()
return c
}
52 changes: 38 additions & 14 deletions internal/querycoordv2/checkers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,34 +168,58 @@ func (controller *CheckerController) check(ctx context.Context, checkType utils.
}

func (controller *CheckerController) Deactivate(typ utils.CheckerType) error {
for _, checker := range controller.checkers {
if checker.ID() == typ {
checker.Deactivate()
return nil
}
if checker, ok := controller.checkers[typ]; ok {
checker.Deactivate()
return nil
}
return errTypeNotFound
}

func (controller *CheckerController) Activate(typ utils.CheckerType) error {
for _, checker := range controller.checkers {
if checker.ID() == typ {
checker.Activate()
return nil
}
if checker, ok := controller.checkers[typ]; ok {
checker.Activate()
return nil
}
return errTypeNotFound
}

func (controller *CheckerController) IsActive(typ utils.CheckerType) (bool, error) {
for _, checker := range controller.checkers {
if checker.ID() == typ {
return checker.IsActive(), nil
}
if checker, ok := controller.checkers[typ]; ok {
return checker.IsActive(), nil
}
return false, errTypeNotFound
}

func (controller *CheckerController) DeactivateCollection(typ utils.CheckerType, collection int64) error {
if checker, ok := controller.checkers[typ]; ok {
checker.DeactivateCollection(collection)
return nil
}
return errTypeNotFound
}

func (controller *CheckerController) ActivateCollection(typ utils.CheckerType, collection int64) error {
if checker, ok := controller.checkers[typ]; ok {
checker.ActivateCollection(collection)
return nil
}
return errTypeNotFound
}

func (controller *CheckerController) IsCollectionActive(typ utils.CheckerType, collection int64) (bool, error) {
if checker, ok := controller.checkers[typ]; ok {
return checker.IsCollectionActive(collection), nil
}
return false, errTypeNotFound
}

func (controller *CheckerController) GetInactiveCollections(typ utils.CheckerType) []int64 {
if checker, ok := controller.checkers[typ]; ok {
return checker.GetInactiveCollections()
}
return nil
}

func (controller *CheckerController) Checkers() []Checker {
checkers := make([]Checker, 0, len(controller.checkers))
for _, checker := range controller.checkers {
Expand Down
43 changes: 31 additions & 12 deletions internal/querycoordv2/ops_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@ func (s *Server) ListCheckers(ctx context.Context, req *querypb.ListCheckersRequ
for _, checker := range checkers {
if checkerIDSet.Len() == 0 || checkerIDSet.Contain(int32(checker.ID())) {
resp.CheckerInfos = append(resp.CheckerInfos, &querypb.CheckerInfo{
Id: int32(checker.ID()),
Activated: checker.IsActive(),
Desc: checker.ID().String(),
Found: true,
Id: int32(checker.ID()),
Activated: checker.IsActive(),
Desc: checker.ID().String(),
Found: true,
InactiveCollections: checker.GetInactiveCollections(),
})
checkerIDSet.Remove(int32(checker.ID()))
}
Expand All @@ -67,29 +68,47 @@ func (s *Server) ListCheckers(ctx context.Context, req *querypb.ListCheckersRequ
}

func (s *Server) ActivateChecker(ctx context.Context, req *querypb.ActivateCheckerRequest) (*commonpb.Status, error) {
log := log.Ctx(ctx)
log := log.Ctx(ctx).With(zap.Int32("checker", req.CheckerID)).With(zap.Int64s("collections", req.Collections))
log.Info("activate checker request received")
if err := merr.CheckHealthy(s.State()); err != nil {
log.Warn("failed to activate checker", zap.Error(err))
return merr.Status(err), nil
}
if err := s.checkerController.Activate(utils.CheckerType(req.CheckerID)); err != nil {
log.Warn("failed to activate checker", zap.Error(err))
return merr.Status(merr.WrapErrServiceInternal(err.Error())), nil

if len(req.Collections) == 0 {
if err := s.checkerController.Activate(utils.CheckerType(req.CheckerID)); err != nil {
log.Warn("failed to activate checker", zap.Error(err))
return merr.Status(merr.WrapErrServiceInternal(err.Error())), nil
}
} else {
for _, collection := range req.Collections {
if err := s.checkerController.ActivateCollection(utils.CheckerType(req.CheckerID), collection); err != nil {
log.Warn("failed to activate collection", zap.Int64("collection", collection), zap.Error(err))
}
}
}
return merr.Success(), nil
}

func (s *Server) DeactivateChecker(ctx context.Context, req *querypb.DeactivateCheckerRequest) (*commonpb.Status, error) {
log := log.Ctx(ctx)
log := log.Ctx(ctx).With(zap.Int32("checker", req.CheckerID)).With(zap.Int64s("collections", req.Collections))
log.Info("deactivate checker request received")
if err := merr.CheckHealthy(s.State()); err != nil {
log.Warn("failed to deactivate checker", zap.Error(err))
return merr.Status(err), nil
}
if err := s.checkerController.Deactivate(utils.CheckerType(req.CheckerID)); err != nil {
log.Warn("failed to deactivate checker", zap.Error(err))
return merr.Status(merr.WrapErrServiceInternal(err.Error())), nil
if len(req.Collections) == 0 {
if err := s.checkerController.Deactivate(utils.CheckerType(req.CheckerID)); err != nil {
log.Warn("failed to deactivate checker", zap.Error(err))
return merr.Status(merr.WrapErrServiceInternal(err.Error())), nil
}
} else {
for _, collection := range req.Collections {
if err := s.checkerController.DeactivateCollection(utils.CheckerType(req.CheckerID), collection); err != nil {
log.Warn("failed to deactivate collection", zap.Error(err))
return merr.Status(merr.WrapErrServiceInternal(err.Error())), nil
}
}
}
return merr.Success(), nil
}