config, cluster: add an option to halt the cluster scheduling (#6498) (#6558)

ref #6493, ref #6498

Add a config option, halt-scheduling, to halt the cluster scheduling.

Signed-off-by: husharp <jinhao.hu@pingcap.com>

Co-authored-by: husharp <jinhao.hu@pingcap.com>
ti-chi-bot and HuSharp authored Jun 6, 2023
1 parent 8e9d0c4 commit 410b176
Showing 11 changed files with 188 additions and 12 deletions.
5 changes: 5 additions & 0 deletions errors.toml
@@ -91,6 +91,11 @@ error = '''
TiKV cluster not bootstrapped, please start TiKV first
'''

["PD:cluster:ErrSchedulingIsHalted"]
error = '''
scheduling is halted
'''

["PD:cluster:ErrStoreIsUp"]
error = '''
store is still up, please remove store gracefully
109 changes: 108 additions & 1 deletion metrics/grafana/pd.json
@@ -2332,6 +2332,113 @@
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": true,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "The allowance status of the scheduling.",
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 0,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 41
},
"hiddenSeries": false,
"id": 1464,
"legend": {
"alignAsTable": true,
"avg": false,
"current": true,
"hideEmpty": true,
"hideZero": true,
"max": false,
"min": false,
"rightSide": true,
"show": true,
"total": false,
"values": true
},
"lines": false,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"paceLength": 10,
"percentage": false,
"pluginVersion": "7.5.10",
"pointradius": 1,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": true,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "pd_scheduling_allowance_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=\"$instance\"}",
"format": "time_series",
"interval": "",
"intervalFactor": 2,
"legendFormat": "{{kind}}",
"metric": "pd_scheduling_allowance_status",
"refId": "A",
"step": 2
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Scheduling Allowance Status",
"tooltip": {
"shared": true,
"sort": 1,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"$$hashKey": "object:533",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": "0",
"show": true
},
{
"$$hashKey": "object:534",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"cacheTimeout": null,
"colorBackground": false,
@@ -2959,7 +3066,7 @@
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "{{event}}",
"metric": "pd_scheduler_status",
"metric": "pd_schedule_operators_count",
"refId": "A",
"step": 4
}
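
The new panel above graphs the pd_scheduling_allowance_status gauge, one stacked bar series per kind. For ad-hoc inspection of the same series outside Grafana, here is a minimal sketch using the Prometheus Go client; the Prometheus address is a placeholder, and the k8s_cluster/tidb_cluster/instance matchers from the panel expression are Grafana template variables, omitted here:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/client_golang/api"
	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
)

func main() {
	// Placeholder address: point this at the Prometheus that scrapes PD.
	client, err := api.NewClient(api.Config{Address: "http://127.0.0.1:9090"})
	if err != nil {
		panic(err)
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Same series the panel draws, minus the template-variable matchers.
	result, warnings, err := v1.NewAPI(client).Query(ctx, "pd_scheduling_allowance_status", time.Now())
	if err != nil {
		panic(err)
	}
	if len(warnings) > 0 {
		fmt.Println("warnings:", warnings)
	}
	fmt.Println(result) // one sample per kind, e.g. halt-scheduling, online-unsafe-recovery
}
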
7 changes: 4 additions & 3 deletions pkg/errs/errno.go
@@ -121,9 +121,10 @@ var (

// cluster errors
var (
ErrNotBootstrapped = errors.Normalize("TiKV cluster not bootstrapped, please start TiKV first", errors.RFCCodeText("PD:cluster:ErrNotBootstrapped"))
ErrStoreIsUp = errors.Normalize("store is still up, please remove store gracefully", errors.RFCCodeText("PD:cluster:ErrStoreIsUp"))
ErrInvalidStoreID = errors.Normalize("invalid store id %d, not found", errors.RFCCodeText("PD:cluster:ErrInvalidStoreID"))
ErrNotBootstrapped = errors.Normalize("TiKV cluster not bootstrapped, please start TiKV first", errors.RFCCodeText("PD:cluster:ErrNotBootstrapped"))
ErrStoreIsUp = errors.Normalize("store is still up, please remove store gracefully", errors.RFCCodeText("PD:cluster:ErrStoreIsUp"))
ErrInvalidStoreID = errors.Normalize("invalid store id %d, not found", errors.RFCCodeText("PD:cluster:ErrInvalidStoreID"))
ErrSchedulingIsHalted = errors.Normalize("scheduling is halted", errors.RFCCodeText("PD:cluster:ErrSchedulingIsHalted"))
)

// versioninfo errors
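
For reference, a minimal sketch of how the new error behaves under the pingcap/errors conventions used in errno.go; the local variable is a stand-in for errs.ErrSchedulingIsHalted:

package main

import (
	"fmt"

	"github.com/pingcap/errors"
)

// Stand-in mirroring the ErrSchedulingIsHalted definition added above.
var errSchedulingIsHalted = errors.Normalize(
	"scheduling is halted",
	errors.RFCCodeText("PD:cluster:ErrSchedulingIsHalted"),
)

func main() {
	// FastGenByArgs stamps out a lightweight error from the template, as
	// CheckSchedulingAllowance does below in server/cluster/cluster.go.
	err := errSchedulingIsHalted.FastGenByArgs()
	// ErrorEqual matches errors generated from the same template.
	fmt.Println(errors.ErrorEqual(err, errSchedulingIsHalted)) // true
	fmt.Println(err)                                           // [PD:cluster:ErrSchedulingIsHalted]scheduling is halted
}
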
3 changes: 3 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
@@ -95,6 +95,9 @@ func (mc *Cluster) GetAllocator() id.Allocator {
return mc.IDAllocator
}

// CheckSchedulingAllowance checks if the cluster allows scheduling currently.
func (mc *Cluster) CheckSchedulingAllowance() (bool, error) { return true, nil }

// ScanRegions scans region with start key, until number greater than limit.
func (mc *Cluster) ScanRegions(startKey, endKey []byte, limit int) []*core.RegionInfo {
return mc.ScanRange(startKey, endKey, limit)
22 changes: 22 additions & 0 deletions server/cluster/cluster.go
@@ -2531,3 +2531,25 @@ func (c *RaftCluster) GetPausedSchedulerDelayAt(name string) (int64, error) {
func (c *RaftCluster) GetPausedSchedulerDelayUntil(name string) (int64, error) {
return c.coordinator.getPausedSchedulerDelayUntil(name)
}

var (
onlineUnsafeRecoveryStatus = schedulingAllowanceStatusGauge.WithLabelValues("online-unsafe-recovery")
haltSchedulingStatus = schedulingAllowanceStatusGauge.WithLabelValues("halt-scheduling")
)

// CheckSchedulingAllowance checks if the cluster allows scheduling currently.
func (c *RaftCluster) CheckSchedulingAllowance() (bool, error) {
// If the cluster is in the process of online unsafe recovery, it should not allow scheduling.
if c.GetUnsafeRecoveryController().IsRunning() {
onlineUnsafeRecoveryStatus.Set(1)
return false, errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs()
}
onlineUnsafeRecoveryStatus.Set(0)
// If the halt-scheduling is set, it should not allow scheduling.
if c.opt.IsSchedulingHalted() {
haltSchedulingStatus.Set(1)
return false, errs.ErrSchedulingIsHalted.FastGenByArgs()
}
haltSchedulingStatus.Set(0)
return true, nil
}
8 changes: 4 additions & 4 deletions server/cluster/cluster_worker.go
@@ -43,8 +43,8 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error {

// HandleAskSplit handles the split request.
func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSplitResponse, error) {
if c.GetUnsafeRecoveryController().IsRunning() {
return nil, errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs()
if allowed, err := c.CheckSchedulingAllowance(); !allowed {
return nil, err
}
if !c.opt.IsTikvRegionSplitEnabled() {
return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs()
@@ -105,8 +105,8 @@ func (c *RaftCluster) ValidRequestRegion(reqRegion *metapb.Region) error {

// HandleAskBatchSplit handles the batch split request.
func (c *RaftCluster) HandleAskBatchSplit(request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) {
if c.GetUnsafeRecoveryController().IsRunning() {
return nil, errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs()
if allowed, err := c.CheckSchedulingAllowance(); !allowed {
return nil, err
}
if !c.opt.IsTikvRegionSplitEnabled() {
return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs()
14 changes: 10 additions & 4 deletions server/cluster/coordinator.go
@@ -129,8 +129,7 @@ func (c *coordinator) patrolRegions() {
log.Info("patrol regions has been stopped")
return
}
if c.cluster.GetUnsafeRecoveryController().IsRunning() {
// Skip patrolling regions during unsafe recovery.
if allowed, _ := c.cluster.CheckSchedulingAllowance(); !allowed {
continue
}

@@ -533,7 +532,7 @@ func (c *coordinator) collectSchedulerMetrics() {
var allowScheduler float64
// If the scheduler is not allowed to schedule, it will disappear in Grafana panel.
// See issue #1341.
if !s.IsPaused() && !s.cluster.GetUnsafeRecoveryController().IsRunning() {
if allowed, _ := s.cluster.CheckSchedulingAllowance(); !s.IsPaused() && allowed {
allowScheduler = 1
}
schedulerStatusGauge.WithLabelValues(s.GetName(), "allow").Set(allowScheduler)
@@ -939,7 +938,14 @@ func (s *scheduleController) AllowSchedule(diagnosable bool) bool {
}
return false
}
if s.IsPaused() || s.cluster.GetUnsafeRecoveryController().IsRunning() {
allowed, _ := s.cluster.CheckSchedulingAllowance()
if !allowed {
if diagnosable {
s.diagnosticRecorder.setResultFromStatus(halted)
}
return false
}
if s.IsPaused() {
if diagnosable {
s.diagnosticRecorder.setResultFromStatus(paused)
}
2 changes: 2 additions & 0 deletions server/cluster/diagnostic_manager.go
@@ -33,6 +33,8 @@ const (
disabled = "disabled"
// paused means the current scheduler is paused
paused = "paused"
// halted means the current scheduler is halted
halted = "halted"
// scheduling means the current scheduler is generating.
scheduling = "scheduling"
// pending means the current scheduler cannot generate scheduling operator
9 changes: 9 additions & 0 deletions server/cluster/metrics.go
@@ -135,6 +135,14 @@ var (
Name: "store_sync",
Help: "The state of store sync config",
}, []string{"address", "state"})

schedulingAllowanceStatusGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "scheduling",
Name: "allowance_status",
Help: "Status of the scheduling allowance.",
}, []string{"kind"})
)

func init() {
@@ -143,6 +151,7 @@ func init() {
prometheus.MustRegister(schedulerStatusGauge)
prometheus.MustRegister(hotSpotStatusGauge)
prometheus.MustRegister(patrolCheckRegionsGauge)
prometheus.MustRegister(schedulingAllowanceStatusGauge)
prometheus.MustRegister(clusterStateCPUGauge)
prometheus.MustRegister(clusterStateCurrent)
prometheus.MustRegister(regionListGauge)
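
A self-contained sketch of the new gauge in isolation, mirroring the definition above; the testutil read-back stands in for a real Prometheus scrape:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// Replica of schedulingAllowanceStatusGauge from server/cluster/metrics.go.
var schedulingAllowanceStatusGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "pd",
		Subsystem: "scheduling",
		Name:      "allowance_status",
		Help:      "Status of the scheduling allowance.",
	}, []string{"kind"})

func main() {
	prometheus.MustRegister(schedulingAllowanceStatusGauge)

	// CheckSchedulingAllowance sets the matching kind to 1 while scheduling
	// is disallowed and back to 0 once it is allowed again.
	halt := schedulingAllowanceStatusGauge.WithLabelValues("halt-scheduling")
	halt.Set(1)
	fmt.Println(testutil.ToFloat64(halt)) // 1
}
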
9 changes: 9 additions & 0 deletions server/config/config.go
@@ -247,6 +247,7 @@ const (
defaultEnableGRPCGateway = true
defaultDisableErrorVerbose = true
defaultEnableWitness = false
defaultHaltScheduling = false

defaultDashboardAddress = "auto"

@@ -772,6 +773,10 @@ type ScheduleConfig struct {

// EnableWitness is the option to enable using witness
EnableWitness bool `toml:"enable-witness" json:"enable-witness,string"`

// HaltScheduling is the option to halt the scheduling. Once it's on, PD will halt the scheduling,
// and any other scheduling configs will be ignored.
HaltScheduling bool `toml:"halt-scheduling" json:"halt-scheduling,string,omitempty"`
}

// Clone returns a cloned scheduling configuration.
@@ -895,6 +900,10 @@ func (c *ScheduleConfig) adjust(meta *configMetaData, reloading bool) error {
adjustString(&c.RegionScoreFormulaVersion, defaultRegionScoreFormulaVersion)
}

if !meta.IsDefined("halt-scheduling") {
c.HaltScheduling = defaultHaltScheduling
}

adjustSchedulers(&c.Schedulers, DefaultSchedulers)

for k, b := range c.migrateConfigurationMap() {
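
The new field round-trips through the toml tag shown above, and adjust() only applies the default when the key is absent from the file. A minimal sketch of both behaviors, assuming BurntSushi/toml semantics (the package behind the meta.IsDefined check); the struct is a pared-down stand-in for ScheduleConfig:

package main

import (
	"fmt"

	"github.com/BurntSushi/toml"
)

// Pared-down stand-in for ScheduleConfig with only the new field.
type scheduleConfig struct {
	HaltScheduling bool `toml:"halt-scheduling"`
}

func main() {
	var cfg scheduleConfig
	md, err := toml.Decode("halt-scheduling = true", &cfg)
	if err != nil {
		panic(err)
	}
	fmt.Println(md.IsDefined("halt-scheduling"), cfg.HaltScheduling) // true true

	// When the key is absent, IsDefined reports false and adjust() above
	// falls back to defaultHaltScheduling (false).
	var absent scheduleConfig
	md, _ = toml.Decode("", &absent)
	fmt.Println(md.IsDefined("halt-scheduling"), absent.HaltScheduling) // false false
}
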
12 changes: 12 additions & 0 deletions server/config/persist_options.go
@@ -856,3 +856,15 @@ func (o *PersistOptions) SetAllStoresLimitTTL(ctx context.Context, client *clien
}
return err
}

// SetHaltScheduling set HaltScheduling.
func (o *PersistOptions) SetHaltScheduling(halt bool) {
v := o.GetScheduleConfig().Clone()
v.HaltScheduling = halt
o.SetScheduleConfig(v)
}

// IsSchedulingHalted returns if PD scheduling is halted.
func (o *PersistOptions) IsSchedulingHalted() bool {
return o.GetScheduleConfig().HaltScheduling
}
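
SetHaltScheduling follows the clone-mutate-swap pattern PD uses for option updates: clone the current ScheduleConfig, flip the flag on the copy, then publish the copy atomically, so readers always see a complete snapshot. A sketch of that pattern with the standard library, using illustrative stand-in types rather than PD's own:

package main

import (
	"fmt"
	"sync/atomic"
)

type scheduleConfig struct {
	HaltScheduling bool
}

func (c *scheduleConfig) clone() *scheduleConfig {
	copied := *c
	return &copied
}

type persistOptions struct {
	schedule atomic.Value // holds *scheduleConfig
}

func (o *persistOptions) getScheduleConfig() *scheduleConfig {
	return o.schedule.Load().(*scheduleConfig)
}

// setHaltScheduling mirrors PersistOptions.SetHaltScheduling above: clone the
// current config, mutate the copy, then atomically publish the copy.
func (o *persistOptions) setHaltScheduling(halt bool) {
	v := o.getScheduleConfig().clone()
	v.HaltScheduling = halt
	o.schedule.Store(v)
}

func (o *persistOptions) isSchedulingHalted() bool {
	return o.getScheduleConfig().HaltScheduling
}

func main() {
	o := &persistOptions{}
	o.schedule.Store(&scheduleConfig{})
	o.setHaltScheduling(true)
	fmt.Println(o.isSchedulingHalted()) // true
}
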
