schedule: improve the leader distribution after region scatter #2659

Merged: 4 commits, Jul 28, 2020
Changes from 2 commits
26 changes: 13 additions & 13 deletions pkg/mock/mockcluster/mockcluster.go
@@ -264,16 +264,16 @@ func (mc *Cluster) AddLabelsStore(storeID uint64, regionCount int, labels map[st
}

// AddLeaderRegion adds region with specified leader and followers.
func (mc *Cluster) AddLeaderRegion(regionID uint64, leaderID uint64, followerIds ...uint64) *core.RegionInfo {
origin := mc.newMockRegionInfo(regionID, leaderID, followerIds...)
func (mc *Cluster) AddLeaderRegion(regionID uint64, leaderStoreID uint64, followerStoreIDs ...uint64) *core.RegionInfo {
origin := mc.newMockRegionInfo(regionID, leaderStoreID, followerStoreIDs...)
region := origin.Clone(core.SetApproximateSize(10), core.SetApproximateKeys(10))
mc.PutRegion(region)
return region
}

// AddRegionWithLearner adds region with specified leader, followers and learners.
func (mc *Cluster) AddRegionWithLearner(regionID uint64, leaderID uint64, followerIDs, learnerIDs []uint64) *core.RegionInfo {
origin := mc.MockRegionInfo(regionID, leaderID, followerIDs, learnerIDs, nil)
func (mc *Cluster) AddRegionWithLearner(regionID uint64, leaderStoreID uint64, followerStoreIDs, learnerStoreIDs []uint64) *core.RegionInfo {
origin := mc.MockRegionInfo(regionID, leaderStoreID, followerStoreIDs, learnerStoreIDs, nil)
region := origin.Clone(core.SetApproximateSize(10), core.SetApproximateKeys(10))
mc.PutRegion(region)
return region
@@ -515,8 +515,8 @@ func (mc *Cluster) UpdateStoreStatus(id uint64) {
mc.PutStore(newStore)
}

func (mc *Cluster) newMockRegionInfo(regionID uint64, leaderID uint64, followerIDs ...uint64) *core.RegionInfo {
return mc.MockRegionInfo(regionID, leaderID, followerIDs, []uint64{}, nil)
func (mc *Cluster) newMockRegionInfo(regionID uint64, leaderStoreID uint64, followerStoreIDs ...uint64) *core.RegionInfo {
return mc.MockRegionInfo(regionID, leaderStoreID, followerStoreIDs, []uint64{}, nil)
}

// GetOpt mocks method.
@@ -595,23 +595,23 @@ func (mc *Cluster) RemoveScheduler(name string) error {
}

// MockRegionInfo returns a mock region
func (mc *Cluster) MockRegionInfo(regionID uint64, leaderID uint64,
followerIDs, learnerIDs []uint64, epoch *metapb.RegionEpoch) *core.RegionInfo {
func (mc *Cluster) MockRegionInfo(regionID uint64, leaderStoreID uint64,
followerStoreIDs, learnerStoreIDs []uint64, epoch *metapb.RegionEpoch) *core.RegionInfo {

region := &metapb.Region{
Id: regionID,
StartKey: []byte(fmt.Sprintf("%20d", regionID)),
EndKey: []byte(fmt.Sprintf("%20d", regionID+1)),
RegionEpoch: epoch,
}
leader, _ := mc.AllocPeer(leaderID)
leader, _ := mc.AllocPeer(leaderStoreID)
region.Peers = []*metapb.Peer{leader}
for _, id := range followerIDs {
peer, _ := mc.AllocPeer(id)
for _, storeID := range followerStoreIDs {
peer, _ := mc.AllocPeer(storeID)
region.Peers = append(region.Peers, peer)
}
for _, id := range learnerIDs {
peer, _ := mc.AllocPeer(id)
for _, storeID := range learnerStoreIDs {
peer, _ := mc.AllocPeer(storeID)
peer.IsLearner = true
region.Peers = append(region.Peers, peer)
}
5 changes: 4 additions & 1 deletion server/schedule/operator/create_operator.go
@@ -147,7 +147,7 @@ func isRegionMatch(a, b *core.RegionInfo) bool {
}

// CreateScatterRegionOperator creates an operator that scatters the specified region.
func CreateScatterRegionOperator(desc string, cluster Cluster, origin *core.RegionInfo, targetPeers map[uint64]*metapb.Peer) (*Operator, error) {
func CreateScatterRegionOperator(desc string, cluster Cluster, origin *core.RegionInfo, targetPeers map[uint64]*metapb.Peer, targetLeader uint64) (*Operator, error) {
// randomly pick a leader.
var ids []uint64
for id, peer := range targetPeers {
@@ -159,6 +159,9 @@ func CreateScatterRegionOperator(desc string, cluster Cluster, origin *core.Regi
if len(ids) > 0 {
leader = ids[rand.Intn(len(ids))]
}
if targetLeader != 0 {
leader = targetLeader
}
return NewBuilder(desc, cluster, origin).
SetPeers(targetPeers).
SetLeader(leader).
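For readers skimming the hunk above: the new targetLeader parameter only takes effect when it is non-zero; store ID 0 is treated as "no preference" and the old random pick is kept. A minimal standalone sketch of that decision in plain Go (illustrative only, not PD code):

```go
package main

import (
	"fmt"
	"math/rand"
)

// pickLeader mirrors the decision added in this hunk: prefer the explicit
// targetLeader when it is non-zero, otherwise fall back to a random store
// among the candidate peers.
func pickLeader(candidateStores []uint64, targetLeader uint64) uint64 {
	if targetLeader != 0 {
		return targetLeader
	}
	if len(candidateStores) == 0 {
		return 0
	}
	return candidateStores[rand.Intn(len(candidateStores))]
}

func main() {
	fmt.Println(pickLeader([]uint64{1, 2, 3}, 0)) // some random candidate store
	fmt.Println(pickLeader([]uint64{1, 2, 3}, 2)) // always 2
}
```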
55 changes: 50 additions & 5 deletions server/schedule/region_scatterer.go
@@ -14,6 +14,7 @@
package schedule

import (
"math"
"math/rand"
"sync"

@@ -29,6 +30,29 @@ import (

const regionScatterName = "region-scatter"

type selectedLeaderStores struct {
mu sync.Mutex
stores map[uint64]uint64
}

func (s *selectedLeaderStores) put(id uint64) {
s.mu.Lock()
defer s.mu.Unlock()
s.stores[id] = s.stores[id] + 1
}
Comment on lines +38 to +42

Contributor: If the leader is transferred manually, will the old store's count be decreased by 1 and the new store's count increased by 1?

Contributor (Author): Yes, you are right. This scheduler assumes that the leader and region distribution will not change significantly after scheduling. Maybe we need to discuss that optimization in another issue.

Contributor (@Yisaer, Jul 27, 2020): I think we can directly build a mechanism (a syncer or tracker) to record the correct leader count distribution.

Contributor (Author): In fact, it would be more complicated to do that: for truncate table we would need to remove the regions from the tracker, and what if a table is recovered? The current approach is at least feasible in general; there will be no more operators after the cluster is balanced, especially in a big cluster.

Contributor (Author): But the tracker you mentioned is a good idea, and I even want to use it to report why an operator was produced.
cc @Yisaer
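A rough sketch of the tracker idea floated in this thread (hypothetical, not part of this PR): unlike selectedLeaderStores, which only ever increments, such a tracker would also decrement a store's count when the leader moves away from it.

```go
package tracker

import "sync"

// leaderTracker is a hypothetical counter that follows observed leader
// transfers in both directions, as suggested in the review discussion.
type leaderTracker struct {
	mu     sync.Mutex
	counts map[uint64]uint64
}

func newLeaderTracker() *leaderTracker {
	return &leaderTracker{counts: make(map[uint64]uint64)}
}

// observeTransfer records a leader moving from one store to another.
func (t *leaderTracker) observeTransfer(fromStoreID, toStoreID uint64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.counts[fromStoreID] > 0 {
		t.counts[fromStoreID]--
	}
	t.counts[toStoreID]++
}

// count returns the number of leaders currently attributed to a store.
func (t *leaderTracker) count(storeID uint64) uint64 {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.counts[storeID]
}
```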


func (s *selectedLeaderStores) get(id uint64) uint64 {
s.mu.Lock()
defer s.mu.Unlock()
return s.stores[id]
}

func newSelectedLeaderStores() *selectedLeaderStores {
return &selectedLeaderStores{
stores: make(map[uint64]uint64),
}
}

type selectedStores struct {
mu sync.Mutex
stores map[uint64]struct{}
@@ -86,15 +110,17 @@ func NewRegionScatterer(cluster opt.Cluster) *RegionScatterer {
}

type engineContext struct {
filters []filter.Filter
selected *selectedStores
filters []filter.Filter
selected *selectedStores
selectedLeader *selectedLeaderStores
}

func newEngineContext(filters ...filter.Filter) engineContext {
filters = append(filters, filter.StoreStateFilter{ActionScope: regionScatterName})
return engineContext{
filters: filters,
selected: newSelectedStores(),
filters: filters,
selected: newSelectedStores(),
selectedLeader: newSelectedLeaderStores(),
}
}

@@ -153,6 +179,8 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo) *operator.Opera
}

scatterWithSameEngine(ordinaryPeers, r.ordinaryEngine)
// FIXME: target leader only consider the ordinary engine.
Contributor: What about creating an issue to track this? (Ignore me if one already exists.)

Contributor (Author): I added more comments.

targetLeader := r.collectAvailableLeaderStores(targetPeers, r.ordinaryEngine)
Contributor: Will the leader store be collected again after collectAvailableLeaderStores in the scatterWithSameEngine function?

Contributor (Author): For now, only the ordinaryEngine is considered.

for engine, peers := range specialPeers {
context, ok := r.specialEngines[engine]
if !ok {
@@ -162,7 +190,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo) *operator.Opera
scatterWithSameEngine(peers, context)
}

op, err := operator.CreateScatterRegionOperator("scatter-region", r.cluster, region, targetPeers)
op, err := operator.CreateScatterRegionOperator("scatter-region", r.cluster, region, targetPeers, targetLeader)
if err != nil {
log.Debug("fail to create scatter region operator", zap.Error(err))
return nil
@@ -204,6 +232,7 @@ func (r *RegionScatterer) collectAvailableStores(region *core.RegionInfo, contex
filters := []filter.Filter{
context.selected.newFilter(r.name),
filter.NewExcludedFilter(r.name, nil, region.GetStoreIds()),
filter.StoreStateFilter{ActionScope: r.name, MoveRegion: true},
}
filters = append(filters, context.filters...)

@@ -216,3 +245,19 @@
}
return targets
}

func (r *RegionScatterer) collectAvailableLeaderStores(peers map[uint64]*metapb.Peer, context engineContext) uint64 {
m := uint64(math.MaxUint64)
id := uint64(0)
for storeID := range peers {
count := context.selectedLeader.get(storeID)
if m > count {
m = count
id = storeID
}
}
if id != 0 {
context.selectedLeader.put(id)
}
return id
}
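To see why collectAvailableLeaderStores evens out leaders, here is a standalone toy of the same rule: pick the candidate store with the smallest selected-leader count, then bump that count. (Plain Go sketch; it uses a deterministic slice of candidates instead of PD's map iteration, and ignores ties and engine filters.)

```go
package main

import (
	"fmt"
	"math"
)

// pickLeastLoaded returns the candidate store with the fewest previously
// selected leaders and increments its count, mimicking the new
// collectAvailableLeaderStores behaviour in simplified form.
func pickLeastLoaded(counts map[uint64]uint64, candidates []uint64) uint64 {
	min := uint64(math.MaxUint64)
	var picked uint64
	for _, storeID := range candidates {
		if counts[storeID] < min {
			min = counts[storeID]
			picked = storeID
		}
	}
	if picked != 0 {
		counts[picked]++
	}
	return picked
}

func main() {
	counts := map[uint64]uint64{}
	stores := []uint64{1, 2, 3}
	for i := 0; i < 6; i++ {
		fmt.Print(pickLeastLoaded(counts, stores), " ")
	}
	fmt.Println()
	// Prints "1 2 3 1 2 3": successive scattered regions rotate their
	// leaders across the candidate stores.
}
```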
56 changes: 40 additions & 16 deletions server/schedule/region_scatterer_test.go
@@ -2,6 +2,7 @@ package schedule

import (
"context"

. "github.com/pingcap/check"
"github.com/pingcap/pd/v4/pkg/mock/mockcluster"
"github.com/pingcap/pd/v4/pkg/mock/mockhbstream"
@@ -41,21 +42,21 @@ var _ = Suite(&testScatterRegionSuite{})
type testScatterRegionSuite struct{}

func (s *testScatterRegionSuite) TestSixStores(c *C) {
s.scatter(c, 6, 4, false)
s.scatter(c, 6, 4, true)
s.scatter(c, 6, 100, false)
s.scatter(c, 6, 100, true)
}

func (s *testScatterRegionSuite) TestFiveStores(c *C) {
s.scatter(c, 5, 5, false)
s.scatter(c, 5, 5, true)
s.scatter(c, 5, 100, false)
s.scatter(c, 5, 100, true)
}

func (s *testScatterRegionSuite) TestSixSpecialStores(c *C) {
s.scatterSpecial(c, 3, 6, 4)
s.scatterSpecial(c, 3, 6, 100)
}

func (s *testScatterRegionSuite) TestFiveSpecialStores(c *C) {
s.scatterSpecial(c, 5, 5, 5)
s.scatterSpecial(c, 5, 5, 100)
}

func (s *testScatterRegionSuite) checkOperator(op *operator.Operator, c *C) {
@@ -81,11 +82,11 @@ func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64, use
}
tc.EnablePlacementRules = useRules

seq := newSequencer(numStores)
// Region 1 has the same distribution with the Region 2, which is used to test selectPeerToReplace.
tc.AddLeaderRegion(1, 1, 2, 3)
for i := uint64(2); i <= numRegions; i++ {
tc.AddLeaderRegion(i, seq.next(), seq.next(), seq.next())
// region distributed in same stores.
tc.AddLeaderRegion(i, 1, 2, 3)
}

scatterer := NewRegionScatterer(tc)
@@ -99,16 +100,29 @@
}

countPeers := make(map[uint64]uint64)
countLeader := make(map[uint64]uint64)
for i := uint64(1); i <= numRegions; i++ {
region := tc.GetRegion(i)
for _, peer := range region.GetPeers() {
countPeers[peer.GetStoreId()]++
if peer.GetId() == region.GetLeader().GetId() {
countLeader[peer.GetStoreId()]++
}
}
}

// Each store should have the same number of peers.
for _, count := range countPeers {
c.Assert(count, Equals, numRegions*3/numStores)
c.Assert(float64(count), LessEqual, 1.1*float64(numRegions*3)/float64(numStores))
c.Assert(float64(count), GreaterEqual, 0.9*float64(numRegions*3)/float64(numStores))
}

// Each store should have the same number of leaders.
c.Assert(len(countPeers), Equals, int(numStores))
c.Assert(len(countLeader), Equals, int(numStores))
for _, count := range countLeader {
c.Assert(float64(count), LessEqual, 1.1*float64(numRegions)/float64(numStores))
c.Assert(float64(count), GreaterEqual, 0.9*float64(numRegions)/float64(numStores))
}
}

@@ -129,16 +143,16 @@ func (s *testScatterRegionSuite) scatterSpecial(c *C, numOrdinaryStores, numSpec
GroupID: "pd", ID: "learner", Role: placement.Learner, Count: 3,
LabelConstraints: []placement.LabelConstraint{{Key: "engine", Op: placement.In, Values: []string{"tiflash"}}}}), IsNil)

ordinarySeq := newSequencer(numOrdinaryStores)
specialSeq := newSequencerWithMinID(numOrdinaryStores+1, numOrdinaryStores+numSpecialStores)
//ordinarySeq := newSequencer(numOrdinaryStores)
// specialSeq := newSequencerWithMinID(numOrdinaryStores+1, numOrdinaryStores+numSpecialStores)
// Region 1 has the same distribution with the Region 2, which is used to test selectPeerToReplace.
tc.AddRegionWithLearner(1, 1, []uint64{2, 3}, []uint64{numOrdinaryStores + 1, numOrdinaryStores + 2, numOrdinaryStores + 3})
for i := uint64(2); i <= numRegions; i++ {
tc.AddRegionWithLearner(
i,
ordinarySeq.next(),
[]uint64{ordinarySeq.next(), ordinarySeq.next()},
[]uint64{specialSeq.next(), specialSeq.next(), specialSeq.next()},
1,
[]uint64{2, 3},
[]uint64{numOrdinaryStores + 1, numOrdinaryStores + 2, numOrdinaryStores + 3},
)
}

@@ -154,6 +168,7 @@

countOrdinaryPeers := make(map[uint64]uint64)
countSpecialPeers := make(map[uint64]uint64)
countOrdinaryLeaders := make(map[uint64]uint64)
for i := uint64(1); i <= numRegions; i++ {
region := tc.GetRegion(i)
for _, peer := range region.GetPeers() {
@@ -164,15 +179,24 @@ func (s *testScatterRegionSuite) scatterSpecial(c *C, numOrdinaryStores, numSpec
} else {
countOrdinaryPeers[storeID]++
}
if peer.GetId() == region.GetLeader().GetId() {
countOrdinaryLeaders[storeID]++
}
}
}

// Each store should have the same number of peers.
for _, count := range countOrdinaryPeers {
c.Assert(count, Equals, numRegions*3/numOrdinaryStores)
c.Assert(float64(count), LessEqual, 1.1*float64(numRegions*3)/float64(numOrdinaryStores))
c.Assert(float64(count), GreaterEqual, 0.9*float64(numRegions*3)/float64(numOrdinaryStores))
}
for _, count := range countSpecialPeers {
c.Assert(count, Equals, numRegions*3/numSpecialStores)
c.Assert(float64(count), LessEqual, 1.1*float64(numRegions*3)/float64(numSpecialStores))
c.Assert(float64(count), GreaterEqual, 0.9*float64(numRegions*3)/float64(numSpecialStores))
}
for _, count := range countOrdinaryLeaders {
c.Assert(float64(count), LessEqual, 1.1*float64(numRegions)/float64(numOrdinaryStores))
c.Assert(float64(count), GreaterEqual, 0.9*float64(numRegions)/float64(numOrdinaryStores))
}
}

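To make the relaxed assertions concrete: for s.scatter(c, 6, 100, false), each of the 6 stores should end up with about 100*3/6 = 50 peers and about 100/6 ≈ 16.7 leaders, so the accepted ranges are roughly [45, 55] peers and [15, 18.3] leaders per store. A throwaway check of that arithmetic (not part of the test file):

```go
package main

import "fmt"

func main() {
	numStores, numRegions := 6.0, 100.0
	peersPerStore := numRegions * 3 / numStores // 50 on average
	leadersPerStore := numRegions / numStores   // about 16.7 on average
	fmt.Printf("peers per store: [%.1f, %.1f]\n", 0.9*peersPerStore, 1.1*peersPerStore)
	fmt.Printf("leaders per store: [%.1f, %.1f]\n", 0.9*leadersPerStore, 1.1*leadersPerStore)
	// Output:
	// peers per store: [45.0, 55.0]
	// leaders per store: [15.0, 18.3]
}
```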