diff --git a/server/schedule/operator_controller.go b/server/schedule/operator_controller.go index 33fd7dca7aa..513abb6222c 100644 --- a/server/schedule/operator_controller.go +++ b/server/schedule/operator_controller.go @@ -802,7 +802,10 @@ func (oc *OperatorController) GetOpInfluence(cluster Cluster) operator.OpInfluen defer oc.RUnlock() for _, op := range oc.operators { if !op.CheckTimeout() && !op.CheckSuccess() { - AddOpInfluence(op, influence, cluster) + region := cluster.GetRegion(op.RegionID()) + if region != nil { + op.UnfinishedInfluence(influence, region) + } } } return influence @@ -819,11 +822,7 @@ func (oc *OperatorController) GetFastOpInfluence(cluster Cluster, influence oper if !ok { continue } - region := cluster.GetRegion(op.RegionID()) - if region != nil { - log.Debug("op influence less than 10s", zap.Uint64("region-id", op.RegionID())) - op.TotalInfluence(influence, region) - } + AddOpInfluence(op, influence, cluster) } } diff --git a/server/schedule/operator_controller_test.go b/server/schedule/operator_controller_test.go index cb8f8470622..23929e53959 100644 --- a/server/schedule/operator_controller_test.go +++ b/server/schedule/operator_controller_test.go @@ -485,6 +485,83 @@ func (suite *operatorControllerTestSuite) TestDispatchOutdatedRegion() { suite.Equal(3, stream.MsgLength()) } +func (suite *operatorControllerTestSuite) TestCalcInfluence() { + cluster := mockcluster.NewCluster(suite.ctx, config.NewTestOptions()) + stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */) + controller := NewOperatorController(suite.ctx, cluster, stream) + + epoch := &metapb.RegionEpoch{ConfVer: 0, Version: 0} + region := cluster.MockRegionInfo(1, 1, []uint64{2}, []uint64{}, epoch) + region = region.Clone(core.SetApproximateSize(20)) + cluster.PutRegion(region) + cluster.AddRegionStore(1, 1) + cluster.AddRegionStore(3, 1) + + steps := []operator.OpStep{ + operator.AddLearner{ToStore: 3, PeerID: 3}, + operator.PromoteLearner{ToStore: 3, PeerID: 3}, + operator.TransferLeader{FromStore: 1, ToStore: 3}, + operator.RemovePeer{FromStore: 1}, + } + op := operator.NewTestOperator(1, epoch, operator.OpRegion, steps...) + suite.True(controller.AddOperator(op)) + + check := func(influence operator.OpInfluence, id uint64, expect *operator.StoreInfluence) { + si := influence.GetStoreInfluence(id) + suite.Equal(si.LeaderCount, expect.LeaderCount) + suite.Equal(si.LeaderSize, expect.LeaderSize) + suite.Equal(si.RegionCount, expect.RegionCount) + suite.Equal(si.RegionSize, expect.RegionSize) + suite.Equal(si.StepCost[storelimit.AddPeer], expect.StepCost[storelimit.AddPeer]) + suite.Equal(si.StepCost[storelimit.RemovePeer], expect.StepCost[storelimit.RemovePeer]) + } + + influence := controller.GetOpInfluence(cluster) + check(influence, 1, &operator.StoreInfluence{ + LeaderSize: -20, + LeaderCount: -1, + RegionSize: -20, + RegionCount: -1, + StepCost: map[storelimit.Type]int64{ + storelimit.RemovePeer: 200, + }, + }) + check(influence, 3, &operator.StoreInfluence{ + LeaderSize: 20, + LeaderCount: 1, + RegionSize: 20, + RegionCount: 1, + StepCost: map[storelimit.Type]int64{ + storelimit.AddPeer: 200, + }, + }) + + region2 := region.Clone( + core.WithAddPeer(&metapb.Peer{Id: 3, StoreId: 3, Role: metapb.PeerRole_Learner}), + core.WithIncConfVer(), + ) + suite.True(steps[0].IsFinish(region2)) + op.Check(region2) + + influence = controller.GetOpInfluence(cluster) + check(influence, 1, &operator.StoreInfluence{ + LeaderSize: -20, + LeaderCount: -1, + RegionSize: -20, + RegionCount: -1, + StepCost: map[storelimit.Type]int64{ + storelimit.RemovePeer: 200, + }, + }) + check(influence, 3, &operator.StoreInfluence{ + LeaderSize: 20, + LeaderCount: 1, + RegionSize: 0, + RegionCount: 0, + StepCost: make(map[storelimit.Type]int64), + }) +} + func (suite *operatorControllerTestSuite) TestDispatchUnfinishedStep() { cluster := mockcluster.NewCluster(suite.ctx, config.NewTestOptions()) stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */)