This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

restore: give priority to small tables for importing #156

Merged
4 commits merged on Apr 9, 2019
9 changes: 9 additions & 0 deletions lightning/metric/metric.go
@@ -141,6 +141,14 @@ var (
Buckets: prometheus.ExponentialBuckets(0.001, 3.1622776601683795, 10),
},
)
RowKVDeliverSecondsHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "lightning",
Name: "row_kv_deliver_seconds",
Help: "time needed to deliver kvs of a single row",
Buckets: prometheus.ExponentialBuckets(0.001, 3.1622776601683795, 10),
},
)
BlockDeliverSecondsHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "lightning",
@@ -186,6 +194,7 @@ func init() {
prometheus.MustRegister(RowReadSecondsHistogram)
prometheus.MustRegister(RowReadBytesHistogram)
prometheus.MustRegister(RowEncodeSecondsHistogram)
prometheus.MustRegister(RowKVDeliverSecondsHistogram)
prometheus.MustRegister(BlockDeliverSecondsHistogram)
prometheus.MustRegister(BlockDeliverBytesHistogram)
prometheus.MustRegister(BlockDeliverKVPairsHistogram)
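For context, here is a minimal, self-contained sketch of how a histogram like `RowKVDeliverSecondsHistogram` is registered and fed, assuming the standard `prometheus/client_golang` API; `deliverRow` is a hypothetical stand-in for the delivery step being timed:

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// rowKVDeliverSeconds mirrors the histogram added here: ten exponential
// buckets starting at 1ms with factor sqrt(10) ≈ 3.1622776601683795, so
// the buckets cover roughly 1ms up to ~31.6s.
var rowKVDeliverSeconds = prometheus.NewHistogram(
	prometheus.HistogramOpts{
		Namespace: "lightning",
		Name:      "row_kv_deliver_seconds",
		Help:      "time needed to deliver kvs of a single row",
		Buckets:   prometheus.ExponentialBuckets(0.001, 3.1622776601683795, 10),
	},
)

func init() {
	prometheus.MustRegister(rowKVDeliverSeconds)
}

// deliverRow is a hypothetical stand-in for delivering one row's KV pairs.
func deliverRow() { time.Sleep(2 * time.Millisecond) }

func main() {
	start := time.Now()
	deliverRow()
	// Observe records one sample; the exporter then exposes _count, _sum
	// and per-bucket counters for this metric.
	rowKVDeliverSeconds.Observe(time.Since(start).Seconds())
}
```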
18 changes: 17 additions & 1 deletion lightning/mydump/loader.go
@@ -17,6 +17,7 @@ import (
"os"
"path/filepath"
"regexp"
"sort"
"strings"

"github.com/pingcap/errors"
@@ -44,6 +45,7 @@ type MDTableMeta struct {
SchemaFile string
DataFiles []string
charSet string
TotalSize int64
}

func (m *MDTableMeta) GetSchema() string {
@@ -121,6 +123,7 @@ func (ftype fileType) String() string {
type fileInfo struct {
tableName filter.Table
path string
size int64
}

var tableNameRegexp = regexp.MustCompile(`^([^.]+)\.(.*?)(?:\.[0-9]+)?$`)
@@ -135,6 +138,10 @@ var tableNameRegexp = regexp.MustCompile(`^([^.]+)\.(.*?)(?:\.[0-9]+)?$`)
// files are visited in lexicographical order (note that this does not mean the
// databases and tables in the end are ordered lexicographically since they may
// be stored in different subdirectories).
//
// Tables are also sorted by data size in ascending order, so the largest
// tables are imported last. This prevents a long-running large table from
// holding an index worker while many small tables wait behind it.
func (s *mdLoaderSetup) setup(dir string) error {
/*
Mydumper file names format
@@ -184,6 +191,15 @@ func (s *mdLoaderSetup) setup(dir string) error {
}
}
tableMeta.DataFiles = append(tableMeta.DataFiles, fileInfo.path)
tableMeta.TotalSize += fileInfo.size
}

// Put the small tables at the front of the slice, so that a large table
// does not hold an index worker for a long time while small tables wait.
for _, dbMeta := range s.loader.dbs {
sort.SliceStable(dbMeta.Tables, func(i, j int) bool {
return dbMeta.Tables[i].TotalSize < dbMeta.Tables[j].TotalSize
})
}

return nil
@@ -205,7 +221,7 @@ func (s *mdLoaderSetup) listFiles(dir string) error {
fname := strings.TrimSpace(f.Name())
lowerFName := strings.ToLower(fname)

info := fileInfo{path: path}
info := fileInfo{path: path, size: f.Size()}

var (
ftype fileType
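The ordering itself is a plain `sort.SliceStable` over each database's table list. A runnable sketch under the same logic, with `tableMeta` as a trimmed-down stand-in for `MDTableMeta` and made-up table names and sizes:

```go
package main

import (
	"fmt"
	"sort"
)

// tableMeta is a trimmed-down stand-in for MDTableMeta, keeping only the
// fields needed to show the ordering.
type tableMeta struct {
	Name      string
	TotalSize int64 // sum of the table's data file sizes, in bytes
}

func main() {
	tables := []*tableMeta{
		{Name: "orders", TotalSize: 40 << 30}, // 40 GiB
		{Name: "users", TotalSize: 512 << 20}, // 512 MiB
		{Name: "tags", TotalSize: 1 << 20},    // 1 MiB
	}

	// Stable ascending sort by total data size, as in mdLoaderSetup.setup:
	// small tables come first, finish quickly, and free their index worker
	// before the large tables start.
	sort.SliceStable(tables, func(i, j int) bool {
		return tables[i].TotalSize < tables[j].TotalSize
	})

	for _, t := range tables {
		fmt.Println(t.Name, t.TotalSize) // tags, then users, then orders
	}
}
```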
28 changes: 21 additions & 7 deletions lightning/restore/restore.go
@@ -461,6 +461,23 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
stopPeriodicActions := make(chan struct{}, 1)
go rc.runPeriodicActions(ctx, stopPeriodicActions)

type task struct {
tr *TableRestore
cp *TableCheckpoint
}
taskCh := make(chan task, rc.cfg.App.IndexConcurrency)
defer close(taskCh)
for i := 0; i < rc.cfg.App.IndexConcurrency; i++ {
go func() {
for task := range taskCh {
err := task.tr.restoreTable(ctx, rc, task.cp)
metric.RecordTableCount("completed", err)
restoreErr.Set(task.tr.tableName, err)
wg.Done()
}
}()
}

for _, dbMeta := range rc.dbMetas {
dbInfo, ok := rc.dbInfos[dbMeta.Name]
if !ok {
@@ -493,12 +510,7 @@ }
}

wg.Add(1)
go func(t *TableRestore, cp *TableCheckpoint) {
defer wg.Done()
err := t.restoreTable(ctx, rc, cp)
metric.RecordTableCount("completed", err)
restoreErr.Set(t.tableName, err)
}(tr, cp)
taskCh <- task{tr: tr, cp: cp}
}
}

@@ -1597,9 +1609,10 @@ outside:
return errors.Trace(err)
}

deliverKvStart := time.Now()
select {
case kvsCh <- deliveredKVs{kvs: kvs, offset: newOffset, rowID: rowID}:
continue
break
case <-ctx.Done():
return ctx.Err()
case deliverResult := <-deliverCompleteCh:
@@ -1608,6 +1621,7 @@
}
return errors.Trace(deliverResult.err)
}
metric.RowKVDeliverSecondsHistogram.Observe(time.Since(deliverKvStart).Seconds())
}

lastOffset, lastRowID := cr.parser.Pos()
Expand Down