Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
Signed-off-by: wjhuang2016 <huangwenjun1997@gmail.com>
  • Loading branch information
wjhuang2016 committed Apr 13, 2023
2 parents 292266d + 2002ca1 commit d4476cb
Show file tree
Hide file tree
Showing 133 changed files with 2,959 additions and 950 deletions.
3 changes: 2 additions & 1 deletion br/cmd/br/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,8 @@ func setPDConfigCommand() *cobra.Command {
return errors.Trace(err)
}

mgr, err := task.NewMgr(ctx, tidbGlue, cfg.PD, cfg.TLS, task.GetKeepalive(&cfg), cfg.CheckRequirements, false, conn.NormalVersionChecker)
mgr, err := task.NewMgr(ctx, tidbGlue, cfg.PD, cfg.TLS, task.GetKeepalive(&cfg),
cfg.CheckRequirements, false, conn.NormalVersionChecker)
if err != nil {
return errors.Trace(err)
}
Expand Down
7 changes: 4 additions & 3 deletions br/cmd/br/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,10 @@ func newStreamCheckCommand() *cobra.Command {

func newStreamAdvancerCommand() *cobra.Command {
command := &cobra.Command{
Use: "advancer",
Short: "Start a central worker for advancing the checkpoint. (only for debuging, this subcommand should be integrated to TiDB)",
Args: cobra.NoArgs,
Use: "advancer",
Short: "Start a central worker for advancing the checkpoint. " +
"(only for debuging, this subcommand should be integrated to TiDB)",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
return streamCommand(cmd, task.StreamCtl)
},
Expand Down
27 changes: 18 additions & 9 deletions br/pkg/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ type CheckpointRunner struct {
}

// only for test
func StartCheckpointRunnerForTest(ctx context.Context, storage storage.ExternalStorage, cipher *backuppb.CipherInfo, tick time.Duration, timer GlobalTimer) (*CheckpointRunner, error) {
func StartCheckpointRunnerForTest(ctx context.Context, storage storage.ExternalStorage,
cipher *backuppb.CipherInfo, tick time.Duration, timer GlobalTimer) (*CheckpointRunner, error) {
runner := &CheckpointRunner{
meta: make(map[string]*RangeGroups),

Expand All @@ -269,7 +270,8 @@ func StartCheckpointRunnerForTest(ctx context.Context, storage storage.ExternalS
return runner, nil
}

func StartCheckpointRunner(ctx context.Context, storage storage.ExternalStorage, cipher *backuppb.CipherInfo, timer GlobalTimer) (*CheckpointRunner, error) {
func StartCheckpointRunner(ctx context.Context, storage storage.ExternalStorage,
cipher *backuppb.CipherInfo, timer GlobalTimer) (*CheckpointRunner, error) {
runner := &CheckpointRunner{
meta: make(map[string]*RangeGroups),

Expand All @@ -293,7 +295,8 @@ func StartCheckpointRunner(ctx context.Context, storage storage.ExternalStorage,
return runner, nil
}

func (r *CheckpointRunner) FlushChecksum(ctx context.Context, tableID int64, crc64xor uint64, totalKvs uint64, totalBytes uint64, timeCost float64) error {
func (r *CheckpointRunner) FlushChecksum(ctx context.Context, tableID int64, crc64xor uint64,
totalKvs uint64, totalBytes uint64, timeCost float64) error {
return r.checksumRunner.FlushChecksum(ctx, r.storage, tableID, crc64xor, totalKvs, totalBytes, timeCost)
}

Expand Down Expand Up @@ -405,7 +408,8 @@ func (r *CheckpointRunner) sendError(err error) {
r.checksumRunner.RecordError(err)
}

func (r *CheckpointRunner) startCheckpointLoop(ctx context.Context, tickDurationForFlush, tickDurationForLock time.Duration) {
func (r *CheckpointRunner) startCheckpointLoop(ctx context.Context, tickDurationForFlush,
tickDurationForLock time.Duration) {
r.wg.Add(1)
checkpointLoop := func(ctx context.Context) {
defer r.wg.Done()
Expand Down Expand Up @@ -557,7 +561,8 @@ func (r *CheckpointRunner) flushLock(ctx context.Context, p int64) error {
LockId: r.lockId,
ExpireAt: p + lockTimeToLive.Milliseconds(),
}
log.Info("start to flush the checkpoint lock", zap.Int64("lock-at", p), zap.Int64("expire-at", lock.ExpireAt))
log.Info("start to flush the checkpoint lock", zap.Int64("lock-at", p),
zap.Int64("expire-at", lock.ExpireAt))
data, err := json.Marshal(lock)
if err != nil {
return errors.Trace(err)
Expand All @@ -584,12 +589,15 @@ func (r *CheckpointRunner) checkLockFile(ctx context.Context, now int64) error {
"Please check whether the BR is running. If not, you can retry.", lock.LockId, r.lockId)
}
if lock.LockId == r.lockId {
log.Warn("The lock has expired.", zap.Int64("expire-at(ms)", lock.ExpireAt), zap.Int64("now(ms)", now))
log.Warn("The lock has expired.",
zap.Int64("expire-at(ms)", lock.ExpireAt), zap.Int64("now(ms)", now))
}
} else if lock.LockId != r.lockId {
return errors.Errorf("The existing lock will expire in %d seconds. "+
"There may be another BR(%d) running. If not, you can wait for the lock to expire, or delete the file `%s%s` manually.",
(lock.ExpireAt-now)/1000, lock.LockId, strings.TrimRight(r.storage.URI(), "/"), CheckpointLockPath)
"There may be another BR(%d) running. If not, you can wait for the lock to expire, "+
"or delete the file `%s%s` manually.",
(lock.ExpireAt-now)/1000, lock.LockId,
strings.TrimRight(r.storage.URI(), "/"), CheckpointLockPath)
}

return nil
Expand Down Expand Up @@ -635,7 +643,8 @@ func (r *CheckpointRunner) initialLock(ctx context.Context) error {

// walk the whole checkpoint range files and retrieve the metadatat of backed up ranges
// and return the total time cost in the past executions
func WalkCheckpointFile(ctx context.Context, s storage.ExternalStorage, cipher *backuppb.CipherInfo, fn func(groupKey string, rg *rtree.Range)) (time.Duration, error) {
func WalkCheckpointFile(ctx context.Context, s storage.ExternalStorage, cipher *backuppb.CipherInfo,
fn func(groupKey string, rg *rtree.Range)) (time.Duration, error) {
// records the total time cost in the past executions
var pastDureTime time.Duration = 0
err := s.WalkDir(ctx, &storage.WalkOption{SubDir: CheckpointDataDir}, func(path string, size int64) error {
Expand Down
9 changes: 6 additions & 3 deletions br/pkg/checksum/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,10 @@ func buildIndexRequest(
var rule *tipb.ChecksumRewriteRule
if oldIndexInfo != nil {
rule = &tipb.ChecksumRewriteRule{
OldPrefix: append(append([]byte{}, oldKeyspace...), tablecodec.EncodeTableIndexPrefix(oldTableID, oldIndexInfo.ID)...),
NewPrefix: append(append([]byte{}, newKeyspace...), tablecodec.EncodeTableIndexPrefix(tableID, indexInfo.ID)...),
OldPrefix: append(append([]byte{}, oldKeyspace...),
tablecodec.EncodeTableIndexPrefix(oldTableID, oldIndexInfo.ID)...),
NewPrefix: append(append([]byte{}, newKeyspace...),
tablecodec.EncodeTableIndexPrefix(tableID, indexInfo.ID)...),
}
}
checksum := &tipb.ChecksumRequest{
Expand Down Expand Up @@ -332,7 +334,8 @@ func (exec *Executor) Execute(
updateFn func(),
) (*tipb.ChecksumResponse, error) {
checksumResp := &tipb.ChecksumResponse{}
checksumBackoffer := utils.InitialRetryState(utils.ChecksumRetryTime, utils.ChecksumWaitInterval, utils.ChecksumMaxWaitInterval)
checksumBackoffer := utils.InitialRetryState(utils.ChecksumRetryTime,
utils.ChecksumWaitInterval, utils.ChecksumMaxWaitInterval)
for _, req := range exec.reqs {
// Pointer to SessionVars.Killed
// Killed is a flag to indicate that this query is killed.
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ func (mgr *Mgr) GetTS(ctx context.Context) (uint64, error) {
return oracle.ComposeTS(p, l), nil
}

// GetMergeRegionSizeAndCount returns the tikv config `coprocessor.region-split-size` and `coprocessor.region-split-key`.
// GetMergeRegionSizeAndCount returns the tikv config
// `coprocessor.region-split-size` and `coprocessor.region-split-key`.
// returns the default config when failed.
func (mgr *Mgr) GetMergeRegionSizeAndCount(ctx context.Context, client *http.Client) (uint64, uint64) {
regionSplitSize := DefaultMergeRegionSizeBytes
Expand Down
6 changes: 4 additions & 2 deletions br/pkg/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ type Session interface {
Execute(ctx context.Context, sql string) error
ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error
CreateDatabase(ctx context.Context, schema *model.DBInfo) error
CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error
CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo,
cs ...ddl.CreateTableWithInfoConfigurier) error
CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error
Close()
GetGlobalVariable(name string) (string, error)
Expand All @@ -52,7 +53,8 @@ type Session interface {

// BatchCreateTableSession is an interface to batch create table parallelly
type BatchCreateTableSession interface {
CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error
CreateTables(ctx context.Context, tables map[string][]*model.TableInfo,
cs ...ddl.CreateTableWithInfoConfigurier) error
}

// Progress is an interface recording the current execution progress.
Expand Down
20 changes: 13 additions & 7 deletions br/pkg/glue/progressing.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,13 @@ func (ops ConsoleOperations) StartProgressBar(title string, total int, extraFiel
return ops.startProgressBarOverTTY(title, total, extraFields...)
}

func (ops ConsoleOperations) startProgressBarOverDummy(title string, total int, extraFields ...ExtraField) ProgressWaiter {
func (ops ConsoleOperations) startProgressBarOverDummy(title string, total int,
extraFields ...ExtraField) ProgressWaiter {
return noOPWaiter{utils.StartProgress(context.TODO(), title, int64(total), true, nil)}
}

func (ops ConsoleOperations) startProgressBarOverTTY(title string, total int, extraFields ...ExtraField) ProgressWaiter {
func (ops ConsoleOperations) startProgressBarOverTTY(title string, total int,
extraFields ...ExtraField) ProgressWaiter {
pb := mpb.New(mpb.WithOutput(ops.Out()), mpb.WithRefreshRate(400*time.Millisecond))
bar := adjustTotal(pb, title, total, extraFields...)

Expand All @@ -142,7 +144,8 @@ func buildProgressBar(pb *mpb.Progress, title string, total int, extraFields ...
greenTitle := color.GreenString(title)
return pb.New(int64(total),
// Play as if the old BR style.
mpb.BarStyle().Lbound("<").Filler("-").Padding(".").Rbound(">").Tip("-", "\\", "|", "/", "-").TipOnComplete("-"),
mpb.BarStyle().Lbound("<").Filler("-").Padding(".").Rbound(">").
Tip("-", "\\", "|", "/", "-").TipOnComplete("-"),
mpb.BarFillerMiddleware(func(bf mpb.BarFiller) mpb.BarFiller {
return mpb.BarFillerFunc(func(w io.Writer, reqWidth int, stat decor.Statistics) {
if stat.Aborted || stat.Completed {
Expand All @@ -151,19 +154,22 @@ func buildProgressBar(pb *mpb.Progress, title string, total int, extraFields ...
bf.Fill(w, reqWidth, stat)
})
}),
mpb.PrependDecorators(decor.OnAbort(decor.OnComplete(decor.Name(greenTitle), fmt.Sprintf("%s ::", title)), fmt.Sprintf("%s ::", title))),
mpb.AppendDecorators(decor.OnAbort(decor.Any(cbOnComplete(decor.NewPercentage("%02.2f"), printFinalMessage(extraFields))), color.RedString("ABORTED"))),
mpb.PrependDecorators(decor.OnAbort(decor.OnComplete(decor.Name(greenTitle),
fmt.Sprintf("%s ::", title)), fmt.Sprintf("%s ::", title))),
mpb.AppendDecorators(decor.OnAbort(decor.Any(cbOnComplete(decor.NewPercentage("%02.2f"),
printFinalMessage(extraFields))), color.RedString("ABORTED"))),
)
}

var (
spinnerDoneText string = fmt.Sprintf("... %s", color.GreenString("DONE"))
spinnerDoneText = fmt.Sprintf("... %s", color.GreenString("DONE"))
)

func buildOneTaskBar(pb *mpb.Progress, title string, total int) *mpb.Bar {
return pb.New(int64(total),
mpb.NopStyle(),
mpb.PrependDecorators(decor.Name(title)),
mpb.AppendDecorators(decor.OnAbort(decor.OnComplete(decor.Spinner(spinnerText), spinnerDoneText), color.RedString("ABORTED"))),
mpb.AppendDecorators(decor.OnAbort(decor.OnComplete(decor.Spinner(spinnerText), spinnerDoneText),
color.RedString("ABORTED"))),
)
}
26 changes: 16 additions & 10 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ func (gs *tidbSession) ExecuteInternal(ctx context.Context, sql string, args ...
defer rs.Close()
c := rs.NewChunk(nil)
if err := rs.Next(ctx, c); err != nil {
log.Warn("Error during draining result of internal sql.", logutil.Redact(zap.String("sql", sql)), logutil.ShortError(err))
log.Warn("Error during draining result of internal sql.",
logutil.Redact(zap.String("sql", sql)), logutil.ShortError(err))
return nil
}
}
Expand Down Expand Up @@ -224,11 +225,12 @@ func (gs *tidbSession) CreatePlacementPolicy(ctx context.Context, policy *model.
// SplitBatchCreateTable provide a way to split batch into small batch when batch size is large than 6 MB.
// The raft entry has limit size of 6 MB, a batch of CreateTables may hit this limitation
// TODO: shall query string be set for each split batch create, it looks does not matter if we set once for all.
func (gs *tidbSession) SplitBatchCreateTable(schema model.CIStr, infos []*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
func (gs *tidbSession) SplitBatchCreateTable(schema model.CIStr,
infos []*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
var err error
d := domain.GetDomain(gs.se).DDL()

if err = d.BatchCreateTableWithInfo(gs.se, schema, infos, append(cs, ddl.OnExistIgnore)...); kv.ErrEntryTooLarge.Equal(err) {
err = d.BatchCreateTableWithInfo(gs.se, schema, infos, append(cs, ddl.OnExistIgnore)...)
if kv.ErrEntryTooLarge.Equal(err) {
log.Info("entry too large, split batch create table", zap.Int("num table", len(infos)))
if len(infos) == 1 {
return err
Expand All @@ -248,7 +250,8 @@ func (gs *tidbSession) SplitBatchCreateTable(schema model.CIStr, infos []*model.
}

// CreateTables implements glue.BatchCreateTableSession.
func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
func (gs *tidbSession) CreateTables(_ context.Context,
tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
var dbName model.CIStr

// Disable foreign key check when batch create tables.
Expand Down Expand Up @@ -289,7 +292,8 @@ func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*mo
}

// CreateTable implements glue.Session.
func (gs *tidbSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
func (gs *tidbSession) CreateTable(_ context.Context, dbName model.CIStr,
table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
d := domain.GetDomain(gs.se).DDL()
query, err := gs.showCreateTable(table)
if err != nil {
Expand Down Expand Up @@ -385,25 +389,27 @@ func (s *mockSession) ExecuteInternal(ctx context.Context, sql string, args ...i
}

// CreateDatabase implements glue.Session.
func (s *mockSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) error {
func (*mockSession) CreateDatabase(_ context.Context, _ *model.DBInfo) error {
log.Fatal("unimplemented CreateDatabase for mock session")
return nil
}

// CreatePlacementPolicy implements glue.Session.
func (s *mockSession) CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error {
func (*mockSession) CreatePlacementPolicy(_ context.Context, _ *model.PolicyInfo) error {
log.Fatal("unimplemented CreateDatabase for mock session")
return nil
}

// CreateTables implements glue.BatchCreateTableSession.
func (s *mockSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
func (*mockSession) CreateTables(_ context.Context, _ map[string][]*model.TableInfo,
_ ...ddl.CreateTableWithInfoConfigurier) error {
log.Fatal("unimplemented CreateDatabase for mock session")
return nil
}

// CreateTable implements glue.Session.
func (s *mockSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
func (*mockSession) CreateTable(_ context.Context, _ model.CIStr,
_ *model.TableInfo, _ ...ddl.CreateTableWithInfoConfigurier) error {
log.Fatal("unimplemented CreateDatabase for mock session")
return nil
}
Expand Down
19 changes: 13 additions & 6 deletions br/pkg/gluetidb/glue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,10 @@ func TestSplitBatchCreateTableWithTableId(t *testing.T) {
}))
require.NoError(t, err)

tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'table_id_resued1'").Check(testkit.Rows("124"))
tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'table_id_resued2'").Check(testkit.Rows("125"))
tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'table_id_resued1'").
Check(testkit.Rows("124"))
tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'table_id_resued2'").
Check(testkit.Rows("125"))
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers)

// allocate new table id verification
Expand All @@ -90,7 +92,9 @@ func TestSplitBatchCreateTableWithTableId(t *testing.T) {
}))
require.NoError(t, err)

idGen, ok := tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'table_id_new'").Rows()[0][0].(string)
idGen, ok := tk.MustQuery(
"select tidb_table_id from information_schema.tables where table_name = 'table_id_new'").
Rows()[0][0].(string)
require.True(t, ok)
idGenNum, err := strconv.ParseInt(idGen, 10, 64)
require.NoError(t, err)
Expand Down Expand Up @@ -166,9 +170,12 @@ func TestSplitBatchCreateTable(t *testing.T) {
require.Equal(t, "public", job3[4])

// check reused table id
tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_1'").Check(testkit.Rows("1234"))
tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_2'").Check(testkit.Rows("1235"))
tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_3'").Check(testkit.Rows("1236"))
tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_1'").
Check(testkit.Rows("1234"))
tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_2'").
Check(testkit.Rows("1235"))
tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_3'").
Check(testkit.Rows("1236"))

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/RestoreBatchCreateTableEntryTooLarge"))
}
Expand Down
Loading

0 comments on commit d4476cb

Please sign in to comment.