Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

restore: adjust PD config to speed up restore #198

Merged
merged 19 commits into from
May 6, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
restore: adjust PD config to speed up restore
Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus committed Mar 19, 2020
commit 76855df9554f1145b6a3df9159accc7c615a7d5e
48 changes: 48 additions & 0 deletions pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
regionCountPrefix = "pd/api/v1/stats/region"
schdulerPrefix = "pd/api/v1/schedulers"
maxMsgSize = int(128 * utils.MB) // pd.ScanRegion may return a large response
scheduleConfigPrefix = "pd/api/v1/config/schedule"
)

// Mgr manages connections to a TiDB cluster.
Expand Down Expand Up @@ -433,6 +434,53 @@ func (mgr *Mgr) listSchedulersWith(ctx context.Context, get pdHTTPRequest) ([]st
return nil, err
}

// GetPDScheduleConfig returns PD schedule config value associated with the key.
// It returns nil if there is no such config item.
func (mgr *Mgr) GetPDScheduleConfig(
ctx context.Context, configKey string,
kennytm marked this conversation as resolved.
Show resolved Hide resolved
) (interface{}, error) {
var err error
for _, addr := range mgr.pdHTTP.addrs {
v, e := pdRequest(
ctx, addr, scheduleConfigPrefix, mgr.pdHTTP.cli, http.MethodGet, nil)
if e != nil {
err = e
continue
}
cfg := make(map[string]interface{})
err = json.Unmarshal(v, &cfg)
if err != nil {
return nil, err
}
value, ok := cfg[configKey]
if !ok {
return nil, nil
}
return value, nil
}
return nil, err
}

// UpdatePDScheduleConfig updates PD schedule config value associated with the key.
func (mgr *Mgr) UpdatePDScheduleConfig(
ctx context.Context, configKey string, configValue interface{},
kennytm marked this conversation as resolved.
Show resolved Hide resolved
) error {
for _, addr := range mgr.pdHTTP.addrs {
cfg := make(map[string]interface{})
cfg[configKey] = configValue
reqData, err := json.Marshal(cfg)
if err != nil {
return err
}
_, e := pdRequest(ctx, addr, scheduleConfigPrefix,
mgr.pdHTTP.cli, http.MethodPost, bytes.NewBuffer(reqData))
if e == nil {
return nil
}
}
return errors.New("update PD schedule config failed")
}

// Close closes all client in Mgr.
func (mgr *Mgr) Close() {
mgr.grpcClis.mu.Lock()
Expand Down
154 changes: 120 additions & 34 deletions pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package task

import (
"context"
"math"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/backup"
Expand All @@ -24,23 +25,32 @@ import (

const (
flagOnline = "online"
)

var schedulers = map[string]struct{}{
"balance-leader-scheduler": {},
"balance-hot-region-scheduler": {},
"balance-region-scheduler": {},

"shuffle-leader-scheduler": {},
"shuffle-region-scheduler": {},
"shuffle-hot-region-scheduler": {},
}

const (
defaultRestoreConcurrency = 128
maxRestoreBatchSizeLimit = 256
)

var (
	// schedulers is the set of default PD schedulers that may move regions or
	// leaders around during restore; they are removed in restorePreWork and
	// added back in restorePostWork.
	schedulers = map[string]struct{}{
		"balance-leader-scheduler":     {},
		"balance-hot-region-scheduler": {},
		"balance-region-scheduler":     {},

		"shuffle-leader-scheduler":     {},
		"shuffle-region-scheduler":     {},
		"shuffle-hot-region-scheduler": {},
	}
	// pdRegionMergeCfg lists the PD schedule config keys that control region
	// merge; they are set to 0 during restore to disable merging.
	pdRegionMergeCfg = []string{
		"max-merge-region-keys",
		"max-merge-region-size",
	}
	// pdScheduleLimitCfg lists the PD schedule limit keys that are enlarged
	// during restore to speed up scheduling.
	pdScheduleLimitCfg = []string{
		"leader-schedule-limit",
		"region-schedule-limit",
		"max-snapshot-count",
	}
)

// RestoreConfig is the configuration specific for restore tasks.
type RestoreConfig struct {
Config
Expand Down Expand Up @@ -223,20 +233,14 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
// After split, we can restore backup files.
err = client.RestoreFiles(fileBatch, rewriteRules, updateCh)
if err != nil {
break
return err
}
}

// Always run the post-work even on error, so we don't stuck in the import
// mode or emptied schedulers
err = restorePostWork(ctx, client, mgr, clusterCfg)
if err != nil {
return err
}

if err = splitPostWork(ctx, client, newTables); err != nil {
return err
}
restorePostWork(ctx, client, mgr, clusterCfg)
splitPostWork(ctx, client, newTables)

// Restore has finished.
close(updateCh)
Expand Down Expand Up @@ -285,27 +289,95 @@ func filterRestoreFiles(
return
}

// clusterConfig records the PD/cluster settings that restorePreWork
// temporarily overrides, so restorePostWork can put them back afterwards.
type clusterConfig struct {
	// Enable PD schedulers before restore
	scheduler []string
	// Region merge configuration before restore
	mergeCfg map[string]int
	// Schedule limits configuration before restore
	scheduleLimitCfg map[string]int
}

// restorePreWork executes some prepare work before restore
func restorePreWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr) ([]string, error) {
func restorePreWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr) (clusterConfig, error) {
if client.IsOnline() {
return nil, nil
return clusterConfig{}, nil
}

// Switch TiKV cluster to import mode (adjust rocksdb configuration).
if err := client.SwitchToImportMode(ctx); err != nil {
return nil, err
return clusterConfig{}, nil
}

// Remove default PD scheduler that may affect restore process.
existSchedulers, err := mgr.ListSchedulers(ctx)
if err != nil {
return nil, errors.Trace(err)
return clusterConfig{}, nil
}
needRemoveSchedulers := make([]string, 0, len(existSchedulers))
for _, s := range existSchedulers {
if _, ok := schedulers[s]; ok {
needRemoveSchedulers = append(needRemoveSchedulers, s)
}
}
return removePDLeaderScheduler(ctx, mgr, needRemoveSchedulers)
scheduler, err := removePDLeaderScheduler(ctx, mgr, needRemoveSchedulers)
if err != nil {
return clusterConfig{}, nil
}

stores, err := mgr.GetPDClient().GetAllStores(ctx)
if err != nil {
return clusterConfig{}, err
}

mergeCfg := make(map[string]int)
for _, cfgKey := range pdRegionMergeCfg {
value, err := mgr.GetPDScheduleConfig(ctx, cfgKey)
if err != nil {
return clusterConfig{}, err
}
if value == nil {
// Ignore non-exist config.
continue
}
mergeCfg[cfgKey] = int(value.(float64))

// Disable region merge by setting config to 0.
err = mgr.UpdatePDScheduleConfig(ctx, cfgKey, 0)
if err != nil {
return clusterConfig{}, err
}
}

scheduleLimitCfg := make(map[string]int)
for _, cfgKey := range pdScheduleLimitCfg {
value, err := mgr.GetPDScheduleConfig(ctx, cfgKey)
if err != nil {
return clusterConfig{}, err
}
if value == nil {
// Ignore non-exist config.
continue
}
limit := int(value.(float64))
scheduleLimitCfg[cfgKey] = limit

// Speed update PD scheduler by enlarging scheduling limits.
// Multiply limits by store count but no more than 40.
// TODO: why 40?
newLimits := math.Max(40, float64(limit*len(stores)))
err = mgr.UpdatePDScheduleConfig(ctx, cfgKey, int(newLimits))
if err != nil {
return clusterConfig{}, err
}
}

cluster := clusterConfig{
scheduler: scheduler,
mergeCfg: mergeCfg,
scheduleLimitCfg: scheduleLimitCfg,
}
return cluster, nil
}

func removePDLeaderScheduler(ctx context.Context, mgr *conn.Mgr, existSchedulers []string) ([]string, error) {
Expand All @@ -321,14 +393,28 @@ func removePDLeaderScheduler(ctx context.Context, mgr *conn.Mgr, existSchedulers
}

// restorePostWork executes some post work after restore
func restorePostWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr, removedSchedulers []string) error {
func restorePostWork(
ctx context.Context, client *restore.Client, mgr *conn.Mgr, clusterCfg clusterConfig,
) {
if client.IsOnline() {
return nil
return
}
if err := client.SwitchToNormalMode(ctx); err != nil {
return err
log.Warn("fail to switch to normal mode")
}
if err := addPDLeaderScheduler(ctx, mgr, clusterCfg.scheduler); err != nil {
log.Warn("fail to add PD schedulers")
}
for cfgKey, cfgValue := range clusterCfg.mergeCfg {
if err := mgr.UpdatePDScheduleConfig(ctx, cfgKey, cfgValue); err != nil {
log.Warn("fail to update PD region merge config")
}
}
for cfgKey, cfgValue := range clusterCfg.scheduleLimitCfg {
if err := mgr.UpdatePDScheduleConfig(ctx, cfgKey, cfgValue); err != nil {
log.Warn("fail to update PD scheule limit config")
}
}
return addPDLeaderScheduler(ctx, mgr, removedSchedulers)
}

func addPDLeaderScheduler(ctx context.Context, mgr *conn.Mgr, removedSchedulers []string) error {
Expand Down Expand Up @@ -356,17 +442,17 @@ func splitPrepareWork(ctx context.Context, client *restore.Client, tables []*mod
return nil
}

func splitPostWork(ctx context.Context, client *restore.Client, tables []*model.TableInfo) error {
func splitPostWork(ctx context.Context, client *restore.Client, tables []*model.TableInfo) {
err := client.ResetPlacementRules(ctx, tables)
if err != nil {
return errors.Trace(err)
log.Warn("reset placement rules failed", zap.Error(err))
return
}

err = client.ResetRestoreLabels(ctx)
if err != nil {
return errors.Trace(err)
log.Warn("reset store labels failed", zap.Error(err))
}
return nil
}

// RunRestoreTiflashReplica restores the replica of tiflash saved in the last restore.
Expand Down
5 changes: 1 addition & 4 deletions pkg/task/restore_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR
return errors.Trace(err)
}

err = restorePostWork(ctx, client, mgr, removedSchedulers)
if err != nil {
return errors.Trace(err)
}
restorePostWork(ctx, client, mgr, removedSchedulers)
// Restore has finished.
close(updateCh)

Expand Down