Skip to content

Commit

Permalink
support region scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
Legendout committed Aug 11, 2024
1 parent 3944c89 commit 8b1bf22
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 2 deletions.
18 changes: 17 additions & 1 deletion scheduler/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,23 @@ func (c *RaftCluster) handleStoreHeartbeat(stats *schedulerpb.StoreStats) error
// processRegionHeartbeat updates the region information.
func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
// Your Code Here (3C).

epoch := region.GetRegionEpoch()
if epoch == nil {
return errors.Errorf("region has no epoch")
}
// 1. 检查是否有两个 region 的 id 是一样的
oldRegion := c.GetRegion(region.GetID())
if oldRegion != nil {
oldEpoch := oldRegion.GetRegionEpoch()
if epoch.ConfVer < oldEpoch.ConfVer || epoch.Version < oldEpoch.Version {
return errors.Errorf("region is stale")
}
}
// region 是最新的,更新 region tree 和 store status
c.putRegion(region)
for i := range region.GetStoreIds() {
c.updateStoreStatusLocked(i)
}
return nil
}

Expand Down
67 changes: 66 additions & 1 deletion scheduler/server/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
package schedulers

import (
"fmt"
"sort"

"github.com/pingcap-incubator/tinykv/scheduler/server/core"
"github.com/pingcap-incubator/tinykv/scheduler/server/schedule"
"github.com/pingcap-incubator/tinykv/scheduler/server/schedule/operator"
Expand Down Expand Up @@ -75,8 +78,70 @@ func (s *balanceRegionScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
return s.opController.OperatorCount(operator.OpRegion) < cluster.GetRegionScheduleLimit()
}

type storeSlice []*core.StoreInfo

func (a storeSlice) Len() int { return len(a) }
func (a storeSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a storeSlice) Less(i, j int) bool { return a[i].GetRegionSize() < a[j].GetRegionSize() }

// Schedule 避免太多 region 堆积在一个 store
func (s *balanceRegionScheduler) Schedule(cluster opt.Cluster) *operator.Operator {
// Your Code Here (3C).

// 1. 选出所有的 suitableStores
stores := make(storeSlice, 0)
for _, store := range cluster.GetStores() {
// 适合被移动的 store 需要满足停机时间不超过 MaxStoreDownTime
if store.IsUp() && store.DownTime() < cluster.GetMaxStoreDownTime() {
stores = append(stores, store)
}
}
if len(stores) < 2 {
return nil
}
// 2. 遍历 suitableStores,找到目标 region 和 store
sort.Sort(stores)
var fromStore, toStore *core.StoreInfo
var region *core.RegionInfo
for i := len(stores) - 1; i >= 0; i-- {
var regions core.RegionsContainer
cluster.GetPendingRegionsWithLock(stores[i].GetID(), func(rc core.RegionsContainer) { regions = rc })
region = regions.RandomRegion(nil, nil)
if region != nil {
fromStore = stores[i]
break
}
cluster.GetFollowersWithLock(stores[i].GetID(), func(rc core.RegionsContainer) { regions = rc })
region = regions.RandomRegion(nil, nil)
if region != nil {
fromStore = stores[i]
break
}
cluster.GetLeadersWithLock(stores[i].GetID(), func(rc core.RegionsContainer) { regions = rc })
region = regions.RandomRegion(nil, nil)
if region != nil {
fromStore = stores[i]
break
}
}
if region == nil {
return nil
}
// 3. 判断目标 region 的 store 数量,如果小于 cluster.GetMaxReplicas 直接放弃本次操作
storeIds := region.GetStoreIds()
if len(storeIds) < cluster.GetMaxReplicas() {
return nil
}
// 4. 再次从 suitableStores 里面找到一个目标 store,目标 store 不能在原来的 region 里面
for i := 0; i < len(stores); i++ {
if _, ok := storeIds[stores[i].GetID()]; !ok {
toStore = stores[i]
break
}
}
if toStore == nil {
return nil
}
// 5. 判断两个 store 的 region size 差值是否小于 2*ApproximateSize,是的话放弃 region 移动
if fromStore.GetRegionSize()-toStore.GetRegionSize() < region.GetApproximateSize() {
return nil
}

0 comments on commit 8b1bf22

Please sign in to comment.