diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index f116443840d69..53ba8a88d9d7d 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -1588,7 +1588,10 @@ func (rc *Client) WaitForFilesRestored(ctx context.Context, files []*backuppb.Fi fileReplica := file rc.workerPool.ApplyOnErrorGroup(eg, func() error { - defer updateCh.Inc() + defer func() { + log.Info("import sst files done", logutil.Files(files)) + updateCh.Inc() + }() return rc.fileImporter.ImportSSTFiles(ectx, []*backuppb.File{fileReplica}, EmptyRewriteRule(), rc.cipher, rc.backupMeta.ApiVersion) }) } diff --git a/br/pkg/restore/import.go b/br/pkg/restore/import.go index be549ac13c6a0..f0dbb914b122e 100644 --- a/br/pkg/restore/import.go +++ b/br/pkg/restore/import.go @@ -485,6 +485,30 @@ func (importer *FileImporter) SetRawRange(startKey, endKey []byte) error { return nil } +func getKeyRangeByMode(mode KvMode) func(f *backuppb.File, rules *RewriteRules) ([]byte, []byte, error) { + switch mode { + case Raw: + return func(f *backuppb.File, rules *RewriteRules) ([]byte, []byte, error) { + return f.GetStartKey(), f.GetEndKey(), nil + } + case Txn: + return func(f *backuppb.File, rules *RewriteRules) ([]byte, []byte, error) { + start, end := f.GetStartKey(), f.GetEndKey() + if len(start) != 0 { + start = codec.EncodeBytes([]byte{}, f.GetStartKey()) + } + if len(end) != 0 { + end = codec.EncodeBytes([]byte{}, f.GetEndKey()) + } + return start, end, nil + } + default: + return func(f *backuppb.File, rules *RewriteRules) ([]byte, []byte, error) { + return GetRewriteRawKeys(f, rules) + } + } +} + // getKeyRangeForFiles gets the maximum range on files. func (importer *FileImporter) getKeyRangeForFiles( files []*backuppb.File, @@ -495,20 +519,12 @@ func (importer *FileImporter) getKeyRangeForFiles( start, end []byte err error ) - + getRangeFn := getKeyRangeByMode(importer.kvMode) for _, f := range files { - if importer.kvMode == Raw { - start, end = f.GetStartKey(), f.GetEndKey() - } else if importer.kvMode == Txn { - start = codec.EncodeBytes([]byte{}, f.GetStartKey()) - end = codec.EncodeBytes([]byte{}, f.GetEndKey()) - } else { - start, end, err = GetRewriteRawKeys(f, rewriteRules) - if err != nil { - return nil, nil, errors.Trace(err) - } + start, end, err = getRangeFn(f, rewriteRules) + if err != nil { + return nil, nil, errors.Trace(err) } - if len(startKey) == 0 || bytes.Compare(start, startKey) < 0 { startKey = start } diff --git a/br/pkg/restore/util_test.go b/br/pkg/restore/util_test.go index 4af813e39a4fc..adcaed80a1514 100644 --- a/br/pkg/restore/util_test.go +++ b/br/pkg/restore/util_test.go @@ -16,6 +16,65 @@ import ( "github.com/stretchr/testify/require" ) +func TestGetKeyRangeByMode(t *testing.T) { + file := &backuppb.File{ + Name: "file_write.sst", + StartKey: []byte("t1a"), + EndKey: []byte("t1ccc"), + } + endFile := &backuppb.File{ + Name: "file_write.sst", + StartKey: []byte("t1a"), + EndKey: []byte(""), + } + rule := &RewriteRules{ + Data: []*import_sstpb.RewriteRule{ + { + OldKeyPrefix: []byte("t1"), + NewKeyPrefix: []byte("t2"), + }, + }, + } + // raw kv + testRawFn := getKeyRangeByMode(Raw) + start, end, err := testRawFn(file, rule) + require.NoError(t, err) + require.Equal(t, []byte("t1a"), start) + require.Equal(t, []byte("t1ccc"), end) + + start, end, err = testRawFn(endFile, rule) + require.NoError(t, err) + require.Equal(t, []byte("t1a"), start) + require.Equal(t, []byte(""), end) + + // txn kv: the keys must be encoded. + testTxnFn := getKeyRangeByMode(Txn) + start, end, err = testTxnFn(file, rule) + require.NoError(t, err) + require.Equal(t, codec.EncodeBytes(nil, []byte("t1a")), start) + require.Equal(t, codec.EncodeBytes(nil, []byte("t1ccc")), end) + + start, end, err = testTxnFn(endFile, rule) + require.NoError(t, err) + require.Equal(t, codec.EncodeBytes(nil, []byte("t1a")), start) + require.Equal(t, []byte(""), end) + + // normal kv: the keys must be encoded. + testFn := getKeyRangeByMode(TiDB) + start, end, err = testFn(file, rule) + require.NoError(t, err) + require.Equal(t, codec.EncodeBytes(nil, []byte("t2a")), start) + require.Equal(t, codec.EncodeBytes(nil, []byte("t2ccc")), end) + + // TODO maybe fix later + // current restore does not support rewrite empty endkey. + // because backup guarantees that the end key is not empty. + // start, end, err = testFn(endFile, rule) + // require.NoError(t, err) + // require.Equal(t, codec.EncodeBytes(nil, []byte("t2a")), start) + // require.Equal(t, []byte(""), end) +} + func TestParseQuoteName(t *testing.T) { schema, table := ParseQuoteName("`a`.`b`") require.Equal(t, "a", schema)