Skip to content

Commit

Permalink
Rebase keyspace br to release-6.4 (pingcap#154)
Browse files Browse the repository at this point in the history
* BR adaption for keyspace feature (pingcap#38)

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>
Co-authored-by: ystaticy <y_static_y@sina.com>
Co-authored-by: disksing <i@disksing.com>

* build br image (pingcap#62)

* build br image

Signed-off-by: disksing <i@disksing.com>

* BR: Serverless Compatible BR (pingcap#64)

* disable remove scheduler and other pd related modifications when keyspace is set for BR

Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>

* make tidb br forward compatible (pingcap#84)

* make tidb br forward compatible

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* fix bug,when tables>0 and file=0

Signed-off-by: ystaticy <y_static_y@sina.com>

* go mod tidy

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>
Signed-off-by: ystaticy <y_static_y@sina.com>
Co-authored-by: ystaticy <y_static_y@sina.com>

* Cse BR restore only download on leader (pingcap#120)

* cse br restore,only download in region leader

* support config

Signed-off-by: ystaticy <y_static_y@sina.com>

* cse br restore,only download in region leader

Signed-off-by: ystaticy <y_static_y@sina.com>

* cse br restore,only download in region leader

Signed-off-by: ystaticy <y_static_y@sina.com>

* fix logs

Signed-off-by: ystaticy <y_static_y@sina.com>

* fix logs

Signed-off-by: ystaticy <y_static_y@sina.com>

* fix logs

Signed-off-by: ystaticy <y_static_y@sina.com>

* remove unused code

Signed-off-by: ystaticy <y_static_y@sina.com>

* fix imports

Signed-off-by: ystaticy <y_static_y@sina.com>

* fix comments

Signed-off-by: ystaticy <y_static_y@sina.com>

* fix config of leaderdownload

Signed-off-by: ystaticy <y_static_y@sina.com>

* fix config of leaderdownload

Signed-off-by: ystaticy <y_static_y@sina.com>

* if met not leader error.return error immediately.

Signed-off-by: ystaticy <y_static_y@sina.com>

Signed-off-by: ystaticy <y_static_y@sina.com>

* support skip split option for small dataset (pingcap#122)

* support skip split option for small dataset

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* disable SplitTable only when skip split

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* rename api version of FileImporter

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>
Signed-off-by: disksing <i@disksing.com>
Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>
Signed-off-by: ystaticy <y_static_y@sina.com>
Co-authored-by: ystaticy <y_static_y@sina.com>
Co-authored-by: disksing <i@disksing.com>
Co-authored-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>
  • Loading branch information
4 people authored Dec 15, 2022
1 parent dcfc673 commit ca134e7
Show file tree
Hide file tree
Showing 19 changed files with 330 additions and 90 deletions.
46 changes: 37 additions & 9 deletions br/pkg/checksum/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type ExecutorBuilder struct {
oldTable *metautil.Table

concurrency uint

oldKeyspace []byte
newKeyspace []byte
}

// NewExecutorBuilder returns a new executor builder.
Expand All @@ -51,9 +54,26 @@ func (builder *ExecutorBuilder) SetConcurrency(conc uint) *ExecutorBuilder {
return builder
}

func (builder *ExecutorBuilder) SetOldKeyspace(keyspace []byte) *ExecutorBuilder {
builder.oldKeyspace = keyspace
return builder
}

func (builder *ExecutorBuilder) SetNewKeyspace(keyspace []byte) *ExecutorBuilder {
builder.newKeyspace = keyspace
return builder
}

// Build builds a checksum executor.
func (builder *ExecutorBuilder) Build() (*Executor, error) {
reqs, err := buildChecksumRequest(builder.table, builder.oldTable, builder.ts, builder.concurrency)
reqs, err := buildChecksumRequest(
builder.table,
builder.oldTable,
builder.ts,
builder.concurrency,
builder.oldKeyspace,
builder.newKeyspace,
)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -65,6 +85,8 @@ func buildChecksumRequest(
oldTable *metautil.Table,
startTS uint64,
concurrency uint,
oldKeyspace []byte,
newKeyspace []byte,
) ([]*kv.Request, error) {
var partDefs []model.PartitionDefinition
if part := newTable.Partition; part != nil {
Expand All @@ -76,7 +98,7 @@ func buildChecksumRequest(
if oldTable != nil {
oldTableID = oldTable.Info.ID
}
rs, err := buildRequest(newTable, newTable.ID, oldTable, oldTableID, startTS, concurrency)
rs, err := buildRequest(newTable, newTable.ID, oldTable, oldTableID, startTS, concurrency, oldKeyspace, newKeyspace)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -91,7 +113,7 @@ func buildChecksumRequest(
}
}
}
rs, err := buildRequest(newTable, partDef.ID, oldTable, oldPartID, startTS, concurrency)
rs, err := buildRequest(newTable, partDef.ID, oldTable, oldPartID, startTS, concurrency, oldKeyspace, newKeyspace)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -108,9 +130,11 @@ func buildRequest(
oldTableID int64,
startTS uint64,
concurrency uint,
oldKeyspace []byte,
newKeyspace []byte,
) ([]*kv.Request, error) {
reqs := make([]*kv.Request, 0)
req, err := buildTableRequest(tableInfo, tableID, oldTable, oldTableID, startTS, concurrency)
req, err := buildTableRequest(tableInfo, tableID, oldTable, oldTableID, startTS, concurrency, oldKeyspace, newKeyspace)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -139,7 +163,7 @@ func buildRequest(
}
}
req, err = buildIndexRequest(
tableID, indexInfo, oldTableID, oldIndexInfo, startTS, concurrency)
tableID, indexInfo, oldTableID, oldIndexInfo, startTS, concurrency, oldKeyspace, newKeyspace)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -156,12 +180,14 @@ func buildTableRequest(
oldTableID int64,
startTS uint64,
concurrency uint,
oldKeyspace []byte,
newKeyspace []byte,
) (*kv.Request, error) {
var rule *tipb.ChecksumRewriteRule
if oldTable != nil {
rule = &tipb.ChecksumRewriteRule{
OldPrefix: tablecodec.GenTableRecordPrefix(oldTableID),
NewPrefix: tablecodec.GenTableRecordPrefix(tableID),
OldPrefix: append(append([]byte{}, oldKeyspace...), tablecodec.GenTableRecordPrefix(oldTableID)...),
NewPrefix: append(append([]byte{}, newKeyspace...), tablecodec.GenTableRecordPrefix(tableID)...),
}
}

Expand Down Expand Up @@ -195,12 +221,14 @@ func buildIndexRequest(
oldIndexInfo *model.IndexInfo,
startTS uint64,
concurrency uint,
oldKeyspace []byte,
newKeyspace []byte,
) (*kv.Request, error) {
var rule *tipb.ChecksumRewriteRule
if oldIndexInfo != nil {
rule = &tipb.ChecksumRewriteRule{
OldPrefix: tablecodec.EncodeTableIndexPrefix(oldTableID, oldIndexInfo.ID),
NewPrefix: tablecodec.EncodeTableIndexPrefix(tableID, indexInfo.ID),
OldPrefix: append(append([]byte{}, oldKeyspace...), tablecodec.EncodeTableIndexPrefix(oldTableID, oldIndexInfo.ID)...),
NewPrefix: append(append([]byte{}, newKeyspace...), tablecodec.EncodeTableIndexPrefix(tableID, indexInfo.ID)...),
}
}
checksum := &tipb.ChecksumRequest{
Expand Down
13 changes: 8 additions & 5 deletions br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/keyspace"
"github.com/pingcap/tidb/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -174,7 +175,8 @@ func NewMgr(
}

// Disable GC because TiDB enables GC already.
storage, err := g.Open(fmt.Sprintf("tikv://%s?disableGC=true", pdAddrs), securityOption)
path := fmt.Sprintf("tikv://%s?disableGC=true&keyspaceName=%s", pdAddrs, keyspace.GetKeyspaceNameBySettings())
storage, err := g.Open(path, securityOption)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -195,10 +197,11 @@ func NewMgr(
// we will keep this check until 7.0, which allow the breaking changes.
// NOTE: must call it after domain created!
// FIXME: remove this check in v7.0
err = version.CheckClusterVersion(ctx, controller.GetPDClient(), version.CheckVersionForDDL)
if err != nil {
return nil, errors.Annotate(err, "unable to check cluster version for ddl")
}
// TODO: upgrade cloud storage engine to v6.2.0 or later version
// err = version.CheckClusterVersion(ctx, controller.GetPDClient(), version.CheckVersionForDDL)
// if err != nil {
// return nil, errors.Annotate(err, "unable to check cluster version for ddl")
// }
}

mgr := &Mgr{
Expand Down
11 changes: 8 additions & 3 deletions br/pkg/metautil/metafile.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"crypto/sha256"
"encoding/json"
"fmt"
"github.com/tikv/client-go/v2/tikv"
"sync"
"time"

Expand Down Expand Up @@ -170,12 +171,12 @@ type MetaReader struct {

// NewMetaReader creates MetaReader.
func NewMetaReader(
backpMeta *backuppb.BackupMeta,
backupMeta *backuppb.BackupMeta,
storage storage.ExternalStorage,
cipher *backuppb.CipherInfo) *MetaReader {
return &MetaReader{
storage: storage,
backupMeta: backpMeta,
backupMeta: backupMeta,
cipher: cipher,
}
}
Expand Down Expand Up @@ -290,7 +291,11 @@ func (reader *MetaReader) ReadSchemasFiles(ctx context.Context, output chan<- *T
// put all files in memory due to https://github.com/pingcap/br/issues/705
fileMap := make(map[int64][]*backuppb.File)
outputFn := func(file *backuppb.File) {
tableID := tablecodec.DecodeTableID(file.GetStartKey())
_, start, err := tikv.DecodeKey(file.GetStartKey(), reader.backupMeta.ApiVersion)
if err != nil {
log.Panic("decode key error", zap.Error(err))
}
tableID := tablecodec.DecodeTableID(start)
if tableID == 0 {
log.Panic("tableID must not equal to 0", logutil.File(file))
}
Expand Down
47 changes: 39 additions & 8 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/backup"
Expand All @@ -42,6 +43,7 @@ import (
"github.com/pingcap/tidb/config"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/keyspace"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -165,6 +167,11 @@ type Client struct {

// the successfully preallocated table IDs.
preallocedTableIDs *tidalloc.PreallocIDs
// Target keyspace's name for the data restoration.
keyspaceName string

// leaderdown is true means it's just download on leader
leaderDownload bool
}

// NewRestoreClient returns a new RestoreClient.
Expand Down Expand Up @@ -251,6 +258,11 @@ func (rc *Client) allocTableIDs(ctx context.Context, tables []*metautil.Table) e

// SetPlacementPolicyMode to policy mode.
func (rc *Client) SetPlacementPolicyMode(withPlacementPolicy string) {
if rc.IsKeyspaceMode() {
log.Info("ignore placement policy when keyspaceName is set", zap.String("mode", rc.policyMode))
rc.policyMode = ignorePlacementPolicyMode
return
}
switch strings.ToUpper(withPlacementPolicy) {
case strictPlacementPolicyMode:
rc.policyMode = strictPlacementPolicyMode
Expand Down Expand Up @@ -342,7 +354,7 @@ func (rc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBacke
func (rc *Client) InitClients(backend *backuppb.StorageBackend, isRawKvMode bool) {
metaClient := split.NewSplitClient(rc.pdClient, rc.tlsConf, isRawKvMode)
importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, isRawKvMode)
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, isRawKvMode, rc.backupMeta.ApiVersion)
}

func (rc *Client) SetRawKVClient(c *RawKVBatchClient) {
Expand Down Expand Up @@ -866,7 +878,7 @@ func (rc *Client) createTablesInWorkerPool(ctx context.Context, dom *domain.Doma
}
})
if err != nil {
log.Error("create tables fail")
log.Error("create tables fail", zap.Error(err))
return err
}
for _, ct := range cts {
Expand Down Expand Up @@ -1006,7 +1018,7 @@ func MockCallSetSpeedLimit(ctx context.Context, fakeImportClient ImporterClient,
rc.SetRateLimit(42)
rc.SetConcurrency(concurrency)
rc.hasSpeedLimited = false
rc.fileImporter = NewFileImporter(nil, fakeImportClient, nil, false)
rc.fileImporter = NewFileImporter(nil, fakeImportClient, nil, false, kvrpcpb.APIVersion_V1)
return rc.setSpeedLimit(ctx, rc.rateLimit)
}

Expand Down Expand Up @@ -1134,7 +1146,7 @@ func (rc *Client) RestoreSSTFiles(
zap.Duration("take", time.Since(fileStart)))
updateCh.Inc()
}()
return rc.fileImporter.ImportSSTFiles(ectx, filesReplica, rewriteRules, rc.cipher, rc.backupMeta.ApiVersion)
return rc.fileImporter.ImportSSTFiles(ectx, filesReplica, rewriteRules, rc.cipher, rc.dom.Store().GetCodec().GetAPIVersion(), rc.leaderDownload)
})
}

Expand Down Expand Up @@ -1175,7 +1187,7 @@ func (rc *Client) RestoreRaw(
rc.workerPool.ApplyOnErrorGroup(eg,
func() error {
defer updateCh.Inc()
return rc.fileImporter.ImportSSTFiles(ectx, []*backuppb.File{fileReplica}, EmptyRewriteRule(), rc.cipher, rc.backupMeta.ApiVersion)
return rc.fileImporter.ImportSSTFiles(ectx, []*backuppb.File{fileReplica}, EmptyRewriteRule(), rc.cipher, rc.backupMeta.ApiVersion, rc.leaderDownload)
})
}
if err := eg.Wait(); err != nil {
Expand Down Expand Up @@ -1386,6 +1398,8 @@ func (rc *Client) execChecksum(
exe, err := checksum.NewExecutorBuilder(tbl.Table, startTS).
SetOldTable(tbl.OldTable).
SetConcurrency(concurrency).
SetOldKeyspace(tbl.RewriteRule.OldKeyspace).
SetNewKeyspace(tbl.RewriteRule.NewKeyspace).
Build()
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -1501,8 +1515,10 @@ func (rc *Client) ResetRestoreLabels(ctx context.Context) error {
}

// SetupPlacementRules sets rules for the tables' regions.
// This is only performed when using Online Restore mode with at least one restore stores.
// This is also skipped when keyspaceName is set.
func (rc *Client) SetupPlacementRules(ctx context.Context, tables []*model.TableInfo) error {
if !rc.isOnline || len(rc.restoreStores) == 0 {
if !rc.isOnline || len(rc.restoreStores) == 0 || rc.IsKeyspaceMode() {
return nil
}
log.Info("start setting placement rules")
Expand Down Expand Up @@ -1532,7 +1548,7 @@ func (rc *Client) SetupPlacementRules(ctx context.Context, tables []*model.Table

// WaitPlacementSchedule waits PD to move tables to restore stores.
func (rc *Client) WaitPlacementSchedule(ctx context.Context, tables []*model.TableInfo) error {
if !rc.isOnline || len(rc.restoreStores) == 0 {
if !rc.isOnline || len(rc.restoreStores) == 0 || rc.IsKeyspaceMode() {
return nil
}
log.Info("start waiting placement schedule")
Expand Down Expand Up @@ -1592,7 +1608,7 @@ func (rc *Client) checkRange(ctx context.Context, start, end []byte) (bool, stri

// ResetPlacementRules removes placement rules for tables.
func (rc *Client) ResetPlacementRules(ctx context.Context, tables []*model.TableInfo) error {
if !rc.isOnline || len(rc.restoreStores) == 0 {
if !rc.isOnline || len(rc.restoreStores) == 0 || rc.IsKeyspaceMode() {
return nil
}
log.Info("start reseting placement rules")
Expand Down Expand Up @@ -2464,6 +2480,21 @@ func (rc *Client) SetWithSysTable(withSysTable bool) {
rc.withSysTable = withSysTable
}

// SetKeyspaceName set the keyspace name for the restore client.
func (rc *Client) SetKeyspaceName(keyspaceName string) {
rc.keyspaceName = keyspaceName
}

// SetLeaderDownload set whether just download on leader.
func (rc *Client) SetLeaderDownload(leaderDownload bool) {
rc.leaderDownload = leaderDownload
}

// IsKeyspaceMode indicates whether BR is restoring a specific keyspace's data.
func (rc *Client) IsKeyspaceMode() bool {
return !keyspace.IsKeyspaceNameEmpty(rc.keyspaceName)
}

// MockClient create a fake client used to test.
func MockClient(dbs map[string]*utils.Database) *Client {
return &Client{databases: dbs}
Expand Down
Loading

0 comments on commit ca134e7

Please sign in to comment.