From 20679141c0914ac0ba86f53a4db6391ecd91910b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B1=B1=E5=B2=9A?= <36239017+YuJuncen@users.noreply.github.com> Date: Wed, 8 Sep 2021 04:34:58 -0700 Subject: [PATCH] [cherry-pick] restore: split & scatter regions concurrently(tidb#27034) (#1429) --- pkg/logutil/logging.go | 2 +- pkg/restore/import.go | 2 + pkg/restore/pipeline_items.go | 149 +++++++++++++++++++++++++++------- pkg/restore/range.go | 2 + pkg/restore/split.go | 9 +- pkg/task/restore.go | 36 +++++++- pkg/utils/worker.go | 12 ++- 7 files changed, 173 insertions(+), 39 deletions(-) diff --git a/pkg/logutil/logging.go b/pkg/logutil/logging.go index 7e803ba64..7a949e1fd 100644 --- a/pkg/logutil/logging.go +++ b/pkg/logutil/logging.go @@ -115,7 +115,7 @@ func (region zapMarshalRegionMarshaler) MarshalLogObject(enc zapcore.ObjectEncod for _, peer := range region.GetPeers() { peers = append(peers, peer.String()) } - enc.AddUint64("ID", region.Id) + enc.AddUint64("ID", region.GetId()) enc.AddString("startKey", redact.Key(region.GetStartKey())) enc.AddString("endKey", redact.Key(region.GetEndKey())) enc.AddString("epoch", region.GetRegionEpoch().String()) diff --git a/pkg/restore/import.go b/pkg/restore/import.go index 9fefe8e9f..740f38858 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -260,6 +260,8 @@ func (importer *FileImporter) Import( logutil.Region(info.Region), logutil.Key("startKey", startKey), logutil.Key("endKey", endKey), + logutil.Key("file-simple-start", file.StartKey), + logutil.Key("file-simple-end", file.EndKey), logutil.ShortError(e)) continue regionLoop } diff --git a/pkg/restore/pipeline_items.go b/pkg/restore/pipeline_items.go index 55490be0d..5bceff2a9 100644 --- a/pkg/restore/pipeline_items.go +++ b/pkg/restore/pipeline_items.go @@ -10,6 +10,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/parser/model" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/rtree" @@ -74,6 +75,7 @@ type brContextManager struct { // This 'set' of table ID allow us to handle each table just once. 
hasTable map[int64]CreatedTable + mu sync.Mutex } func (manager *brContextManager) Close(ctx context.Context) { @@ -86,6 +88,8 @@ func (manager *brContextManager) Close(ctx context.Context) { func (manager *brContextManager) Enter(ctx context.Context, tables []CreatedTable) error { placementRuleTables := make([]*model.TableInfo, 0, len(tables)) + manager.mu.Lock() + defer manager.mu.Unlock() for _, tbl := range tables { if _, ok := manager.hasTable[tbl.Table.ID]; !ok { @@ -98,6 +102,8 @@ func (manager *brContextManager) Enter(ctx context.Context, tables []CreatedTabl } func (manager *brContextManager) Leave(ctx context.Context, tables []CreatedTable) error { + manager.mu.Lock() + defer manager.mu.Unlock() placementRuleTables := make([]*model.TableInfo, 0, len(tables)) for _, table := range tables { @@ -183,6 +189,8 @@ type tikvSender struct { inCh chan<- DrainResult wg *sync.WaitGroup + + tableWaiters *sync.Map } func (b *tikvSender) PutSink(sink TableSink) { @@ -192,6 +200,7 @@ func (b *tikvSender) PutSink(sink TableSink) { } func (b *tikvSender) RestoreBatch(ranges DrainResult) { + log.Info("restore batch: waiting ranges", zap.Int("range", len(b.inCh))) b.inCh <- ranges } @@ -200,29 +209,52 @@ func NewTiKVSender( ctx context.Context, cli *Client, updateCh glue.Progress, + splitConcurrency uint, ) (BatchSender, error) { inCh := make(chan DrainResult, defaultChannelSize) - midCh := make(chan DrainResult, defaultChannelSize) + midCh := make(chan drainResultAndDone, defaultChannelSize) sender := &tikvSender{ - client: cli, - updateCh: updateCh, - inCh: inCh, - wg: new(sync.WaitGroup), + client: cli, + updateCh: updateCh, + inCh: inCh, + wg: new(sync.WaitGroup), + tableWaiters: new(sync.Map), } sender.wg.Add(2) - go sender.splitWorker(ctx, inCh, midCh) + go sender.splitWorker(ctx, inCh, midCh, splitConcurrency) go sender.restoreWorker(ctx, midCh) return sender, nil } -func (b *tikvSender) splitWorker(ctx context.Context, ranges <-chan DrainResult, next chan<- DrainResult) { +func (b *tikvSender) Close() { + close(b.inCh) + b.wg.Wait() + log.Debug("tikv sender closed") +} + +type drainResultAndDone struct { + result DrainResult + done func() +} + +func (b *tikvSender) splitWorker(ctx context.Context, + ranges <-chan DrainResult, + next chan<- drainResultAndDone, + concurrency uint, +) { defer log.Debug("split worker closed") + eg, ectx := errgroup.WithContext(ctx) defer func() { b.wg.Done() + if err := eg.Wait(); err != nil { + b.sink.EmitError(err) + return + } close(next) }() + pool := utils.NewWorkerPool(concurrency, "split") for { select { case <-ctx.Done(): @@ -231,19 +263,78 @@ func (b *tikvSender) splitWorker(ctx context.Context, ranges <-chan DrainResult, if !ok { return } - if err := SplitRanges(ctx, b.client, result.Ranges, result.RewriteRules, b.updateCh); err != nil { - log.Error("failed on split range", rtree.ZapRanges(result.Ranges), zap.Error(err)) - b.sink.EmitError(err) - return - } - next <- result + // When the batcher has sent all ranges from a table, it would + // mark this table 'all done'(BlankTablesAfterSend), and then we can send it to checksum. 
+			//
+			// When there is a sole worker sequentially running those batch tasks, everything is fine; however,
+			// with multiple workers, that becomes buggy. For example:
+			// |------table 1, ranges 1------|------table 1, ranges 2------|
+			// The batcher sends batches: [
+			//		{Ranges: ranges 1},
+			//		{Ranges: ranges 2, BlankTablesAfterSend: table 1}
+			// ]
+			// And two workers run concurrently:
+			// 		worker 1: {Ranges: ranges 1}
+			// 		worker 2: {Ranges: ranges 2, BlankTablesAfterSend: table 1}
+			// Worker 2 may finish its job before worker 1 does. At that point the table hasn't been
+			// fully restored yet, hence the checksum would fail.
+			done := b.registerTableIsRestoring(result.TablesToSend)
+			pool.ApplyOnErrorGroup(eg, func() error {
+				err := SplitRanges(ectx, b.client, result.Ranges, result.RewriteRules, b.updateCh)
+				if err != nil {
+					log.Error("failed on split range", rtree.ZapRanges(result.Ranges), zap.Error(err))
+					return err
+				}
+				next <- drainResultAndDone{
+					result: result,
+					done:   done,
+				}
+				return nil
+			})
+		}
+	}
+}
+
+// registerTableIsRestoring marks some tables as 'currently restoring'.
+// It returns a function that marks this batch of restore work as done.
+func (b *tikvSender) registerTableIsRestoring(ts []CreatedTable) func() {
+	wgs := make([]*sync.WaitGroup, 0, len(ts))
+	for _, t := range ts {
+		i, _ := b.tableWaiters.LoadOrStore(t.Table.ID, new(sync.WaitGroup))
+		wg := i.(*sync.WaitGroup)
+		wg.Add(1)
+		wgs = append(wgs, wg)
+	}
+	return func() {
+		for _, wg := range wgs {
+			wg.Done()
+		}
+	}
+}
+
+// waitTablesDone blocks the current goroutine until all of the provided
+// tables are no longer 'currently restoring'.
+func (b *tikvSender) waitTablesDone(ts []CreatedTable) {
+	for _, t := range ts {
+		wg, ok := b.tableWaiters.Load(t.Table.ID)
+		if !ok {
+			log.Panic("bug! table done before register!",
+				zap.Any("wait-table-map", b.tableWaiters),
+				zap.Stringer("table", t.Table.Name))
 		}
+		b.tableWaiters.Delete(t.Table.ID)
+		wg.(*sync.WaitGroup).Wait()
 	}
 }
 
-func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan DrainResult) {
+func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan drainResultAndDone) {
+	eg, ectx := errgroup.WithContext(ctx)
 	defer func() {
 		log.Debug("restore worker closed")
+		if err := eg.Wait(); err != nil {
+			b.sink.EmitError(err)
+			return
+		}
 		b.wg.Done()
 		b.sink.Close()
 	}()
@@ -251,24 +342,24 @@ func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan DrainResul
 		select {
 		case <-ctx.Done():
 			return
-		case result, ok := <-ranges:
+		case r, ok := <-ranges:
 			if !ok {
 				return
 			}
-			files := result.Files()
-			if err := b.client.RestoreFiles(ctx, files, result.RewriteRules, b.updateCh); err != nil {
-				b.sink.EmitError(err)
-				return
-			}
-
-			log.Info("restore batch done", rtree.ZapRanges(result.Ranges))
-			b.sink.EmitTables(result.BlankTablesAfterSend...)
+			files := r.result.Files()
+			// `RestoreFiles` already limits its own concurrency with a worker pool,
+			// so spawning a raw goroutine here won't make too many requests to TiKV.
+			eg.Go(func() error {
+				e := b.client.RestoreFiles(ectx, files, r.result.RewriteRules, b.updateCh)
+				if e != nil {
+					return e
+				}
+				log.Info("restore batch done", rtree.ZapRanges(r.result.Ranges))
+				r.done()
+				b.waitTablesDone(r.result.BlankTablesAfterSend)
+				b.sink.EmitTables(r.result.BlankTablesAfterSend...)
+				return nil
+			})
 		}
 	}
 }
-
-func (b *tikvSender) Close() {
-	close(b.inCh)
-	b.wg.Wait()
-	log.Debug("tikv sender closed")
-}
diff --git a/pkg/restore/range.go b/pkg/restore/range.go
index 54e5fa687..f166a019d 100644
--- a/pkg/restore/range.go
+++ b/pkg/restore/range.go
@@ -34,12 +34,14 @@ func SortRanges(ranges []rtree.Range, rewriteRules *RewriteRules) ([]rtree.Range
 				"rewrite start key",
 				logutil.Key("key", rg.StartKey),
 				logutil.RewriteRule(rule))
 		}
+		oldKey := rg.EndKey
 		rg.EndKey, rule = replacePrefix(rg.EndKey, rewriteRules)
 		if rule == nil {
 			log.Warn("cannot find rewrite rule", logutil.Key("key", rg.EndKey))
 		} else {
 			log.Debug(
 				"rewrite end key",
+				logutil.Key("origin-key", oldKey),
 				logutil.Key("key", rg.EndKey),
 				logutil.RewriteRule(rule))
 		}
diff --git a/pkg/restore/split.go b/pkg/restore/split.go
index 12b6a5eff..317b41265 100644
--- a/pkg/restore/split.go
+++ b/pkg/restore/split.go
@@ -89,12 +89,13 @@ SplitRegions:
 	for i := 0; i < SplitRetryTimes; i++ {
 		regions, errScan := PaginateScanRegion(ctx, rs.client, minKey, maxKey, scanRegionPaginationLimit)
 		if errScan != nil {
+			if berrors.ErrPDBatchScanRegion.Equal(errScan) {
+				log.Warn("got inconsistent region info.", logutil.ShortError(errScan))
+				time.Sleep(time.Second)
+				continue SplitRegions
+			}
 			return errors.Trace(errScan)
 		}
-		if len(regions) == 0 {
-			log.Warn("split regions cannot scan any region")
-			return nil
-		}
 		splitKeyMap := GetSplitKeys(rewriteRules, sortedRanges, regions)
 		regionMap := make(map[uint64]*RegionInfo)
 		for _, region := range regions {
diff --git a/pkg/task/restore.go b/pkg/task/restore.go
index 60c3f22dc..fc9e31089 100644
--- a/pkg/task/restore.go
+++ b/pkg/task/restore.go
@@ -34,9 +34,15 @@ const (
 	FlagMergeRegionSizeBytes = "merge-region-size-bytes"
 	// FlagMergeRegionKeyCount is the flag name of merge small regions by key count
 	FlagMergeRegionKeyCount = "merge-region-key-count"
+	// FlagPDConcurrency controls the concurrency of PD-related operations such as split & scatter.
+	FlagPDConcurrency = "pd-concurrency"
+	// FlagBatchFlushInterval controls how long to wait before a restore batch is sent automatically.
+	FlagBatchFlushInterval = "batch-flush-interval"
 
 	defaultRestoreConcurrency = 128
 	maxRestoreBatchSizeLimit  = 10240
+	defaultPDConcurrency      = 1
+	defaultBatchFlushInterval = 16 * time.Second
 	defaultDDLConcurrency     = 16
 )
 
@@ -70,9 +76,15 @@ func DefineRestoreCommonFlags(flags *pflag.FlagSet) {
 	flags.Uint64(FlagMergeRegionSizeBytes, restore.DefaultMergeRegionSizeBytes,
 		"the threshold of merging small regions (Default 96MB, region split size)")
 	flags.Uint64(FlagMergeRegionKeyCount, restore.DefaultMergeRegionKeyCount,
-		"the threshold of merging smalle regions (Default 960_000, region split key count)")
+		"the threshold of merging small regions (Default 960_000, region split key count)")
+	flags.Uint(FlagPDConcurrency, defaultPDConcurrency,
+		"the concurrency of PD-related operations such as split & scatter.")
+	flags.Duration(FlagBatchFlushInterval, defaultBatchFlushInterval,
+		"how long to wait before a restore batch is sent automatically.")
 	_ = flags.MarkHidden(FlagMergeRegionSizeBytes)
 	_ = flags.MarkHidden(FlagMergeRegionKeyCount)
+	_ = flags.MarkHidden(FlagPDConcurrency)
+	_ = flags.MarkHidden(FlagBatchFlushInterval)
 }
 
 // ParseFromFlags parses the config from the flag set.
@@ -98,7 +110,9 @@ type RestoreConfig struct {
 	Config
 	RestoreCommonConfig
 
-	NoSchema bool `json:"no-schema" toml:"no-schema"`
+	NoSchema           bool          `json:"no-schema" toml:"no-schema"`
+	PDConcurrency      uint          `json:"pd-concurrency" toml:"pd-concurrency"`
+	BatchFlushInterval time.Duration `json:"batch-flush-interval" toml:"batch-flush-interval"`
 }
 
 // DefineRestoreFlags defines common flags for the restore tidb command.
@@ -129,6 +143,14 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error {
 	if cfg.Config.Concurrency == 0 {
 		cfg.Config.Concurrency = defaultRestoreConcurrency
 	}
+	cfg.PDConcurrency, err = flags.GetUint(FlagPDConcurrency)
+	if err != nil {
+		return errors.Annotatef(err, "failed to get flag %s", FlagPDConcurrency)
+	}
+	cfg.BatchFlushInterval, err = flags.GetDuration(FlagBatchFlushInterval)
+	if err != nil {
+		return errors.Annotatef(err, "failed to get flag %s", FlagBatchFlushInterval)
+	}
 	return nil
 }
 
@@ -146,6 +168,12 @@ func (cfg *RestoreConfig) adjustRestoreConfig() {
 	if cfg.Config.SwitchModeInterval == 0 {
 		cfg.Config.SwitchModeInterval = defaultSwitchInterval
 	}
+	if cfg.PDConcurrency == 0 {
+		cfg.PDConcurrency = defaultPDConcurrency
+	}
+	if cfg.BatchFlushInterval == 0 {
+		cfg.BatchFlushInterval = defaultBatchFlushInterval
+	}
 }
 
 // CheckRestoreDBAndTable is used to check whether the restore dbs or tables have been backup
@@ -379,14 +407,14 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
 		int64(rangeSize+len(files)+len(tables)),
 		!cfg.LogProgress)
 	defer updateCh.Close()
-	sender, err := restore.NewTiKVSender(ctx, client, updateCh)
+	sender, err := restore.NewTiKVSender(ctx, client, updateCh, cfg.PDConcurrency)
 	if err != nil {
 		return errors.Trace(err)
 	}
 	manager := restore.NewBRContextManager(client)
 	batcher, afterRestoreStream := restore.NewBatcher(ctx, sender, manager, errCh)
 	batcher.SetThreshold(batchSize)
-	batcher.EnableAutoCommit(ctx, time.Second)
+	batcher.EnableAutoCommit(ctx, cfg.BatchFlushInterval)
 	go restoreTableStream(ctx, rangeStream, batcher, errCh)
 
 	var finish <-chan struct{}
diff --git a/pkg/utils/worker.go b/pkg/utils/worker.go
index e0a9ac5d4..ec3ec3b95 100644
--- a/pkg/utils/worker.go
+++ b/pkg/utils/worker.go
@@ -37,6 +37,16 @@ func NewWorkerPool(limit uint, name string) *WorkerPool {
 	}
 }
 
+// IdleCount counts how many idle workers are in the pool.
+func (pool *WorkerPool) IdleCount() int {
+	return len(pool.workers)
+}
+
+// Limit returns the maximum number of concurrent workers the pool allows.
+func (pool *WorkerPool) Limit() int {
+	return int(pool.limit)
+}
+
 // Apply executes a task.
 func (pool *WorkerPool) Apply(fn taskFunc) {
 	worker := pool.apply()
@@ -93,5 +103,5 @@ func (pool *WorkerPool) recycle(worker *Worker) {
 
 // HasWorker checks if the pool has unallocated workers.
 func (pool *WorkerPool) HasWorker() bool {
-	return len(pool.workers) > 0
+	return pool.IdleCount() > 0
 }
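
The per-table synchronization added to splitWorker/restoreWorker (registerTableIsRestoring and waitTablesDone) is a reference-counting pattern built on sync.Map and sync.WaitGroup: each in-flight batch increments the counter for every table it touches, and the checksum stage waits until all of those counters drop to zero. Below is a minimal, self-contained sketch of that pattern; the tableTracker type, method names, and the IDs used in main are illustrative assumptions and are not part of this patch.

package main

import (
	"fmt"
	"sync"
	"time"
)

// tableTracker plays the role of tikvSender.tableWaiters: it maps a table ID
// to a WaitGroup counting how many in-flight batches still touch that table.
type tableTracker struct {
	waiters sync.Map // table ID (int64) -> *sync.WaitGroup
}

// register marks the given tables as "currently restoring" and returns a
// callback that marks this particular batch as done.
func (t *tableTracker) register(tableIDs []int64) (done func()) {
	wgs := make([]*sync.WaitGroup, 0, len(tableIDs))
	for _, id := range tableIDs {
		i, _ := t.waiters.LoadOrStore(id, new(sync.WaitGroup))
		wg := i.(*sync.WaitGroup)
		wg.Add(1)
		wgs = append(wgs, wg)
	}
	return func() {
		for _, wg := range wgs {
			wg.Done()
		}
	}
}

// wait blocks until every batch registered for the table has called done(),
// which is when it becomes safe to hand the table to the checksum stage.
func (t *tableTracker) wait(tableID int64) {
	if i, ok := t.waiters.Load(tableID); ok {
		t.waiters.Delete(tableID)
		i.(*sync.WaitGroup).Wait()
	}
}

func main() {
	tracker := new(tableTracker)

	// Two concurrent batches touch table 1; the checksum must wait for both.
	done1 := tracker.register([]int64{1})
	done2 := tracker.register([]int64{1})

	go func() { time.Sleep(50 * time.Millisecond); done1() }()
	go func() { time.Sleep(120 * time.Millisecond); done2() }()

	tracker.wait(1)
	fmt.Println("table 1 fully restored; safe to run checksum")
}

Storing *sync.WaitGroup values in the map lets several concurrent batches of the same table share one counter, so a table is only emitted for checksum after every batch that touched it has reported completion, regardless of the order in which the workers finish.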