From 4af171fc9eb15d6f545a4b1942aae50b63fa99a9 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 8 Jun 2023 20:05:07 +0800 Subject: [PATCH] cluster, schedule: unify the scheduling halt to decouple dependencies (#6569) ref tikv/pd#5839 Unify the scheduling halt to decouple dependencies between `cluster` and `coordinator`. Signed-off-by: JmPotato Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/schedule/coordinator.go | 15 +++++++--- pkg/schedule/core/cluster_informer.go | 1 - .../unsafe_recovery_controller.go | 17 +++++++---- server/cluster/cluster.go | 22 -------------- server/cluster/cluster_worker.go | 12 +++++--- server/cluster/metrics.go | 9 ------ server/config/metrics.go | 29 +++++++++++++++++++ server/config/persist_options.go | 14 ++++++++- 8 files changed, 73 insertions(+), 46 deletions(-) create mode 100644 server/config/metrics.go diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index dd11b9a3e37..55d565a11bc 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -146,7 +146,7 @@ func (c *Coordinator) PatrolRegions() { log.Info("patrol regions has been stopped") return } - if allowed, _ := c.cluster.CheckSchedulingAllowance(); !allowed { + if c.isSchedulingHalted() { continue } @@ -173,6 +173,10 @@ func (c *Coordinator) PatrolRegions() { } } +func (c *Coordinator) isSchedulingHalted() bool { + return c.cluster.GetPersistOptions().IsSchedulingHalted() +} + func (c *Coordinator) checkRegions(startKey []byte) (key []byte, regions []*core.RegionInfo) { regions = c.cluster.ScanRegions(startKey, nil, patrolScanRegionLimit) if len(regions) == 0 { @@ -579,7 +583,7 @@ func (c *Coordinator) CollectSchedulerMetrics() { var allowScheduler float64 // If the scheduler is not allowed to schedule, it will disappear in Grafana panel. // See issue #1341. - if allowed, _ := s.cluster.CheckSchedulingAllowance(); !s.IsPaused() && allowed { + if !s.IsPaused() && !c.isSchedulingHalted() { allowScheduler = 1 } schedulerStatusGauge.WithLabelValues(s.Scheduler.GetName(), "allow").Set(allowScheduler) @@ -1054,8 +1058,7 @@ func (s *scheduleController) AllowSchedule(diagnosable bool) bool { } return false } - allowed, _ := s.cluster.CheckSchedulingAllowance() - if !allowed { + if s.isSchedulingHalted() { if diagnosable { s.diagnosticRecorder.setResultFromStatus(halted) } @@ -1070,6 +1073,10 @@ func (s *scheduleController) AllowSchedule(diagnosable bool) bool { return true } +func (s *scheduleController) isSchedulingHalted() bool { + return s.cluster.GetPersistOptions().IsSchedulingHalted() +} + // isPaused returns if a scheduler is paused. func (s *scheduleController) IsPaused() bool { delayUntil := atomic.LoadInt64(&s.delayUntil) diff --git a/pkg/schedule/core/cluster_informer.go b/pkg/schedule/core/cluster_informer.go index bcf440a587d..72e124c501e 100644 --- a/pkg/schedule/core/cluster_informer.go +++ b/pkg/schedule/core/cluster_informer.go @@ -39,7 +39,6 @@ type ClusterInformer interface { GetRegionLabeler() *labeler.RegionLabeler GetStorage() storage.Storage UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) - CheckSchedulingAllowance() (bool, error) AddSuspectRegions(ids ...uint64) GetPersistOptions() *config.PersistOptions } diff --git a/pkg/unsaferecovery/unsafe_recovery_controller.go b/pkg/unsaferecovery/unsafe_recovery_controller.go index c93bbf09a35..18cc760c614 100644 --- a/pkg/unsaferecovery/unsafe_recovery_controller.go +++ b/pkg/unsaferecovery/unsafe_recovery_controller.go @@ -34,6 +34,7 @@ import ( "github.com/tikv/pd/pkg/id" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/syncutil" + "github.com/tikv/pd/server/config" "go.uber.org/zap" ) @@ -109,6 +110,7 @@ type cluster interface { DropCacheAllRegion() GetAllocator() id.Allocator BuryStore(storeID uint64, forceBury bool) error + GetPersistOptions() *config.PersistOptions } // Controller is used to control the unsafe recovery process. @@ -174,11 +176,11 @@ func (u *Controller) reset() { func (u *Controller) IsRunning() bool { u.RLock() defer u.RUnlock() - return u.isRunningLocked() + return isRunning(u.stage) } -func (u *Controller) isRunningLocked() bool { - return u.stage != Idle && u.stage != Finished && u.stage != Failed +func isRunning(s stage) bool { + return s != Idle && s != Finished && s != Failed } // RemoveFailedStores removes Failed stores from the cluster. @@ -186,7 +188,7 @@ func (u *Controller) RemoveFailedStores(failedStores map[uint64]struct{}, timeou u.Lock() defer u.Unlock() - if u.isRunningLocked() { + if isRunning(u.stage) { return errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs() } @@ -316,7 +318,7 @@ func (u *Controller) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest, u.Lock() defer u.Unlock() - if !u.isRunningLocked() { + if !isRunning(u.stage) { // no recovery in progress, do nothing return } @@ -490,6 +492,11 @@ func (u *Controller) GetStage() stage { func (u *Controller) changeStage(stage stage) { u.stage = stage + // Halt and resume the scheduling once the running state changed. + running := isRunning(stage) + if opt := u.cluster.GetPersistOptions(); opt.IsSchedulingHalted() != running { + opt.SetHaltScheduling(running, "online-unsafe-recovery") + } var output StageOutput output.Time = time.Now().Format("2006-01-02 15:04:05.000") diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 4ac6a13fd70..f752d7ad108 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -2680,25 +2680,3 @@ func (c *RaftCluster) GetPausedSchedulerDelayAt(name string) (int64, error) { func (c *RaftCluster) GetPausedSchedulerDelayUntil(name string) (int64, error) { return c.coordinator.GetPausedSchedulerDelayUntil(name) } - -var ( - onlineUnsafeRecoveryStatus = schedulingAllowanceStatusGauge.WithLabelValues("online-unsafe-recovery") - haltSchedulingStatus = schedulingAllowanceStatusGauge.WithLabelValues("halt-scheduling") -) - -// CheckSchedulingAllowance checks if the cluster allows scheduling currently. -func (c *RaftCluster) CheckSchedulingAllowance() (bool, error) { - // If the cluster is in the process of online unsafe recovery, it should not allow scheduling. - if c.GetUnsafeRecoveryController().IsRunning() { - onlineUnsafeRecoveryStatus.Set(1) - return false, errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs() - } - onlineUnsafeRecoveryStatus.Set(0) - // If the halt-scheduling is set, it should not allow scheduling. - if c.opt.IsSchedulingHalted() { - haltSchedulingStatus.Set(1) - return false, errs.ErrSchedulingIsHalted.FastGenByArgs() - } - haltSchedulingStatus.Set(0) - return true, nil -} diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index cc401731a6f..51781bde7f6 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -43,8 +43,8 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { // HandleAskSplit handles the split request. func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSplitResponse, error) { - if allowed, err := c.CheckSchedulingAllowance(); !allowed { - return nil, err + if c.isSchedulingHalted() { + return nil, errs.ErrSchedulingIsHalted.FastGenByArgs() } if !c.opt.IsTikvRegionSplitEnabled() { return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs() @@ -86,6 +86,10 @@ func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSp return split, nil } +func (c *RaftCluster) isSchedulingHalted() bool { + return c.opt.IsSchedulingHalted() +} + // ValidRequestRegion is used to decide if the region is valid. func (c *RaftCluster) ValidRequestRegion(reqRegion *metapb.Region) error { startKey := reqRegion.GetStartKey() @@ -105,8 +109,8 @@ func (c *RaftCluster) ValidRequestRegion(reqRegion *metapb.Region) error { // HandleAskBatchSplit handles the batch split request. func (c *RaftCluster) HandleAskBatchSplit(request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) { - if allowed, err := c.CheckSchedulingAllowance(); !allowed { - return nil, err + if c.isSchedulingHalted() { + return nil, errs.ErrSchedulingIsHalted.FastGenByArgs() } if !c.opt.IsTikvRegionSplitEnabled() { return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs() diff --git a/server/cluster/metrics.go b/server/cluster/metrics.go index 4306779d681..e43fe595f70 100644 --- a/server/cluster/metrics.go +++ b/server/cluster/metrics.go @@ -95,20 +95,11 @@ var ( Name: "store_sync", Help: "The state of store sync config", }, []string{"address", "state"}) - - schedulingAllowanceStatusGauge = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "pd", - Subsystem: "scheduling", - Name: "allowance_status", - Help: "Status of the scheduling allowance.", - }, []string{"kind"}) ) func init() { prometheus.MustRegister(regionEventCounter) prometheus.MustRegister(healthStatusGauge) - prometheus.MustRegister(schedulingAllowanceStatusGauge) prometheus.MustRegister(clusterStateCPUGauge) prometheus.MustRegister(clusterStateCurrent) prometheus.MustRegister(bucketEventCounter) diff --git a/server/config/metrics.go b/server/config/metrics.go new file mode 100644 index 00000000000..84f5c00dd61 --- /dev/null +++ b/server/config/metrics.go @@ -0,0 +1,29 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import "github.com/prometheus/client_golang/prometheus" + +var schedulingAllowanceStatusGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "pd", + Subsystem: "scheduling", + Name: "allowance_status", + Help: "Status of the scheduling allowance.", + }, []string{"kind"}) + +func init() { + prometheus.MustRegister(schedulingAllowanceStatusGauge) +} diff --git a/server/config/persist_options.go b/server/config/persist_options.go index 3539f251734..fdc14460f3b 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -939,14 +939,26 @@ func (o *PersistOptions) SetAllStoresLimitTTL(ctx context.Context, client *clien return err } +var haltSchedulingStatus = schedulingAllowanceStatusGauge.WithLabelValues("halt-scheduling") + // SetHaltScheduling set HaltScheduling. -func (o *PersistOptions) SetHaltScheduling(halt bool) { +func (o *PersistOptions) SetHaltScheduling(halt bool, source string) { v := o.GetScheduleConfig().Clone() v.HaltScheduling = halt o.SetScheduleConfig(v) + if halt { + haltSchedulingStatus.Set(1) + schedulingAllowanceStatusGauge.WithLabelValues(source).Set(1) + } else { + haltSchedulingStatus.Set(0) + schedulingAllowanceStatusGauge.WithLabelValues(source).Set(0) + } } // IsSchedulingHalted returns if PD scheduling is halted. func (o *PersistOptions) IsSchedulingHalted() bool { + if o == nil { + return false + } return o.GetScheduleConfig().HaltScheduling }