Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

operator: fix bug for using AddOpInfluence (#5312) #5350

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,10 @@ func (oc *OperatorController) GetOpInfluence(cluster Cluster) operator.OpInfluen
defer oc.RUnlock()
for _, op := range oc.operators {
if !op.CheckTimeout() && !op.CheckSuccess() {
AddOpInfluence(op, influence, cluster)
region := cluster.GetRegion(op.RegionID())
if region != nil {
op.UnfinishedInfluence(influence, region)
}
}
}
return influence
Expand All @@ -819,11 +822,7 @@ func (oc *OperatorController) GetFastOpInfluence(cluster Cluster, influence oper
if !ok {
continue
}
region := cluster.GetRegion(op.RegionID())
if region != nil {
log.Debug("op influence less than 10s", zap.Uint64("region-id", op.RegionID()))
op.TotalInfluence(influence, region)
}
AddOpInfluence(op, influence, cluster)
}
}

Expand Down
84 changes: 84 additions & 0 deletions server/schedule/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,10 +481,94 @@ func (t *testOperatorControllerSuite) TestDispatchOutdatedRegion(c *C) {
c.Assert(stream.MsgLength(), Equals, 3)
}

<<<<<<< HEAD
func (t *testOperatorControllerSuite) TestDispatchUnfinishedStep(c *C) {
cluster := mockcluster.NewCluster(t.ctx, config.NewTestOptions())
stream := hbstream.NewTestHeartbeatStreams(t.ctx, cluster.ID, cluster, false /* no need to run */)
controller := NewOperatorController(t.ctx, cluster, stream)
=======
func (suite *operatorControllerTestSuite) TestCalcInfluence() {
cluster := mockcluster.NewCluster(suite.ctx, config.NewTestOptions())
stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */)
controller := NewOperatorController(suite.ctx, cluster, stream)

epoch := &metapb.RegionEpoch{ConfVer: 0, Version: 0}
region := cluster.MockRegionInfo(1, 1, []uint64{2}, []uint64{}, epoch)
region = region.Clone(core.SetApproximateSize(20))
cluster.PutRegion(region)
cluster.AddRegionStore(1, 1)
cluster.AddRegionStore(3, 1)

steps := []operator.OpStep{
operator.AddLearner{ToStore: 3, PeerID: 3},
operator.PromoteLearner{ToStore: 3, PeerID: 3},
operator.TransferLeader{FromStore: 1, ToStore: 3},
operator.RemovePeer{FromStore: 1},
}
op := operator.NewTestOperator(1, epoch, operator.OpRegion, steps...)
suite.True(controller.AddOperator(op))

check := func(influence operator.OpInfluence, id uint64, expect *operator.StoreInfluence) {
si := influence.GetStoreInfluence(id)
suite.Equal(si.LeaderCount, expect.LeaderCount)
suite.Equal(si.LeaderSize, expect.LeaderSize)
suite.Equal(si.RegionCount, expect.RegionCount)
suite.Equal(si.RegionSize, expect.RegionSize)
suite.Equal(si.StepCost[storelimit.AddPeer], expect.StepCost[storelimit.AddPeer])
suite.Equal(si.StepCost[storelimit.RemovePeer], expect.StepCost[storelimit.RemovePeer])
}

influence := controller.GetOpInfluence(cluster)
check(influence, 1, &operator.StoreInfluence{
LeaderSize: -20,
LeaderCount: -1,
RegionSize: -20,
RegionCount: -1,
StepCost: map[storelimit.Type]int64{
storelimit.RemovePeer: 200,
},
})
check(influence, 3, &operator.StoreInfluence{
LeaderSize: 20,
LeaderCount: 1,
RegionSize: 20,
RegionCount: 1,
StepCost: map[storelimit.Type]int64{
storelimit.AddPeer: 200,
},
})

region2 := region.Clone(
core.WithAddPeer(&metapb.Peer{Id: 3, StoreId: 3, Role: metapb.PeerRole_Learner}),
core.WithIncConfVer(),
)
suite.True(steps[0].IsFinish(region2))
op.Check(region2)

influence = controller.GetOpInfluence(cluster)
check(influence, 1, &operator.StoreInfluence{
LeaderSize: -20,
LeaderCount: -1,
RegionSize: -20,
RegionCount: -1,
StepCost: map[storelimit.Type]int64{
storelimit.RemovePeer: 200,
},
})
check(influence, 3, &operator.StoreInfluence{
LeaderSize: 20,
LeaderCount: 1,
RegionSize: 0,
RegionCount: 0,
StepCost: make(map[storelimit.Type]int64),
})
}

func (suite *operatorControllerTestSuite) TestDispatchUnfinishedStep() {
cluster := mockcluster.NewCluster(suite.ctx, config.NewTestOptions())
stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */)
controller := NewOperatorController(suite.ctx, cluster, stream)
>>>>>>> e339c83bb (operator: fix bug for using `AddOpInfluence ` (#5312))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some conflicts.


// Create a new region with epoch(0, 0)
// the region has two peers with its peer id allocated incrementally.
Expand Down