*: remove unnecessary interfaces (tikv#6551)
ref tikv#5839

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
rleungx and ti-chi-bot[bot] committed Nov 30, 2023
1 parent fbdddff commit a82d63f
Showing 40 changed files with 251 additions and 287 deletions.
2 changes: 1 addition & 1 deletion pkg/core/basic_cluster.go
@@ -248,7 +248,7 @@ func (bc *BasicCluster) GetStoresWriteRate() (storeIDs []uint64, bytesRates, key
 
 // RegionSetInformer provides access to a shared informer of regions.
 type RegionSetInformer interface {
-	GetRegionCount() int
+	GetTotalRegionCount() int
 	RandFollowerRegions(storeID uint64, ranges []KeyRange) []*RegionInfo
 	RandLeaderRegions(storeID uint64, ranges []KeyRange) []*RegionInfo
 	RandLearnerRegions(storeID uint64, ranges []KeyRange) []*RegionInfo
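The rename above is purely mechanical for callers of the interface. A minimal sketch of a hypothetical downstream helper (not part of this commit), assuming the pkg/core import:

package example

import "github.com/tikv/pd/pkg/core"

// reportRegions is a hypothetical caller illustrating the migration:
// code that called informer.GetRegionCount() before this commit calls
// informer.GetTotalRegionCount() after it.
func reportRegions(informer core.RegionSetInformer) int {
	return informer.GetTotalRegionCount()
}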
16 changes: 8 additions & 8 deletions pkg/core/region.go
@@ -1287,8 +1287,8 @@ func (r *RegionsInfo) GetStoreStats(storeID uint64) (leader, region, witness, le
 		r.learners[storeID].length(), r.pendingPeers[storeID].length(), r.leaders[storeID].TotalSize(), r.getStoreRegionSizeLocked(storeID)
 }
 
-// GetRegionCount gets the total count of RegionInfo of regionMap
-func (r *RegionsInfo) GetRegionCount() int {
+// GetTotalRegionCount gets the total count of RegionInfo of regionMap
+func (r *RegionsInfo) GetTotalRegionCount() int {
 	r.t.RLock()
 	defer r.t.RUnlock()
 	return len(r.regions)
@@ -1482,8 +1482,8 @@ func (r *RegionInfo) GetWriteLoads() []float64 {
 	}
 }
 
-// GetRangeCount returns the number of regions that overlap with the range [startKey, endKey).
-func (r *RegionsInfo) GetRangeCount(startKey, endKey []byte) int {
+// GetRegionCount returns the number of regions that overlap with the range [startKey, endKey).
+func (r *RegionsInfo) GetRegionCount(startKey, endKey []byte) int {
 	r.t.RLock()
 	defer r.t.RUnlock()
 	start := &regionItem{&RegionInfo{meta: &metapb.Region{StartKey: startKey}}}
@@ -1505,9 +1505,9 @@ func (r *RegionsInfo) GetRangeCount(startKey, endKey []byte) int {
 	return endIndex - startIndex + 1
 }
 
-// ScanRange scans regions intersecting [start key, end key), returns at most
+// ScanRegions scans regions intersecting [start key, end key), returns at most
 // `limit` regions. limit <= 0 means no limit.
-func (r *RegionsInfo) ScanRange(startKey, endKey []byte, limit int) []*RegionInfo {
+func (r *RegionsInfo) ScanRegions(startKey, endKey []byte, limit int) []*RegionInfo {
 	r.t.RLock()
 	defer r.t.RUnlock()
 	var res []*RegionInfo
@@ -1524,9 +1524,9 @@ func (r *RegionsInfo) ScanRange(startKey, endKey []byte, limit int) []*RegionInf
 	return res
 }
 
-// ScanRangeWithIterator scans from the first region containing or behind start key,
+// ScanRegionWithIterator scans from the first region containing or behind start key,
 // until iterator returns false.
-func (r *RegionsInfo) ScanRangeWithIterator(startKey []byte, iterator func(region *RegionInfo) bool) {
+func (r *RegionsInfo) ScanRegionWithIterator(startKey []byte, iterator func(region *RegionInfo) bool) {
 	r.t.RLock()
 	defer r.t.RUnlock()
 	r.tree.scanRange(startKey, iterator)
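Note that after these hunks GetRegionCount still exists but changes meaning: the zero-argument total counter becomes GetTotalRegionCount, and GetRegionCount takes over the ranged query formerly named GetRangeCount. A hypothetical migration sketch for callers of *core.RegionsInfo (the pre-commit forms appear in the trailing comments; the surrounding function is illustrative only):

package example

import "github.com/tikv/pd/pkg/core"

// migrate shows each renamed *core.RegionsInfo method next to the call it replaces.
func migrate(ri *core.RegionsInfo, start, end []byte) {
	_ = ri.GetTotalRegionCount()       // was ri.GetRegionCount()
	_ = ri.GetRegionCount(start, end)  // was ri.GetRangeCount(start, end)
	_ = ri.ScanRegions(start, end, 16) // was ri.ScanRange(start, end, 16)
	// was ri.ScanRangeWithIterator(start, ...)
	ri.ScanRegionWithIterator(start, func(r *core.RegionInfo) bool {
		return true // keep iterating
	})
}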
2 changes: 1 addition & 1 deletion pkg/dashboard/keyvisual/input/core.go
@@ -94,7 +94,7 @@ func clusterScan(rc *core.BasicCluster) RegionsInfo {
 	regions := make([]*core.RegionInfo, 0, limit)
 
 	for {
-		rs := rc.ScanRange(startKey, endKey, limit)
+		rs := rc.ScanRegions(startKey, endKey, limit)
 		length := len(rs)
 		if length == 0 {
 			break
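The key-visual scanner pages through the cluster in chunks of limit. The tail of the loop is elided in the hunk above, so the following sketch reconstructs it under the assumption that it advances startKey past the last region returned and stops on a short page; rc, startKey, endKey, limit, and regions are the variables shown above.

// Paging sketch: repeatedly scan [startKey, endKey) in chunks of limit.
for {
	rs := rc.ScanRegions(startKey, endKey, limit)
	length := len(rs)
	if length == 0 {
		break // no regions left in the range
	}
	regions = append(regions, rs...)
	startKey = rs[length-1].GetEndKey() // resume after the last region seen
	if length < limit {
		break // short page: the range is exhausted
	}
}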
19 changes: 0 additions & 19 deletions pkg/mock/mockcluster/mockcluster.go
@@ -112,20 +112,9 @@ func (mc *Cluster) GetPersistOptions() *config.PersistOptions {
 // UpdateRegionsLabelLevelStats updates the label level stats for the regions.
 func (mc *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {}
 
-// IsSchedulerExisted checks if the scheduler with name is existed or not.
-func (mc *Cluster) IsSchedulerExisted(name string) (bool, error) { return false, nil }
-
-// IsSchedulerDisabled checks if the scheduler with name is disabled or not.
-func (mc *Cluster) IsSchedulerDisabled(name string) (bool, error) { return false, nil }
-
-// CheckSchedulingAllowance checks if the cluster allows scheduling currently.
-func (mc *Cluster) CheckSchedulingAllowance() (bool, error) { return true, nil }
-
-// ScanRegions scans region with start key, until number greater than limit.
-func (mc *Cluster) ScanRegions(startKey, endKey []byte, limit int) []*core.RegionInfo {
-	return mc.ScanRange(startKey, endKey, limit)
-}
-
 // LoadRegion puts region info without leader
 func (mc *Cluster) LoadRegion(regionID uint64, peerStoreIDs ...uint64) {
 	// regions load from etcd will have no leader
@@ -817,11 +806,6 @@ func (mc *Cluster) PutStoreWithLabels(id uint64, labelPairs ...string) {
 	mc.AddLabelsStore(id, 0, labels)
 }
 
-// RemoveScheduler mocks method.
-func (mc *Cluster) RemoveScheduler(name string) error {
-	return nil
-}
-
 // MockRegionInfo returns a mock region
 // If leaderStoreID is zero, the regions would have no leader
 func (mc *Cluster) MockRegionInfo(regionID uint64, leaderStoreID uint64,
@@ -950,6 +934,3 @@ func (mc *Cluster) ObserveRegionsStats() {
 	storeIDs, writeBytesRates, writeKeysRates := mc.BasicCluster.GetStoresWriteRate()
 	mc.HotStat.ObserveRegionsStats(storeIDs, writeBytesRates, writeKeysRates)
 }
-
-// RecordOpStepWithTTL records OpStep with TTL
-func (mc *Cluster) RecordOpStepWithTTL(regionID uint64) {}
2 changes: 1 addition & 1 deletion pkg/replication/replication_mode.go
@@ -609,7 +609,7 @@ func (m *ModeManager) updateProgress() {
 			key = r.GetEndKey()
 		}
 		m.drSampleTotalRegion = len(sampleRegions)
-		m.drTotalRegion = m.cluster.GetRegionCount()
+		m.drTotalRegion = m.cluster.GetTotalRegionCount()
 		return
 	}
 }
2 changes: 1 addition & 1 deletion pkg/schedule/checker/merge_checker_test.go
@@ -463,7 +463,7 @@ func (suite *mergeCheckerTestSuite) TestStoreLimitWithMerge() {
 
 	mc := NewMergeChecker(suite.ctx, tc, tc.GetOpts())
 	stream := hbstream.NewTestHeartbeatStreams(suite.ctx, tc.ID, tc, false /* no need to run */)
-	oc := operator.NewController(suite.ctx, tc, stream)
+	oc := operator.NewController(suite.ctx, tc.GetBasicCluster(), tc.GetOpts(), stream)
 
 	regions[2] = regions[2].Clone(
 		core.SetPeers([]*metapb.Peer{
46 changes: 26 additions & 20 deletions pkg/schedule/coordinator.go
@@ -90,22 +90,23 @@ type Coordinator struct {
 // NewCoordinator creates a new Coordinator.
 func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams *hbstream.HeartbeatStreams) *Coordinator {
 	ctx, cancel := context.WithCancel(ctx)
-	opController := operator.NewController(ctx, cluster, hbStreams)
+	opController := operator.NewController(ctx, cluster.GetBasicCluster(), cluster.GetPersistOptions(), hbStreams)
 	schedulers := make(map[string]*scheduleController)
-	return &Coordinator{
-		ctx:               ctx,
-		cancel:            cancel,
-		cluster:           cluster,
-		prepareChecker:    newPrepareChecker(),
-		checkers:          checker.NewController(ctx, cluster, cluster.GetOpts(), cluster.GetRuleManager(), cluster.GetRegionLabeler(), opController),
-		regionScatterer:   NewRegionScatterer(ctx, cluster, opController),
-		regionSplitter:    NewRegionSplitter(cluster, NewSplitRegionsHandler(cluster, opController)),
-		schedulers:        schedulers,
-		opController:      opController,
-		hbStreams:         hbStreams,
-		pluginInterface:   NewPluginInterface(),
-		diagnosticManager: newDiagnosticManager(cluster),
-	}
+	c := &Coordinator{
+		ctx:             ctx,
+		cancel:          cancel,
+		cluster:         cluster,
+		prepareChecker:  newPrepareChecker(),
+		checkers:        checker.NewController(ctx, cluster, cluster.GetOpts(), cluster.GetRuleManager(), cluster.GetRegionLabeler(), opController),
+		regionScatterer: NewRegionScatterer(ctx, cluster, opController),
+		regionSplitter:  NewRegionSplitter(cluster, NewSplitRegionsHandler(cluster, opController)),
+		schedulers:      schedulers,
+		opController:    opController,
+		hbStreams:       hbStreams,
+		pluginInterface: NewPluginInterface(),
+	}
+	c.diagnosticManager = newDiagnosticManager(c, cluster.GetPersistOptions())
+	return c
 }
 
 // GetWaitingRegions returns the regions in the waiting list.
@@ -305,7 +306,7 @@ func (c *Coordinator) drivePushOperator() {
 			log.Info("drive push operator has been stopped")
 			return
 		case <-ticker.C:
-			c.opController.PushOperators()
+			c.opController.PushOperators(c.RecordOpStepWithTTL)
 		}
 	}
 }
@@ -382,7 +383,7 @@ func (c *Coordinator) Run() {
 			log.Info("skip create scheduler with independent configuration", zap.String("scheduler-name", name), zap.String("scheduler-type", cfg.Type), zap.Strings("scheduler-args", cfg.Args))
 			continue
 		}
-		s, err := schedulers.CreateScheduler(cfg.Type, c.opController, c.cluster.GetStorage(), schedulers.ConfigJSONDecoder([]byte(data)))
+		s, err := schedulers.CreateScheduler(cfg.Type, c.opController, c.cluster.GetStorage(), schedulers.ConfigJSONDecoder([]byte(data)), c.RemoveScheduler)
 		if err != nil {
 			log.Error("can not create scheduler with independent configuration", zap.String("scheduler-name", name), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
 			continue
@@ -403,7 +404,7 @@
 			continue
 		}
 
-		s, err := schedulers.CreateScheduler(schedulerCfg.Type, c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(schedulerCfg.Type, schedulerCfg.Args))
+		s, err := schedulers.CreateScheduler(schedulerCfg.Type, c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(schedulerCfg.Type, schedulerCfg.Args), c.RemoveScheduler)
 		if err != nil {
 			log.Error("can not create scheduler", zap.String("scheduler-type", schedulerCfg.Type), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
 			continue
@@ -452,7 +453,7 @@ func (c *Coordinator) LoadPlugin(pluginPath string, ch chan string) {
 	}
 	schedulerArgs := SchedulerArgs.(func() []string)
 	// create and add user scheduler
-	s, err := schedulers.CreateScheduler(schedulerType(), c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(schedulerType(), schedulerArgs()))
+	s, err := schedulers.CreateScheduler(schedulerType(), c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(schedulerType(), schedulerArgs()), c.RemoveScheduler)
 	if err != nil {
 		log.Error("can not create scheduler", zap.String("scheduler-type", schedulerType()), errs.ZapError(err))
 		return
@@ -724,7 +725,7 @@ func (c *Coordinator) removeOptScheduler(o *config.PersistOptions, name string)
 	for i, schedulerCfg := range v.Schedulers {
 		// To create a temporary scheduler is just used to get scheduler's name
 		decoder := schedulers.ConfigSliceDecoder(schedulerCfg.Type, schedulerCfg.Args)
-		tmp, err := schedulers.CreateScheduler(schedulerCfg.Type, operator.NewController(c.ctx, nil, nil), storage.NewStorageWithMemoryBackend(), decoder)
+		tmp, err := schedulers.CreateScheduler(schedulerCfg.Type, c.opController, storage.NewStorageWithMemoryBackend(), decoder, c.RemoveScheduler)
 		if err != nil {
 			return err
 		}
@@ -945,6 +946,11 @@ func (c *Coordinator) GetDiagnosticResult(name string) (*DiagnosticResult, error
 	return c.diagnosticManager.getDiagnosticResult(name)
 }
 
+// RecordOpStepWithTTL records OpStep with TTL
+func (c *Coordinator) RecordOpStepWithTTL(regionID uint64) {
+	c.GetRuleChecker().RecordRegionPromoteToNonWitness(regionID)
+}
+
 // scheduleController is used to manage a scheduler to schedulers.
 type scheduleController struct {
 	schedulers.Scheduler
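Two call shapes recur throughout this file. A condensed sketch of both, assembled from the hunks above (the surrounding variables ctx, cluster, hbStreams, cfg, data, and c are assumed to be in scope, as in the coordinator):

// The operator controller now takes the narrow BasicCluster view plus
// persist options instead of the whole ClusterInformer.
opController := operator.NewController(ctx, cluster.GetBasicCluster(), cluster.GetPersistOptions(), hbStreams)

// Schedulers receive an explicit removal callback, so they no longer
// need a RemoveScheduler method on the cluster interface.
s, err := schedulers.CreateScheduler(
	cfg.Type,
	opController,
	cluster.GetStorage(),
	schedulers.ConfigJSONDecoder([]byte(data)),
	c.RemoveScheduler, // func(name string) error
)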
17 changes: 9 additions & 8 deletions pkg/schedule/core/cluster_informer.go
@@ -38,22 +38,23 @@ type ClusterInformer interface {
 	GetAllocator() id.Allocator
 	GetRegionLabeler() *labeler.RegionLabeler
 	GetStorage() storage.Storage
-	RemoveScheduler(name string) error
-	AddSuspectRegions(ids ...uint64)
-	RecordOpStepWithTTL(regionID uint64)
 	UpdateRegionsLabelLevelStats(regions []*core.RegionInfo)
-	IsSchedulerExisted(name string) (bool, error)
-	IsSchedulerDisabled(name string) (bool, error)
-	CheckSchedulingAllowance() (bool, error)
+	AddSuspectRegions(ids ...uint64)
+	GetPersistOptions() *config.PersistOptions
 }
 
 // RegionHealthCluster is an aggregate interface that wraps multiple interfaces
 type RegionHealthCluster interface {
-	core.StoreSetInformer
-	core.StoreSetController
-	core.RegionSetInformer
+	BasicCluster
 
 	GetOpts() sc.Config
 	GetRuleManager() *placement.RuleManager
 }
+
+// BasicCluster is an aggregate interface that wraps multiple interfaces
+type BasicCluster interface {
+	core.StoreSetInformer
+	core.StoreSetController
+	core.RegionSetInformer
+}
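The effect of the re-cut interfaces is that components which only read and mutate stores and regions can depend on the small BasicCluster aggregate rather than the full ClusterInformer. A hypothetical compile-time assertion sketch of the intended layering (the concrete mock type named here is an assumption, not part of this diff):

package example

import (
	"github.com/tikv/pd/pkg/mock/mockcluster"
	sche "github.com/tikv/pd/pkg/schedule/core"
)

// A test cluster only has to satisfy the narrow aggregate for consumers
// such as the operator controller, not the full ClusterInformer.
var _ sche.BasicCluster = (*mockcluster.Cluster)(nil)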
32 changes: 18 additions & 14 deletions pkg/schedule/diagnostic_manager.go
@@ -22,7 +22,7 @@ import (
 	"github.com/tikv/pd/pkg/cache"
 	"github.com/tikv/pd/pkg/errs"
 	"github.com/tikv/pd/pkg/movingaverage"
-	sche "github.com/tikv/pd/pkg/schedule/core"
+	sc "github.com/tikv/pd/pkg/schedule/config"
 	"github.com/tikv/pd/pkg/schedule/operator"
 	"github.com/tikv/pd/pkg/schedule/plan"
 	"github.com/tikv/pd/pkg/schedule/schedulers"
@@ -56,28 +56,30 @@ var DiagnosableSummaryFunc = map[string]plan.Summary{
 }
 
 type diagnosticManager struct {
-	cluster   sche.ClusterInformer
-	recorders map[string]*diagnosticRecorder
+	coordinator *Coordinator
+	config      sc.Config
+	recorders   map[string]*diagnosticRecorder
 }
 
-func newDiagnosticManager(cluster sche.ClusterInformer) *diagnosticManager {
+func newDiagnosticManager(coordinator *Coordinator, config sc.Config) *diagnosticManager {
 	recorders := make(map[string]*diagnosticRecorder)
 	for name := range DiagnosableSummaryFunc {
-		recorders[name] = newDiagnosticRecorder(name, cluster)
+		recorders[name] = newDiagnosticRecorder(name, coordinator, config)
 	}
 	return &diagnosticManager{
-		cluster:   cluster,
-		recorders: recorders,
+		coordinator: coordinator,
+		config:      config,
+		recorders:   recorders,
 	}
 }
 
 func (d *diagnosticManager) getDiagnosticResult(name string) (*DiagnosticResult, error) {
-	if !d.cluster.GetOpts().IsDiagnosticAllowed() {
+	if !d.config.IsDiagnosticAllowed() {
 		return nil, errs.ErrDiagnosticDisabled
 	}
 
-	isSchedulerExisted, _ := d.cluster.IsSchedulerExisted(name)
-	isDisabled, _ := d.cluster.IsSchedulerDisabled(name)
+	isSchedulerExisted, _ := d.coordinator.IsSchedulerExisted(name)
+	isDisabled, _ := d.coordinator.IsSchedulerDisabled(name)
 	if !isSchedulerExisted || isDisabled {
 		ts := uint64(time.Now().Unix())
 		res := &DiagnosticResult{Name: name, Timestamp: ts, Status: disabled}
@@ -102,20 +104,22 @@ func (d *diagnosticManager) getRecorder(name string) *diagnosticRecorder {
 // diagnosticRecorder is used to manage diagnostic for one scheduler.
 type diagnosticRecorder struct {
 	schedulerName string
-	cluster       sche.ClusterInformer
+	coordinator   *Coordinator
+	config        sc.Config
 	summaryFunc   plan.Summary
 	results       *cache.FIFO
 }
 
-func newDiagnosticRecorder(name string, cluster sche.ClusterInformer) *diagnosticRecorder {
+func newDiagnosticRecorder(name string, coordinator *Coordinator, config sc.Config) *diagnosticRecorder {
 	summaryFunc, ok := DiagnosableSummaryFunc[name]
 	if !ok {
 		log.Error("can't find summary function", zap.String("scheduler-name", name))
 		return nil
 	}
 	return &diagnosticRecorder{
-		cluster:       cluster,
+		coordinator:   coordinator,
 		schedulerName: name,
+		config:        config,
 		summaryFunc:   summaryFunc,
 		results:       cache.NewFIFO(maxDiagnosticResultNum),
 	}
@@ -125,7 +129,7 @@ func (d *diagnosticRecorder) isAllowed() bool {
 	if d == nil {
 		return false
 	}
-	return d.cluster.GetOpts().IsDiagnosticAllowed()
+	return d.config.IsDiagnosticAllowed()
 }
 
 func (d *diagnosticRecorder) getLastResult() *DiagnosticResult {
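Swapping sche.ClusterInformer for the (*Coordinator, sc.Config) pair makes explicit how little the diagnostic code actually needs from its dependencies. A hypothetical restatement of that minimal surface as local interfaces (not present in the source, purely illustrative):

package example

// diagnosticConfig is what the manager uses from sc.Config here.
type diagnosticConfig interface {
	IsDiagnosticAllowed() bool
}

// schedulerRegistry is what it uses from *Coordinator.
type schedulerRegistry interface {
	IsSchedulerExisted(name string) (bool, error)
	IsSchedulerDisabled(name string) (bool, error)
}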
2 changes: 1 addition & 1 deletion pkg/schedule/filter/region_filters.go
@@ -149,7 +149,7 @@ func (f *regionEmptyFilter) Select(region *core.RegionInfo) *plan.Status {
 
 // isEmptyRegionAllowBalance returns true if the region is not empty or the number of regions is too small.
 func isEmptyRegionAllowBalance(cluster sche.RegionHealthCluster, region *core.RegionInfo) bool {
-	return region.GetApproximateSize() > core.EmptyRegionApproximateSize || cluster.GetRegionCount() < core.InitClusterRegionThreshold
+	return region.GetApproximateSize() > core.EmptyRegionApproximateSize || cluster.GetTotalRegionCount() < core.InitClusterRegionThreshold
}
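The filter's rule reads: a region may take part in balancing if it is non-empty, or if the cluster is still so small that even empty regions should move. A standalone restatement with the thresholds written out (the constant values mirror what the upstream defaults are believed to be and are assumptions here):

package example

const (
	emptyRegionApproximateSize = 1   // MB; assumed value of core.EmptyRegionApproximateSize
	initClusterRegionThreshold = 100 // assumed value of core.InitClusterRegionThreshold
)

// allowBalance mirrors isEmptyRegionAllowBalance without the cluster types.
func allowBalance(approximateSizeMB int64, totalRegionCount int) bool {
	return approximateSizeMB > emptyRegionApproximateSize ||
		totalRegionCount < initClusterRegionThreshold
}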
[Diff truncated: the remaining 30 changed files are not shown.]