From fbdddff0704e8cfd3edeeba0cdd0c05b6a97db40 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 25 May 2023 16:35:40 +0800 Subject: [PATCH] config, cluster: add an option to halt the cluster scheduling (#6498) ref tikv/pd#6493 Add an option to halt the cluster scheduling. Signed-off-by: JmPotato --- errors.toml | 5 + metrics/grafana/pd.json | 109 +++++++++++++++++++- pkg/errs/errno.go | 7 +- pkg/mock/mockcluster/mockcluster.go | 8 +- pkg/schedule/config/config.go | 2 +- pkg/schedule/coordinator.go | 14 ++- pkg/schedule/core/cluster_informer.go | 2 +- pkg/schedule/diagnostic_manager.go | 2 + pkg/schedule/placement/rule_manager_test.go | 2 +- server/cluster/cluster.go | 27 ++++- server/cluster/cluster_worker.go | 8 +- server/cluster/metrics.go | 9 ++ server/config/config.go | 9 ++ server/config/persist_options.go | 19 ++-- 14 files changed, 191 insertions(+), 32 deletions(-) diff --git a/errors.toml b/errors.toml index 08637ad3ba0..44e1252ebab 100644 --- a/errors.toml +++ b/errors.toml @@ -96,6 +96,11 @@ error = ''' TiKV cluster not bootstrapped, please start TiKV first ''' +["PD:cluster:ErrSchedulingIsHalted"] +error = ''' +scheduling is halted +''' + ["PD:cluster:ErrStoreIsUp"] error = ''' store is still up, please remove store gracefully diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index 10316026349..bcb21cd5c70 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -2340,6 +2340,113 @@ "alignLevel": null } }, + { + "aliasColors": {}, + "bars": true, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The allowance status of the scheduling.", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 0, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 41 + }, + "hiddenSeries": false, + "id": 1464, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "hideEmpty": true, + "hideZero": true, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": false, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "paceLength": 10, + "percentage": false, + "pluginVersion": "7.5.10", + "pointradius": 1, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": true, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "pd_scheduling_allowance_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=\"$instance\"}", + "format": "time_series", + "interval": "", + "intervalFactor": 2, + "legendFormat": "{{kind}}", + "metric": "pd_scheduling_allowance_status", + "refId": "A", + "step": 2 + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Scheduling Allowance Status", + "tooltip": { + "shared": true, + "sort": 1, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:533", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "$$hashKey": "object:534", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, { "cacheTimeout": null, "colorBackground": false, @@ -2967,7 +3074,7 @@ "format": "time_series", "intervalFactor": 2, 
"legendFormat": "{{event}}", - "metric": "pd_scheduler_status", + "metric": "pd_schedule_operators_count", "refId": "A", "step": 4 } diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index 64bf2208c7f..0c54201965a 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -142,9 +142,10 @@ var ( // cluster errors var ( - ErrNotBootstrapped = errors.Normalize("TiKV cluster not bootstrapped, please start TiKV first", errors.RFCCodeText("PD:cluster:ErrNotBootstrapped")) - ErrStoreIsUp = errors.Normalize("store is still up, please remove store gracefully", errors.RFCCodeText("PD:cluster:ErrStoreIsUp")) - ErrInvalidStoreID = errors.Normalize("invalid store id %d, not found", errors.RFCCodeText("PD:cluster:ErrInvalidStoreID")) + ErrNotBootstrapped = errors.Normalize("TiKV cluster not bootstrapped, please start TiKV first", errors.RFCCodeText("PD:cluster:ErrNotBootstrapped")) + ErrStoreIsUp = errors.Normalize("store is still up, please remove store gracefully", errors.RFCCodeText("PD:cluster:ErrStoreIsUp")) + ErrInvalidStoreID = errors.Normalize("invalid store id %d, not found", errors.RFCCodeText("PD:cluster:ErrInvalidStoreID")) + ErrSchedulingIsHalted = errors.Normalize("scheduling is halted", errors.RFCCodeText("PD:cluster:ErrSchedulingIsHalted")) ) // versioninfo errors diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 16b931eca57..1c23301f41b 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -104,11 +104,6 @@ func (mc *Cluster) GetAllocator() id.Allocator { return mc.IDAllocator } -// IsUnsafeRecovering returns if the cluster is in unsafe recovering. -func (mc *Cluster) IsUnsafeRecovering() bool { - return false -} - // GetPersistOptions returns the persist options. func (mc *Cluster) GetPersistOptions() *config.PersistOptions { return mc.PersistOptions @@ -123,6 +118,9 @@ func (mc *Cluster) IsSchedulerExisted(name string) (bool, error) { return false, // IsSchedulerDisabled checks if the scheduler with name is disabled or not. func (mc *Cluster) IsSchedulerDisabled(name string) (bool, error) { return false, nil } +// CheckSchedulingAllowance checks if the cluster allows scheduling currently. +func (mc *Cluster) CheckSchedulingAllowance() (bool, error) { return true, nil } + // ScanRegions scans region with start key, until number greater than limit. func (mc *Cluster) ScanRegions(startKey, endKey []byte, limit int) []*core.RegionInfo { return mc.ScanRange(startKey, endKey, limit) diff --git a/pkg/schedule/config/config.go b/pkg/schedule/config/config.go index 2c0842914c7..f63aae8d2dd 100644 --- a/pkg/schedule/config/config.go +++ b/pkg/schedule/config/config.go @@ -87,7 +87,7 @@ type Config interface { SetSplitMergeInterval(time.Duration) SetMaxReplicas(int) SetPlacementRulesCacheEnabled(bool) - SetWitnessEnabled(bool) + SetEnableWitness(bool) // only for store configuration UseRaftV2() } diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index d271aa36418..2171af8ee80 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -143,8 +143,7 @@ func (c *Coordinator) PatrolRegions() { log.Info("patrol regions has been stopped") return } - if c.cluster.IsUnsafeRecovering() { - // Skip patrolling regions during unsafe recovery. 
+ if allowed, _ := c.cluster.CheckSchedulingAllowance(); !allowed { continue } @@ -577,7 +576,7 @@ func (c *Coordinator) CollectSchedulerMetrics() { var allowScheduler float64 // If the scheduler is not allowed to schedule, it will disappear in Grafana panel. // See issue #1341. - if !s.IsPaused() && !s.cluster.IsUnsafeRecovering() { + if allowed, _ := s.cluster.CheckSchedulingAllowance(); !s.IsPaused() && allowed { allowScheduler = 1 } schedulerStatusGauge.WithLabelValues(s.Scheduler.GetName(), "allow").Set(allowScheduler) @@ -1047,7 +1046,14 @@ func (s *scheduleController) AllowSchedule(diagnosable bool) bool { } return false } - if s.IsPaused() || s.cluster.IsUnsafeRecovering() { + allowed, _ := s.cluster.CheckSchedulingAllowance() + if !allowed { + if diagnosable { + s.diagnosticRecorder.setResultFromStatus(halted) + } + return false + } + if s.IsPaused() { if diagnosable { s.diagnosticRecorder.setResultFromStatus(paused) } diff --git a/pkg/schedule/core/cluster_informer.go b/pkg/schedule/core/cluster_informer.go index 345cdeb74a9..1af8b28046a 100644 --- a/pkg/schedule/core/cluster_informer.go +++ b/pkg/schedule/core/cluster_informer.go @@ -44,8 +44,8 @@ type ClusterInformer interface { UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) IsSchedulerExisted(name string) (bool, error) IsSchedulerDisabled(name string) (bool, error) + CheckSchedulingAllowance() (bool, error) GetPersistOptions() *config.PersistOptions - IsUnsafeRecovering() bool } // RegionHealthCluster is an aggregate interface that wraps multiple interfaces diff --git a/pkg/schedule/diagnostic_manager.go b/pkg/schedule/diagnostic_manager.go index c68999f6cdd..9560110fea0 100644 --- a/pkg/schedule/diagnostic_manager.go +++ b/pkg/schedule/diagnostic_manager.go @@ -34,6 +34,8 @@ const ( disabled = "disabled" // paused means the current scheduler is paused paused = "paused" + // halted means the current scheduler is halted + halted = "halted" // scheduling means the current scheduler is generating. scheduling = "scheduling" // pending means the current scheduler cannot generate scheduling operator diff --git a/pkg/schedule/placement/rule_manager_test.go b/pkg/schedule/placement/rule_manager_test.go index a9c413ffb73..a6454337aa8 100644 --- a/pkg/schedule/placement/rule_manager_test.go +++ b/pkg/schedule/placement/rule_manager_test.go @@ -33,7 +33,7 @@ func newTestManager(t *testing.T, enableWitness bool) (endpoint.RuleStorage, *Ru store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil) var err error manager := NewRuleManager(store, nil, mockconfig.NewTestOptions()) - manager.conf.SetWitnessEnabled(enableWitness) + manager.conf.SetEnableWitness(enableWitness) err = manager.Initialize(3, []string{"zone", "rack", "host"}, "") re.NoError(err) return store, manager diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 76071396b33..fbfd483e49d 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -828,11 +828,6 @@ func (c *RaftCluster) GetUnsafeRecoveryController() *unsaferecovery.Controller { return c.unsafeRecoveryController } -// IsUnsafeRecovering returns if the cluster is in unsafe recovering. 
-func (c *RaftCluster) IsUnsafeRecovering() bool { - return c.unsafeRecoveryController.IsRunning() -} - // AddSuspectKeyRange adds the key range with the its ruleID as the key // The instance of each keyRange is like following format: // [2][]byte: start key/end key @@ -2713,3 +2708,25 @@ func (c *RaftCluster) GetPausedSchedulerDelayAt(name string) (int64, error) { func (c *RaftCluster) GetPausedSchedulerDelayUntil(name string) (int64, error) { return c.coordinator.GetPausedSchedulerDelayUntil(name) } + +var ( + onlineUnsafeRecoveryStatus = schedulingAllowanceStatusGauge.WithLabelValues("online-unsafe-recovery") + haltSchedulingStatus = schedulingAllowanceStatusGauge.WithLabelValues("halt-scheduling") +) + +// CheckSchedulingAllowance checks if the cluster allows scheduling currently. +func (c *RaftCluster) CheckSchedulingAllowance() (bool, error) { + // If the cluster is in the process of online unsafe recovery, it should not allow scheduling. + if c.GetUnsafeRecoveryController().IsRunning() { + onlineUnsafeRecoveryStatus.Set(1) + return false, errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs() + } + onlineUnsafeRecoveryStatus.Set(0) + // If the halt-scheduling config is set, it should not allow scheduling. + if c.opt.IsSchedulingHalted() { + haltSchedulingStatus.Set(1) + return false, errs.ErrSchedulingIsHalted.FastGenByArgs() + } + haltSchedulingStatus.Set(0) + return true, nil +} diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index 06f5c1725aa..9f87b4501ad 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -43,8 +43,8 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { // HandleAskSplit handles the split request. func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSplitResponse, error) { - if c.GetUnsafeRecoveryController().IsRunning() { - return nil, errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs() + if allowed, err := c.CheckSchedulingAllowance(); !allowed { + return nil, err } if !c.opt.IsTikvRegionSplitEnabled() { return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs() @@ -105,8 +105,8 @@ func (c *RaftCluster) ValidRequestRegion(reqRegion *metapb.Region) error { // HandleAskBatchSplit handles the batch split request.
func (c *RaftCluster) HandleAskBatchSplit(request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) { - if c.GetUnsafeRecoveryController().IsRunning() { - return nil, errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs() + if allowed, err := c.CheckSchedulingAllowance(); !allowed { + return nil, err } if !c.opt.IsTikvRegionSplitEnabled() { return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs() } diff --git a/server/cluster/metrics.go b/server/cluster/metrics.go index e43fe595f70..4306779d681 100644 --- a/server/cluster/metrics.go +++ b/server/cluster/metrics.go @@ -95,11 +95,20 @@ var ( Name: "store_sync", Help: "The state of store sync config", }, []string{"address", "state"}) + + schedulingAllowanceStatusGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "pd", + Subsystem: "scheduling", + Name: "allowance_status", + Help: "Status of the scheduling allowance.", + }, []string{"kind"}) ) func init() { prometheus.MustRegister(regionEventCounter) prometheus.MustRegister(healthStatusGauge) + prometheus.MustRegister(schedulingAllowanceStatusGauge) prometheus.MustRegister(clusterStateCPUGauge) prometheus.MustRegister(clusterStateCurrent) prometheus.MustRegister(bucketEventCounter) diff --git a/server/config/config.go b/server/config/config.go index 0b67087a121..349f5fbeb7e 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -222,6 +222,7 @@ const ( defaultEnableGRPCGateway = true defaultDisableErrorVerbose = true defaultEnableWitness = false + defaultHaltScheduling = false defaultDashboardAddress = "auto" @@ -700,6 +701,10 @@ type ScheduleConfig struct { // v1: which is based on the region count by rate limit. // v2: which is based on region size by window size. StoreLimitVersion string `toml:"store-limit-version" json:"store-limit-version,omitempty"` + + // HaltScheduling is the option to halt the scheduling. Once it's on, PD will stop all scheduling, + // and ignore any other scheduling configs. + HaltScheduling bool `toml:"halt-scheduling" json:"halt-scheduling,string,omitempty"` } // Clone returns a cloned scheduling configuration. @@ -836,6 +841,10 @@ func (c *ScheduleConfig) adjust(meta *configutil.ConfigMetaData, reloading bool) configutil.AdjustString(&c.RegionScoreFormulaVersion, defaultRegionScoreFormulaVersion) } + if !meta.IsDefined("halt-scheduling") { + c.HaltScheduling = defaultHaltScheduling + } + adjustSchedulers(&c.Schedulers, DefaultSchedulers) for k, b := range c.migrateConfigurationMap() { diff --git a/server/config/persist_options.go b/server/config/persist_options.go index cf3646247be..3539f251734 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -184,13 +184,6 @@ func (o *PersistOptions) SetPlacementRulesCacheEnabled(enabled bool) { o.SetReplicationConfig(v) } -// SetWitnessEnabled set EanbleWitness -func (o *PersistOptions) SetWitnessEnabled(enabled bool) { - v := o.GetScheduleConfig().Clone() - v.EnableWitness = enabled - o.SetScheduleConfig(v) -} - // GetStrictlyMatchLabel returns whether check label strict. func (o *PersistOptions) GetStrictlyMatchLabel() bool { return o.GetReplicationConfig().StrictlyMatchLabel @@ -945,3 +938,15 @@ func (o *PersistOptions) SetAllStoresLimitTTL(ctx context.Context, client *clien } return err } + +// SetHaltScheduling sets HaltScheduling.
+func (o *PersistOptions) SetHaltScheduling(halt bool) { + v := o.GetScheduleConfig().Clone() + v.HaltScheduling = halt + o.SetScheduleConfig(v) +} + +// IsSchedulingHalted returns whether PD scheduling is halted. +func (o *PersistOptions) IsSchedulingHalted() bool { + return o.GetScheduleConfig().HaltScheduling +}
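
Usage sketch: the new switch is an ordinary schedule-config field, so it can be toggled through PersistOptions. A minimal sketch, assuming the tikv/pd module at this revision; it only flips the in-memory options, and persisting the change (normally done by the server's config handler) is omitted:

    package main

    import (
        "fmt"

        "github.com/tikv/pd/server/config"
    )

    func main() {
        // Fresh options: halt-scheduling starts from its default, false.
        opt := config.NewPersistOptions(config.NewConfig())
        fmt.Println(opt.IsSchedulingHalted()) // false

        // After this call, RaftCluster.CheckSchedulingAllowance would report
        // ErrSchedulingIsHalted, the coordinator would skip region patrol, and
        // schedulers would stop producing operators.
        opt.SetHaltScheduling(true)
        fmt.Println(opt.IsSchedulingHalted()) // true
    }

Since the field is tagged toml:"halt-scheduling" / json:"halt-scheduling", the same switch should also be reachable as halt-scheduling = true under [schedule] in the config file, or online via pd-ctl's generic "config set halt-scheduling true"; neither path is exercised by this diff. The new pd_scheduling_allowance_status gauge reports which brake is engaged (kind="halt-scheduling" or kind="online-unsafe-recovery"), which is what the added Grafana panel plots.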