br: improve summary and progress visualization of br (pingcap#56612)
Leavrth authored Jan 22, 2025
1 parent b9b9787 commit ba79f50
Showing 10 changed files with 250 additions and 201 deletions.
6 changes: 6 additions & 0 deletions br/pkg/backup/client.go
@@ -7,6 +7,7 @@ import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"reflect"
"sort"
"strings"
@@ -1240,10 +1241,15 @@ func collectRangeFiles(progressRangeTree *rtree.ProgressRangeTree, metaWriter *m
var rangeAscendErr error
progressRange.Res.Ascend(func(i btree.Item) bool {
r := i.(*rtree.Range)
cfCount := make(map[string]int)
for _, f := range r.Files {
cfCount[f.Cf] += 1
summary.CollectSuccessUnit(summary.TotalKV, 1, f.TotalKvs)
summary.CollectSuccessUnit(summary.TotalBytes, 1, f.TotalBytes)
}
for cf, count := range cfCount {
summary.CollectInt(fmt.Sprintf("%s CF files", cf), count)
}
// we need to keep the files in order after we support multi_ingest sst.
// default_sst and write_sst need to be together.
if err := metaWriter.Send(r.Files, metautil.AppendDataFile); err != nil {
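The new cfCount map in collectRangeFiles groups backed-up SST files by column family, so the final summary reports a per-CF file count next to the existing TotalKV/TotalBytes units. A minimal sketch of that grouping; the file struct and the numbers below are illustrative stand-ins, not BR's real backup types:

```go
package main

import "fmt"

// file mirrors just the fields of the backup file metadata that the new
// code reads; the type and the values below are illustrative, not BR's.
type file struct {
	Cf         string
	TotalKvs   uint64
	TotalBytes uint64
}

func main() {
	files := []file{
		{Cf: "default", TotalKvs: 100, TotalBytes: 4096},
		{Cf: "write", TotalKvs: 100, TotalBytes: 1024},
		{Cf: "default", TotalKvs: 50, TotalBytes: 2048},
	}

	// Group files by column family, as collectRangeFiles now does, while
	// still accumulating the per-file KV and byte totals.
	cfCount := make(map[string]int)
	var totalKvs, totalBytes uint64
	for _, f := range files {
		cfCount[f.Cf]++
		totalKvs += f.TotalKvs
		totalBytes += f.TotalBytes
	}

	for cf, count := range cfCount {
		fmt.Printf("%s CF files: %d\n", cf, count)
	}
	fmt.Printf("total kv: %d, total bytes: %d\n", totalKvs, totalBytes)
}
```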
14 changes: 14 additions & 0 deletions br/pkg/glue/glue.go
@@ -82,3 +82,17 @@ type Progress interface {
// called.
Close()
}

// WithProgress executes some logic with the progress, and closes it once the execution is done.
func WithProgress(
ctx context.Context,
g Glue,
cmdName string,
total int64,
redirectLog bool,
cc func(p Progress) error,
) error {
p := g.StartProgress(ctx, cmdName, total, redirectLog)
defer p.Close()
return cc(p)
}
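WithProgress wraps the start/close lifecycle of a progress unit around a callback, so callers no longer need to remember the deferred Close. A self-contained sketch of the same pattern with a toy Progress implementation; logProgress and the call site are assumptions for illustration, not BR's glue implementation:

```go
package main

import (
	"context"
	"fmt"
)

// progress is a stand-in for glue.Progress with only the methods used here.
type progress interface {
	Inc()
	Close()
}

type logProgress struct {
	name string
	done int64
}

func (p *logProgress) Inc()   { p.done++ }
func (p *logProgress) Close() { fmt.Printf("%s finished: %d units\n", p.name, p.done) }

// withProgress mirrors the shape of the new glue.WithProgress helper:
// start a progress bar, run the callback, and always close the bar.
func withProgress(ctx context.Context, name string, total int64, cc func(p progress) error) error {
	p := &logProgress{name: name}
	defer p.Close()
	return cc(p)
}

func main() {
	_ = withProgress(context.Background(), "Restore Pipeline", 3, func(p progress) error {
		for i := int64(0); i < 3; i++ {
			p.Inc() // one unit per finished piece of work
		}
		return nil
	})
}
```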
28 changes: 14 additions & 14 deletions br/pkg/restore/restorer.go
@@ -226,6 +226,9 @@ type MultiTablesRestorer struct {
workerPool *util.WorkerPool
fileImporter BalancedFileImporter
checkpointRunner *checkpoint.CheckpointRunner[checkpoint.RestoreKeyType, checkpoint.RestoreValueType]

fileCount int
start time.Time
}

func NewMultiTablesRestorer(
@@ -254,22 +257,16 @@ func (m *MultiTablesRestorer) WaitUntilFinish() error {
log.Error("restore files failed", zap.Error(err))
return errors.Trace(err)
}
elapsed := time.Since(m.start)
log.Info("Restore Stage Duration", zap.String("stage", "restore files"), zap.Duration("take", elapsed))
summary.CollectDuration("restore files", elapsed)
summary.CollectSuccessUnit("files", m.fileCount, elapsed)
return nil
}

func (m *MultiTablesRestorer) GoRestore(onProgress func(int64), batchFileSets ...BatchBackupFileSet) (err error) {
start := time.Now()
fileCount := 0
defer func() {
elapsed := time.Since(start)
if err == nil {
log.Info("Restore files", zap.Duration("take", elapsed))
summary.CollectSuccessUnit("files", fileCount, elapsed)
}
}()

log.Debug("start to restore files", zap.Int("files", fileCount))

func (m *MultiTablesRestorer) GoRestore(onProgress func(int64), batchFileSets ...BatchBackupFileSet) error {
m.start = time.Now()
m.fileCount = 0
if span := opentracing.SpanFromContext(m.ectx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("Client.RestoreSSTFiles", opentracing.ChildOf(span.Context()))
defer span1.Finish()
@@ -285,6 +282,9 @@ func (m *MultiTablesRestorer) GoRestore(onProgress func(int64), batchFileSets ..
// breaking here directly is also a reasonable behavior.
break
}
for _, fileSet := range batchFileSet {
m.fileCount += len(fileSet.SSTFiles)
}
filesReplica := batchFileSet
m.fileImporter.PauseForBackpressure()
cx := logutil.ContextWithField(m.ectx, zap.Int("sn", i))
@@ -293,7 +293,7 @@ func (m *MultiTablesRestorer) GoRestore(onProgress func(int64), batchFileSets ..
defer func() {
if restoreErr == nil {
logutil.CL(cx).Info("import files done", zap.Duration("take", time.Since(fileStart)))
onProgress(int64(len(filesReplica)))
onProgress(1)
}
}()
if importErr := m.fileImporter.Import(cx, filesReplica...); importErr != nil {
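MultiTablesRestorer now records its start time and accumulated file count in GoRestore and reports them once in WaitUntilFinish, and onProgress advances one unit per finished batch instead of one unit per file set in the batch. A stripped-down sketch of that bookkeeping; the restorer type and data here are illustrative, not the real MultiTablesRestorer:

```go
package main

import (
	"fmt"
	"time"
)

// restorer sketches the bookkeeping MultiTablesRestorer now keeps itself:
// goRestore records the start time and file count, waitUntilFinish reports them.
type restorer struct {
	start     time.Time
	fileCount int
}

func (r *restorer) goRestore(onProgress func(int64), batches ...[]string) {
	r.start = time.Now()
	r.fileCount = 0
	for _, batch := range batches {
		r.fileCount += len(batch)
		// One progress unit per finished batch, matching onProgress(1) in the diff.
		onProgress(1)
	}
}

func (r *restorer) waitUntilFinish() {
	fmt.Printf("restored %d files in %s\n", r.fileCount, time.Since(r.start))
}

func main() {
	var progress int64
	r := &restorer{}
	r.goRestore(func(p int64) { progress += p }, []string{"a.sst", "b.sst"}, []string{"c.sst"})
	r.waitUntilFinish()
	fmt.Println("progress units:", progress) // 2 batches -> 2 progress units
}
```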
5 changes: 3 additions & 2 deletions br/pkg/restore/restorer_test.go
@@ -148,19 +148,20 @@ func TestMultiTablesRestorerRestoreSuccess(t *testing.T) {

var progress int64
fileSets := createSampleBatchFileSets()
fileSets2 := createSampleBatchFileSets()

var mu sync.Mutex
restorer.GoRestore(func(p int64) {
mu.Lock()
progress += p
mu.Unlock()
}, fileSets)
}, fileSets, fileSets2)
err := restorer.WaitUntilFinish()
require.NoError(t, err)

// Ensure progress was tracked correctly
require.Equal(t, int64(2), progress) // Total files group: 2
require.Equal(t, 1, importer.unblockCount)
require.Equal(t, 2, importer.unblockCount)
}

func TestMultiTablesRestorerRestoreWithImportError(t *testing.T) {
2 changes: 1 addition & 1 deletion br/pkg/restore/snap_client/client.go
@@ -371,7 +371,7 @@ func (rc *SnapClient) InitCheckpoint(
if err != nil {
return checkpointSetWithTableID, nil, errors.Trace(err)
}

// t2 is the latest time the checkpoint checksum persisted to the external storage.
checkpointChecksum, t2, err := checkpoint.LoadCheckpointChecksumForRestore(ctx, execCtx)
if err != nil {
return checkpointSetWithTableID, nil, errors.Trace(err)
126 changes: 121 additions & 5 deletions br/pkg/restore/snap_client/pipeline_items.go
@@ -31,6 +31,7 @@ import (
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/engine"
pdhttp "github.com/tikv/pd/client/http"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
@@ -59,6 +60,124 @@ func defaultOutputTableChan() chan *CreatedTable {
return make(chan *CreatedTable, defaultChannelSize)
}

// ExhaustErrors drains all remaining errors in the channel into a slice of errors.
func ExhaustErrors(ec <-chan error) []error {
out := make([]error, 0, len(ec))
for {
select {
case err := <-ec:
out = append(out, err)
default:
// errCh will NEVER be closed (it has multiple senders),
// so we just consume the current backlog of the channel, then return.
return out
}
}
}

type PipelineContext struct {
// pipeline item switch
Checksum bool
LoadStats bool
WaitTiflashReady bool

// pipeline item configuration
LogProgress bool
ChecksumConcurrency uint
StatsConcurrency uint

// pipeline item tool client
KvClient kv.Client
ExtStorage storage.ExternalStorage
Glue glue.Glue
}

// RestorePipeline does checksum, load stats and wait for tiflash to be ready.
func (rc *SnapClient) RestorePipeline(ctx context.Context, plCtx PipelineContext, createdTables []*CreatedTable) (err error) {
start := time.Now()
defer func() {
summary.CollectDuration("restore pipeline", time.Since(start))
}()
// We make bigger errCh so we won't block on multi-part failed.
errCh := make(chan error, 32)
postHandleCh := afterTableRestoredCh(ctx, createdTables)
progressLen := int64(0)
if plCtx.Checksum {
progressLen += int64(len(createdTables))
}
progressLen += int64(len(createdTables)) // for pipeline item - update stats meta
if plCtx.WaitTiflashReady {
progressLen += int64(len(createdTables))
}

// Redirect to log if there is no log file to avoid unreadable output.
updateCh := plCtx.Glue.StartProgress(ctx, "Restore Pipeline", progressLen, !plCtx.LogProgress)
defer updateCh.Close()
// pipeline checksum
if plCtx.Checksum {
postHandleCh = rc.GoValidateChecksum(ctx, postHandleCh, plCtx.KvClient, errCh, updateCh, plCtx.ChecksumConcurrency)
}

// pipeline update meta and load stats
postHandleCh = rc.GoUpdateMetaAndLoadStats(ctx, plCtx.ExtStorage, postHandleCh, errCh, updateCh, plCtx.StatsConcurrency, plCtx.LoadStats)

// pipeline wait Tiflash synced
if plCtx.WaitTiflashReady {
postHandleCh = rc.GoWaitTiFlashReady(ctx, postHandleCh, updateCh, errCh)
}

finish := dropToBlackhole(ctx, postHandleCh, errCh)

select {
case err = <-errCh:
err = multierr.Append(err, multierr.Combine(ExhaustErrors(errCh)...))
case <-finish:
}

return errors.Trace(err)
}

func afterTableRestoredCh(ctx context.Context, createdTables []*CreatedTable) <-chan *CreatedTable {
outCh := make(chan *CreatedTable)

go func() {
defer close(outCh)

for _, createdTable := range createdTables {
select {
case <-ctx.Done():
return
default:
outCh <- createdTable
}
}
}()
return outCh
}

// dropToBlackhole drops all incoming tables into a black hole,
// i.e. it does not run checksum, it just advances the progress anyway.
func dropToBlackhole(ctx context.Context, inCh <-chan *CreatedTable, errCh chan<- error) <-chan struct{} {
outCh := make(chan struct{}, 1)
go func() {
defer func() {
close(outCh)
}()
for {
select {
case <-ctx.Done():
errCh <- ctx.Err()
return
case _, ok := <-inCh:
if !ok {
return
}
}
}
}()
return outCh
}

func concurrentHandleTablesCh(
ctx context.Context,
inCh <-chan *CreatedTable,
@@ -114,11 +233,6 @@ func (rc *SnapClient) GoValidateChecksum(
outCh := defaultOutputTableChan()
workers := tidbutil.NewWorkerPool(defaultChecksumConcurrency, "RestoreChecksum")
go concurrentHandleTablesCh(ctx, inCh, outCh, errCh, workers, func(c context.Context, tbl *CreatedTable) error {
start := time.Now()
defer func() {
elapsed := time.Since(start)
summary.CollectSuccessUnit("table checksum", 1, elapsed)
}()
err := rc.execAndValidateChecksum(c, tbl, kvClient, concurrency)
if err != nil {
return errors.Trace(err)
Expand All @@ -136,6 +250,7 @@ func (rc *SnapClient) GoUpdateMetaAndLoadStats(
s storage.ExternalStorage,
inCh <-chan *CreatedTable,
errCh chan<- error,
updateCh glue.Progress,
statsConcurrency uint,
loadStats bool,
) chan *CreatedTable {
@@ -186,6 +301,7 @@ func (rc *SnapClient) GoUpdateMetaAndLoadStats(
log.Error("update stats meta failed", zap.Any("table", tbl.Table), zap.Error(statsErr))
}
}
updateCh.Inc()
return nil
}, func() {
log.Info("all stats updated")
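The new RestorePipeline drives one shared progress bar across checksum, stats/meta update, and the TiFlash wait (one unit per table per enabled stage) and collapses any backlog of worker errors into a single error. A small sketch of that drain-and-combine pattern, reusing the same ExhaustErrors idea; the error messages are made up for illustration:

```go
package main

import (
	"errors"
	"fmt"

	"go.uber.org/multierr"
)

// exhaustErrors mirrors the new ExhaustErrors helper: drain whatever errors
// are already buffered in the channel without waiting for it to be closed.
func exhaustErrors(ec <-chan error) []error {
	out := make([]error, 0, len(ec))
	for {
		select {
		case err := <-ec:
			out = append(out, err)
		default:
			return out
		}
	}
}

func main() {
	errCh := make(chan error, 32)
	errCh <- errors.New("checksum mismatch on table t1")
	errCh <- errors.New("stats meta update failed on table t2")

	// The first error ends the pipeline; the remaining backlog is folded in
	// with multierr, matching how RestorePipeline combines errCh results.
	first := <-errCh
	combined := multierr.Append(first, multierr.Combine(exhaustErrors(errCh)...))
	fmt.Println(combined)
}
```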