Commit

Merge branch 'master' into remove_pickFromBucket
qw4990 authored Jan 28, 2023
2 parents 131b622 + 1e0956d commit e292386
Showing 80 changed files with 4,023 additions and 382 deletions.
1,435 changes: 1,369 additions & 66 deletions DEPS.bzl

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion br/pkg/lightning/backend/local/engine.go
@@ -123,6 +123,8 @@ type Engine struct {
 	config    backend.LocalEngineConfig
 	tableInfo *checkpoints.TidbTableInfo

+	dupDetectOpt dupDetectOpt
+
 	// total size of SST files waiting to be ingested
 	pendingFileSize atomic.Int64

@@ -981,7 +983,7 @@ func (e *Engine) newKVIter(ctx context.Context, opts *pebble.IterOptions) Iter {
 		zap.String("table", common.UniqueTable(e.tableInfo.DB, e.tableInfo.Name)),
 		zap.Int64("tableID", e.tableInfo.ID),
 		zap.Stringer("engineUUID", e.UUID))
-	return newDupDetectIter(ctx, e.db, e.keyAdapter, opts, e.duplicateDB, logger)
+	return newDupDetectIter(ctx, e.db, e.keyAdapter, opts, e.duplicateDB, logger, e.dupDetectOpt)
 }

 func (e *Engine) getFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []byte, error) {
17 changes: 16 additions & 1 deletion br/pkg/lightning/backend/local/iterator.go
@@ -21,6 +21,7 @@ import (

 	"github.com/cockroachdb/pebble"
 	sst "github.com/pingcap/kvproto/pkg/import_sstpb"
+	"github.com/pingcap/tidb/br/pkg/lightning/common"
 	"github.com/pingcap/tidb/br/pkg/lightning/log"
 	"github.com/pingcap/tidb/br/pkg/logutil"
 	"go.uber.org/multierr"
@@ -82,6 +83,11 @@ type dupDetectIter struct {
 	writeBatch     *pebble.Batch
 	writeBatchSize int64
 	logger         log.Logger
+	option         dupDetectOpt
+}
+
+type dupDetectOpt struct {
+	reportErrOnDup bool
 }

 func (d *dupDetectIter) Seek(key []byte) bool {
@@ -149,6 +155,14 @@ func (d *dupDetectIter) Next() bool {
 			d.curVal = append(d.curVal[:0], d.iter.Value()...)
 			return true
 		}
+		if d.option.reportErrOnDup {
+			dupKey := make([]byte, len(d.curKey))
+			dupVal := make([]byte, len(d.iter.Value()))
+			copy(dupKey, d.curKey)
+			copy(dupVal, d.curVal)
+			d.err = common.ErrFoundDuplicateKeys.FastGenByArgs(dupKey, dupVal)
+			return false
+		}
 		if !recordFirst {
 			d.record(d.curRawKey, d.curKey, d.curVal)
 			recordFirst = true
@@ -192,7 +206,7 @@ func (d *dupDetectIter) OpType() sst.Pair_OP {
 var _ Iter = &dupDetectIter{}

 func newDupDetectIter(ctx context.Context, db *pebble.DB, keyAdapter KeyAdapter,
-	opts *pebble.IterOptions, dupDB *pebble.DB, logger log.Logger) *dupDetectIter {
+	opts *pebble.IterOptions, dupDB *pebble.DB, logger log.Logger, dupOpt dupDetectOpt) *dupDetectIter {
 	newOpts := &pebble.IterOptions{TableFilter: opts.TableFilter}
 	if len(opts.LowerBound) > 0 {
 		newOpts.LowerBound = keyAdapter.Encode(nil, opts.LowerBound, math.MinInt64)
@@ -206,6 +220,7 @@ func newDupDetectIter(ctx context.Context, db *pebble.DB, keyAdapter KeyAdapter,
 		keyAdapter: keyAdapter,
 		writeBatch: dupDB.NewBatch(),
 		logger:     logger,
+		option:     dupOpt,
 	}
 }
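
Usage sketch (hypothetical driver code, not part of this commit): with reportErrOnDup set, the iterator fails fast on the first duplicate key instead of silently recording it to the duplicate DB.

	// Sketch, assuming Iter exposes First/Next/Close/Error as its other
	// call sites suggest. Next() stops at the first duplicated key and
	// surfaces common.ErrFoundDuplicateKeys through Error().
	iter := newDupDetectIter(ctx, db, keyAdapter, &pebble.IterOptions{},
		dupDB, log.L(), dupDetectOpt{reportErrOnDup: true})
	defer iter.Close()
	for ok := iter.First(); ok; ok = iter.Next() {
		// consume iter.Key() / iter.Value()
	}
	if err := iter.Error(); err != nil {
		// err carries the first duplicate key/value pair found.
		return err
	}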
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/local/iterator_test.go
@@ -122,7 +122,7 @@ func TestDupDetectIterator(t *testing.T) {
 	dupDB, err := pebble.Open(filepath.Join(storeDir, "duplicates"), &pebble.Options{})
 	require.NoError(t, err)
 	var iter Iter
-	iter = newDupDetectIter(context.Background(), db, keyAdapter, &pebble.IterOptions{}, dupDB, log.L())
+	iter = newDupDetectIter(context.Background(), db, keyAdapter, &pebble.IterOptions{}, dupDB, log.L(), dupDetectOpt{})
 	sort.Slice(pairs, func(i, j int) bool {
 		key1 := keyAdapter.Encode(nil, pairs[i].Key, pairs[i].RowID)
 		key2 := keyAdapter.Encode(nil, pairs[j].Key, pairs[j].RowID)
@@ -217,7 +217,7 @@ func TestDupDetectIterSeek(t *testing.T) {

 	dupDB, err := pebble.Open(filepath.Join(storeDir, "duplicates"), &pebble.Options{})
 	require.NoError(t, err)
-	iter := newDupDetectIter(context.Background(), db, keyAdapter, &pebble.IterOptions{}, dupDB, log.L())
+	iter := newDupDetectIter(context.Background(), db, keyAdapter, &pebble.IterOptions{}, dupDB, log.L(), dupDetectOpt{})

 	require.True(t, iter.Seek([]byte{1, 2, 3, 1}))
 	require.Equal(t, pairs[1].Val, iter.Value())
4 changes: 4 additions & 0 deletions br/pkg/lightning/backend/local/local.go
@@ -370,6 +370,7 @@ type local struct {

 	checkTiKVAvaliable bool
 	duplicateDetection bool
+	duplicateDetectOpt dupDetectOpt
 	duplicateDB        *pebble.DB
 	keyAdapter         KeyAdapter
 	errorMgr           *errormanager.ErrorManager
@@ -500,6 +501,7 @@ func NewLocalBackend(
 		engineMemCacheSize:      int(cfg.TikvImporter.EngineMemCacheSize),
 		localWriterMemCacheSize: int64(cfg.TikvImporter.LocalWriterMemCacheSize),
 		duplicateDetection:      duplicateDetection,
+		duplicateDetectOpt:      dupDetectOpt{duplicateDetection && cfg.TikvImporter.DuplicateResolution == config.DupeResAlgErr},
 		checkTiKVAvaliable:      cfg.App.CheckRequirements,
 		duplicateDB:             duplicateDB,
 		keyAdapter:              keyAdapter,
@@ -804,6 +806,7 @@ func (local *local) OpenEngine(ctx context.Context, cfg *backend.EngineConfig, e
 		config:             engineCfg,
 		tableInfo:          cfg.TableInfo,
 		duplicateDetection: local.duplicateDetection,
+		dupDetectOpt:       local.duplicateDetectOpt,
 		duplicateDB:        local.duplicateDB,
 		errorMgr:           local.errorMgr,
 		keyAdapter:         local.keyAdapter,
@@ -854,6 +857,7 @@ func (local *local) CloseEngine(ctx context.Context, cfg *backend.EngineConfig,
 		tableInfo:          cfg.TableInfo,
 		keyAdapter:         local.keyAdapter,
 		duplicateDetection: local.duplicateDetection,
+		dupDetectOpt:       local.duplicateDetectOpt,
 		duplicateDB:        local.duplicateDB,
 		errorMgr:           local.errorMgr,
 		logger:             log.FromContext(ctx),
1 change: 1 addition & 0 deletions br/pkg/lightning/common/errors.go
@@ -97,6 +97,7 @@ var (
 	ErrInvalidMetaStatus    = errors.Normalize("invalid meta status: '%s'", errors.RFCCodeText("Lightning:Restore:ErrInvalidMetaStatus"))
 	ErrTableIsChecksuming   = errors.Normalize("table '%s' is checksuming", errors.RFCCodeText("Lightning:Restore:ErrTableIsChecksuming"))
 	ErrResolveDuplicateRows = errors.Normalize("resolve duplicate rows error on table '%s'", errors.RFCCodeText("Lightning:Restore:ErrResolveDuplicateRows"))
+	ErrFoundDuplicateKeys   = errors.Normalize("found duplicate key '%s', value '%s'", errors.RFCCodeText("Lightning:Restore:ErrFoundDuplicateKey"))
 )

 type withStack struct {
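
Because these are pingcap/errors normalized errors, callers can match the new error by identity rather than by message text. A minimal, hypothetical caller-side check (not part of this commit):

	// *errors.Error values produced by errors.Normalize can be compared
	// with Equal, which matches by error ID rather than message string.
	if common.ErrFoundDuplicateKeys.Equal(err) {
		log.L().Error("import aborted: duplicate key found", zap.Error(err))
	}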
4 changes: 4 additions & 0 deletions br/pkg/lightning/config/config.go
@@ -384,6 +384,10 @@ const (
 	// DupeResAlgRemove records all duplicate records like the 'record' algorithm and remove all information related to the
 	// duplicated rows. Users need to analyze the lightning_task_info.conflict_error_v1 table to add back the correct rows.
 	DupeResAlgRemove
+
+	// DupeResAlgErr reports an error and stops the import process.
+	// Note: this value is for internal use only.
+	DupeResAlgErr
 )

 func (dra *DuplicateResolutionAlgorithm) UnmarshalTOML(v interface{}) error {
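
Since the comment marks DupeResAlgErr as internal-only, it is presumably never read from user TOML and is only reachable programmatically. A sketch of the selection logic, mirroring the NewLocalBackend hunk above (names as in the diff):

	// Error-on-duplicate kicks in only when duplicate detection is enabled
	// and the resolution algorithm is the internal DupeResAlgErr.
	opt := dupDetectOpt{
		reportErrOnDup: duplicateDetection &&
			cfg.TikvImporter.DuplicateResolution == config.DupeResAlgErr,
	}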
1 change: 1 addition & 0 deletions br/pkg/restore/BUILD.bazel
@@ -61,6 +61,7 @@ go_library(
         "//util/collate",
         "//util/hack",
         "//util/mathutil",
+        "//util/sqlexec",
         "//util/table-filter",
         "@com_github_emirpasic_gods//maps/treemap",
         "@com_github_go_sql_driver_mysql//:mysql",
41 changes: 28 additions & 13 deletions br/pkg/restore/client.go
@@ -53,6 +53,7 @@ import (
 	"github.com/pingcap/tidb/util/codec"
 	"github.com/pingcap/tidb/util/collate"
 	"github.com/pingcap/tidb/util/mathutil"
+	"github.com/pingcap/tidb/util/sqlexec"
 	filter "github.com/pingcap/tidb/util/table-filter"
 	"github.com/tikv/client-go/v2/oracle"
 	pd "github.com/tikv/pd/client"
@@ -1126,6 +1127,18 @@ func (rc *Client) SplitRanges(ctx context.Context,
 	return SplitRanges(ctx, rc, ranges, rewriteRules, updateCh, isRawKv)
 }

+func (rc *Client) WrapLogFilesIterWithSplitHelper(iter LogIter, rules map[int64]*RewriteRules, g glue.Glue, store kv.Storage) (LogIter, error) {
+	se, err := g.CreateSession(store)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	execCtx := se.GetSessionCtx().(sqlexec.RestrictedSQLExecutor)
+	splitSize, splitKeys := utils.GetRegionSplitInfo(execCtx)
+	log.Info("get split threshold from tikv config", zap.Uint64("split-size", splitSize), zap.Int64("split-keys", splitKeys))
+	client := split.NewSplitClient(rc.GetPDClient(), rc.GetTLSConfig(), false)
+	return NewLogFilesIterWithSplitHelper(iter, rules, client, splitSize, splitKeys), nil
+}
+
 // RestoreSSTFiles tries to restore the files.
 func (rc *Client) RestoreSSTFiles(
 	ctx context.Context,
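
A hypothetical call site for the new helper (not shown in this diff): wrap a log-file iterator so regions are pre-split, using the thresholds read from TiKV's config, before the files are applied.

	// Sketch: logIter, rewriteRules, g, and store are assumed in scope.
	iter, err := rc.WrapLogFilesIterWithSplitHelper(logIter, rewriteRules, g, store)
	if err != nil {
		return errors.Trace(err)
	}
	// iter yields the same files, splitting regions as it advances.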
@@ -2043,34 +2056,36 @@ func (rc *Client) RestoreKVFiles(
 			log.Debug("skip file due to table id not matched", zap.Int64("table-id", files[0].TableId))
 			skipFile += len(files)
 		} else {
-			rc.workerPool.ApplyOnErrorGroup(eg, func() error {
+			rc.workerPool.ApplyOnErrorGroup(eg, func() (err error) {
 				fileStart := time.Now()
 				defer func() {
 					onProgress(int64(len(files)))
 					updateStats(uint64(kvCount), size)
 					summary.CollectInt("File", len(files))

-					filenames := make([]string, 0, len(files))
-					for _, f := range files {
-						filenames = append(filenames, f.Path+", ")
+					if err == nil {
+						filenames := make([]string, 0, len(files))
+						for _, f := range files {
+							filenames = append(filenames, f.Path+", ")
+						}
+						log.Info("import files done", zap.Int("batch-count", len(files)), zap.Uint64("batch-size", size),
+							zap.Duration("take", time.Since(fileStart)), zap.Strings("files", filenames))
 					}
-					log.Info("import files done", zap.Int("batch-count", len(files)), zap.Uint64("batch-size", size),
-						zap.Duration("take", time.Since(fileStart)), zap.Strings("files", filenames))
 				}()

 				return rc.fileImporter.ImportKVFiles(ectx, files, rule, rc.shiftStartTS, rc.startTS, rc.restoreTS, supportBatch)
 			})
 		}
 	}

-	if supportBatch {
-		err = ApplyKVFilesWithBatchMethod(ectx, iter, int(pitrBatchCount), uint64(pitrBatchSize), applyFunc)
-	} else {
-		err = ApplyKVFilesWithSingelMethod(ectx, iter, applyFunc)
-	}
-	if err != nil {
+	rc.workerPool.ApplyOnErrorGroup(eg, func() error {
+		if supportBatch {
+			err = ApplyKVFilesWithBatchMethod(ectx, iter, int(pitrBatchCount), uint64(pitrBatchSize), applyFunc)
+		} else {
+			err = ApplyKVFilesWithSingelMethod(ectx, iter, applyFunc)
+		}
 		return errors.Trace(err)
-	}
+	})

 	log.Info("total skip files due to table id not matched", zap.Int("count", skipFile))
 	if skipFile > 0 {
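
Two things happen in this hunk: the apply loop now runs inside the worker pool's error group, and the per-batch closure's return value gets a name (err) so the deferred logger only reports "import files done" on success. A minimal standalone sketch of that named-return pattern (illustrative only, not TiDB code):

	package main

	import "fmt"

	// work's return value is named, so the deferred closure can inspect
	// the error that is actually about to be returned.
	func work(fail bool) (err error) {
		defer func() {
			if err == nil {
				fmt.Println("done") // logged only on success
			}
		}()
		if fail {
			return fmt.Errorf("boom")
		}
		return nil
	}

	func main() {
		_ = work(false) // prints "done"
		_ = work(true)  // prints nothing
	}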
(Diff truncated: the remaining 71 of the 80 changed files are not shown.)
