Skip to content

Commit

Permalink
config, cluster: add an option to halt the cluster scheduling (tikv#6498)
Browse files (browse the repository at this point in the history)

ref tikv#6493

Add an option to halt the cluster scheduling.

Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato authored and rleungx committed Nov 30, 2023
1 parent 8582fbd commit fbdddff
Show file tree
Hide file tree
Showing 14 changed files with 191 additions and 32 deletions.
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ error = '''
TiKV cluster not bootstrapped, please start TiKV first
'''

["PD:cluster:ErrSchedulingIsHalted"]
error = '''
scheduling is halted
'''

["PD:cluster:ErrStoreIsUp"]
error = '''
store is still up, please remove store gracefully
Expand Down
109 changes: 108 additions & 1 deletion metrics/grafana/pd.json
Original file line number Diff line number Diff line change
Expand Up @@ -2340,6 +2340,113 @@
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": true,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "The allowance status of the scheduling.",
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 0,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 41
},
"hiddenSeries": false,
"id": 1464,
"legend": {
"alignAsTable": true,
"avg": false,
"current": true,
"hideEmpty": true,
"hideZero": true,
"max": false,
"min": false,
"rightSide": true,
"show": true,
"total": false,
"values": true
},
"lines": false,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"paceLength": 10,
"percentage": false,
"pluginVersion": "7.5.10",
"pointradius": 1,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": true,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "pd_scheduling_allowance_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=\"$instance\"}",
"format": "time_series",
"interval": "",
"intervalFactor": 2,
"legendFormat": "{{kind}}",
"metric": "pd_scheduling_allowance_status",
"refId": "A",
"step": 2
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Scheduling Allowance Status",
"tooltip": {
"shared": true,
"sort": 1,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"$$hashKey": "object:533",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": "0",
"show": true
},
{
"$$hashKey": "object:534",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"cacheTimeout": null,
"colorBackground": false,
Expand Down Expand Up @@ -2967,7 +3074,7 @@
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "{{event}}",
"metric": "pd_scheduler_status",
"metric": "pd_schedule_operators_count",
"refId": "A",
"step": 4
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,10 @@ var (

// cluster errors
var (
ErrNotBootstrapped = errors.Normalize("TiKV cluster not bootstrapped, please start TiKV first", errors.RFCCodeText("PD:cluster:ErrNotBootstrapped"))
ErrStoreIsUp = errors.Normalize("store is still up, please remove store gracefully", errors.RFCCodeText("PD:cluster:ErrStoreIsUp"))
ErrInvalidStoreID = errors.Normalize("invalid store id %d, not found", errors.RFCCodeText("PD:cluster:ErrInvalidStoreID"))
ErrNotBootstrapped = errors.Normalize("TiKV cluster not bootstrapped, please start TiKV first", errors.RFCCodeText("PD:cluster:ErrNotBootstrapped"))
ErrStoreIsUp = errors.Normalize("store is still up, please remove store gracefully", errors.RFCCodeText("PD:cluster:ErrStoreIsUp"))
ErrInvalidStoreID = errors.Normalize("invalid store id %d, not found", errors.RFCCodeText("PD:cluster:ErrInvalidStoreID"))
ErrSchedulingIsHalted = errors.Normalize("scheduling is halted", errors.RFCCodeText("PD:cluster:ErrSchedulingIsHalted"))
)

// versioninfo errors
Expand Down
8 changes: 3 additions & 5 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,6 @@ func (mc *Cluster) GetAllocator() id.Allocator {
return mc.IDAllocator
}

// IsUnsafeRecovering returns if the cluster is in unsafe recovering.
// The mock cluster always reports false.
func (mc *Cluster) IsUnsafeRecovering() bool {
return false
}

// GetPersistOptions returns the persist options.
func (mc *Cluster) GetPersistOptions() *config.PersistOptions {
return mc.PersistOptions
Expand All @@ -123,6 +118,9 @@ func (mc *Cluster) IsSchedulerExisted(name string) (bool, error) { return false,
// IsSchedulerDisabled checks if the scheduler with name is disabled or not.
func (mc *Cluster) IsSchedulerDisabled(name string) (bool, error) { return false, nil }

// CheckSchedulingAllowance reports whether the cluster currently permits
// scheduling. The mock cluster always allows it and never returns an error.
func (mc *Cluster) CheckSchedulingAllowance() (bool, error) {
	return true, nil
}

// ScanRegions scans region with start key, until number greater than limit.
func (mc *Cluster) ScanRegions(startKey, endKey []byte, limit int) []*core.RegionInfo {
return mc.ScanRange(startKey, endKey, limit)
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type Config interface {
SetSplitMergeInterval(time.Duration)
SetMaxReplicas(int)
SetPlacementRulesCacheEnabled(bool)
SetWitnessEnabled(bool)
SetEnableWitness(bool)
// only for store configuration
UseRaftV2()
}
Expand Down
14 changes: 10 additions & 4 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,7 @@ func (c *Coordinator) PatrolRegions() {
log.Info("patrol regions has been stopped")
return
}
if c.cluster.IsUnsafeRecovering() {
// Skip patrolling regions during unsafe recovery.
if allowed, _ := c.cluster.CheckSchedulingAllowance(); !allowed {
continue
}

Expand Down Expand Up @@ -577,7 +576,7 @@ func (c *Coordinator) CollectSchedulerMetrics() {
var allowScheduler float64
// If the scheduler is not allowed to schedule, it will disappear in Grafana panel.
// See issue #1341.
if !s.IsPaused() && !s.cluster.IsUnsafeRecovering() {
if allowed, _ := s.cluster.CheckSchedulingAllowance(); !s.IsPaused() && allowed {
allowScheduler = 1
}
schedulerStatusGauge.WithLabelValues(s.Scheduler.GetName(), "allow").Set(allowScheduler)
Expand Down Expand Up @@ -1047,7 +1046,14 @@ func (s *scheduleController) AllowSchedule(diagnosable bool) bool {
}
return false
}
if s.IsPaused() || s.cluster.IsUnsafeRecovering() {
allowed, _ := s.cluster.CheckSchedulingAllowance()
if !allowed {
if diagnosable {
s.diagnosticRecorder.setResultFromStatus(halted)
}
return false
}
if s.IsPaused() {
if diagnosable {
s.diagnosticRecorder.setResultFromStatus(paused)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/core/cluster_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ type ClusterInformer interface {
UpdateRegionsLabelLevelStats(regions []*core.RegionInfo)
IsSchedulerExisted(name string) (bool, error)
IsSchedulerDisabled(name string) (bool, error)
CheckSchedulingAllowance() (bool, error)
GetPersistOptions() *config.PersistOptions
IsUnsafeRecovering() bool
}

// RegionHealthCluster is an aggregate interface that wraps multiple interfaces
Expand Down
2 changes: 2 additions & 0 deletions pkg/schedule/diagnostic_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ const (
disabled = "disabled"
// paused means the current scheduler is paused
paused = "paused"
// halted means the scheduler cannot run because scheduling is currently disallowed for the whole cluster
halted = "halted"
// scheduling means the current scheduler is generating.
scheduling = "scheduling"
// pending means the current scheduler cannot generate scheduling operator
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/placement/rule_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func newTestManager(t *testing.T, enableWitness bool) (endpoint.RuleStorage, *Ru
store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
var err error
manager := NewRuleManager(store, nil, mockconfig.NewTestOptions())
manager.conf.SetWitnessEnabled(enableWitness)
manager.conf.SetEnableWitness(enableWitness)
err = manager.Initialize(3, []string{"zone", "rack", "host"}, "")
re.NoError(err)
return store, manager
Expand Down
27 changes: 22 additions & 5 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,11 +828,6 @@ func (c *RaftCluster) GetUnsafeRecoveryController() *unsaferecovery.Controller {
return c.unsafeRecoveryController
}

// IsUnsafeRecovering returns if the cluster is in unsafe recovering.
// It reports whether the cluster's unsafe recovery controller is running.
func (c *RaftCluster) IsUnsafeRecovering() bool {
return c.unsafeRecoveryController.IsRunning()
}

// AddSuspectKeyRange adds the key range with the its ruleID as the key
// The instance of each keyRange is like following format:
// [2][]byte: start key/end key
Expand Down Expand Up @@ -2713,3 +2708,25 @@ func (c *RaftCluster) GetPausedSchedulerDelayAt(name string) (int64, error) {
func (c *RaftCluster) GetPausedSchedulerDelayUntil(name string) (int64, error) {
// Pass-through: the query for the named scheduler is answered by the coordinator.
return c.coordinator.GetPausedSchedulerDelayUntil(name)
}

// Children of schedulingAllowanceStatusGauge, one per "kind" label value.
// They are resolved once at package initialization so that callers can Set
// them directly instead of looking the label up on every update.
var (
onlineUnsafeRecoveryStatus = schedulingAllowanceStatusGauge.WithLabelValues("online-unsafe-recovery")
haltSchedulingStatus = schedulingAllowanceStatusGauge.WithLabelValues("halt-scheduling")
)

// CheckSchedulingAllowance checks if the cluster allows scheduling currently.
//
// It returns (true, nil) when scheduling is allowed. Otherwise it returns
// false together with the reason:
//   - ErrUnsafeRecoveryIsRunning while online unsafe recovery is in progress;
//   - ErrSchedulingIsHalted when the halt-scheduling option is set.
//
// Unsafe recovery takes precedence when both conditions hold.
//
// As a side effect, both series of the scheduling allowance gauge are
// refreshed on every call. Updating them together keeps the "halt-scheduling"
// series from going stale when the option is toggled while unsafe recovery is
// still running.
func (c *RaftCluster) CheckSchedulingAllowance() (bool, error) {
	recovering := c.GetUnsafeRecoveryController().IsRunning()
	haltOn := c.opt.IsSchedulingHalted()

	// Publish the current state of both conditions before deciding, so an
	// early return for one reason never hides the other from the metrics.
	var recoveringStatus, haltStatus float64
	if recovering {
		recoveringStatus = 1
	}
	if haltOn {
		haltStatus = 1
	}
	onlineUnsafeRecoveryStatus.Set(recoveringStatus)
	haltSchedulingStatus.Set(haltStatus)

	switch {
	case recovering:
		// If the cluster is in the process of online unsafe recovery, it should not allow scheduling.
		return false, errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs()
	case haltOn:
		// If the halt-scheduling is set, it should not allow scheduling.
		return false, errs.ErrSchedulingIsHalted.FastGenByArgs()
	}
	return true, nil
}
8 changes: 4 additions & 4 deletions server/cluster/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error {

// HandleAskSplit handles the split request.
func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSplitResponse, error) {
if c.GetUnsafeRecoveryController().IsRunning() {
return nil, errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs()
if allowed, err := c.CheckSchedulingAllowance(); !allowed {
return nil, err
}
if !c.opt.IsTikvRegionSplitEnabled() {
return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs()
Expand Down Expand Up @@ -105,8 +105,8 @@ func (c *RaftCluster) ValidRequestRegion(reqRegion *metapb.Region) error {

// HandleAskBatchSplit handles the batch split request.
func (c *RaftCluster) HandleAskBatchSplit(request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) {
if c.GetUnsafeRecoveryController().IsRunning() {
return nil, errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs()
if allowed, err := c.CheckSchedulingAllowance(); !allowed {
return nil, err
}
if !c.opt.IsTikvRegionSplitEnabled() {
return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs()
Expand Down
9 changes: 9 additions & 0 deletions server/cluster/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,20 @@ var (
Name: "store_sync",
Help: "The state of store sync config",
}, []string{"address", "state"})

schedulingAllowanceStatusGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "scheduling",
Name: "allowance_status",
Help: "Status of the scheduling allowance.",
}, []string{"kind"})
)

func init() {
prometheus.MustRegister(regionEventCounter)
prometheus.MustRegister(healthStatusGauge)
prometheus.MustRegister(schedulingAllowanceStatusGauge)
prometheus.MustRegister(clusterStateCPUGauge)
prometheus.MustRegister(clusterStateCurrent)
prometheus.MustRegister(bucketEventCounter)
Expand Down
9 changes: 9 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ const (
defaultEnableGRPCGateway = true
defaultDisableErrorVerbose = true
defaultEnableWitness = false
defaultHaltScheduling = false

defaultDashboardAddress = "auto"

Expand Down Expand Up @@ -700,6 +701,10 @@ type ScheduleConfig struct {
// v1: which is based on the region count by rate limit.
// v2: which is based on region size by window size.
StoreLimitVersion string `toml:"store-limit-version" json:"store-limit-version,omitempty"`

// HaltScheduling is the option to halt the scheduling. Once it's on, PD will halt the scheduling,
// and any other scheduling configs will be ignored.
HaltScheduling bool `toml:"halt-scheduling" json:"halt-scheduling,string,omitempty"`
}

// Clone returns a cloned scheduling configuration.
Expand Down Expand Up @@ -836,6 +841,10 @@ func (c *ScheduleConfig) adjust(meta *configutil.ConfigMetaData, reloading bool)
configutil.AdjustString(&c.RegionScoreFormulaVersion, defaultRegionScoreFormulaVersion)
}

if !meta.IsDefined("halt-scheduling") {
c.HaltScheduling = defaultHaltScheduling
}

adjustSchedulers(&c.Schedulers, DefaultSchedulers)

for k, b := range c.migrateConfigurationMap() {
Expand Down
19 changes: 12 additions & 7 deletions server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,6 @@ func (o *PersistOptions) SetPlacementRulesCacheEnabled(enabled bool) {
o.SetReplicationConfig(v)
}

// SetWitnessEnabled sets the EnableWitness field of the schedule config.
// The current config is cloned, modified and stored back as a whole.
func (o *PersistOptions) SetWitnessEnabled(enabled bool) {
	scheduleCfg := o.GetScheduleConfig().Clone()
	scheduleCfg.EnableWitness = enabled
	o.SetScheduleConfig(scheduleCfg)
}

// GetStrictlyMatchLabel returns whether check label strict.
func (o *PersistOptions) GetStrictlyMatchLabel() bool {
return o.GetReplicationConfig().StrictlyMatchLabel
Expand Down Expand Up @@ -945,3 +938,15 @@ func (o *PersistOptions) SetAllStoresLimitTTL(ctx context.Context, client *clien
}
return err
}

// SetHaltScheduling sets the HaltScheduling field of the schedule config.
// The current config is cloned, modified and stored back as a whole.
func (o *PersistOptions) SetHaltScheduling(halt bool) {
	scheduleCfg := o.GetScheduleConfig().Clone()
	scheduleCfg.HaltScheduling = halt
	o.SetScheduleConfig(scheduleCfg)
}

// IsSchedulingHalted reports whether the HaltScheduling option is set in the
// current schedule config.
func (o *PersistOptions) IsSchedulingHalted() bool {
	scheduleCfg := o.GetScheduleConfig()
	return scheduleCfg.HaltScheduling
}

0 comments on commit fbdddff

Please sign in to comment.