diff --git a/br/pkg/gluetidb/BUILD.bazel b/br/pkg/gluetidb/BUILD.bazel index 979f05b4e5b48..4fdfdc4113039 100644 --- a/br/pkg/gluetidb/BUILD.bazel +++ b/br/pkg/gluetidb/BUILD.bazel @@ -14,9 +14,7 @@ go_library( "//pkg/domain", "//pkg/executor", "//pkg/kv", - "//pkg/meta/autoid", "//pkg/parser/model", - "//pkg/parser/mysql", "//pkg/session", "//pkg/sessionctx", "@com_github_pingcap_errors//:errors", @@ -32,17 +30,11 @@ go_test( srcs = ["glue_test.go"], embed = [":gluetidb"], flaky = True, - shard_count = 4, deps = [ "//br/pkg/glue", - "//pkg/ddl", - "//pkg/kv", - "//pkg/meta", "//pkg/parser/model", - "//pkg/sessionctx", "//pkg/testkit", "//pkg/types", - "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", ], ) diff --git a/br/pkg/gluetidb/glue.go b/br/pkg/gluetidb/glue.go index da42b1b1d9b71..5c8f33b7d68dd 100644 --- a/br/pkg/gluetidb/glue.go +++ b/br/pkg/gluetidb/glue.go @@ -3,9 +3,7 @@ package gluetidb import ( - "bytes" "context" - "strings" "time" "github.com/pingcap/errors" @@ -18,9 +16,7 @@ import ( "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/executor" "github.com/pingcap/tidb/pkg/kv" - "github.com/pingcap/tidb/pkg/meta/autoid" "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/session" "github.com/pingcap/tidb/pkg/sessionctx" pd "github.com/tikv/pd/client" @@ -33,11 +29,7 @@ var ( _ glue.Glue = Glue{} ) -const ( - defaultCapOfCreateTable = 512 - defaultCapOfCreateDatabase = 64 - brComment = `/*from(br)*/` -) +const brComment = `/*from(br)*/` // New makes a new tidb glue. func New() Glue { @@ -206,17 +198,7 @@ func (gs *tidbSession) ExecuteInternal(ctx context.Context, sql string, args ... // CreateDatabase implements glue.Session. func (gs *tidbSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { - d := domain.GetDomain(gs.se).DDL() - query, err := gs.showCreateDatabase(schema) - if err != nil { - return errors.Trace(err) - } - gs.se.SetValue(sessionctx.QueryString, query) - schema = schema.Clone() - if len(schema.Charset) == 0 { - schema.Charset = mysql.DefaultCharset - } - return d.CreateSchemaWithInfo(gs.se, schema, ddl.OnExistIgnore) + return errors.Trace(executor.BRIECreateDatabase(gs.se, schema, brComment)) } // CreatePlacementPolicy implements glue.Session. @@ -227,95 +209,16 @@ func (gs *tidbSession) CreatePlacementPolicy(ctx context.Context, policy *model. return d.CreatePlacementPolicyWithInfo(gs.se, policy, ddl.OnExistIgnore) } -// SplitBatchCreateTable provide a way to split batch into small batch when batch size is large than 6 MB. -// The raft entry has limit size of 6 MB, a batch of CreateTables may hit this limitation -// TODO: shall query string be set for each split batch create, it looks does not matter if we set once for all. -func (gs *tidbSession) SplitBatchCreateTable(schema model.CIStr, - infos []*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { - var err error - d := domain.GetDomain(gs.se).DDL() - err = d.BatchCreateTableWithInfo(gs.se, schema, infos, append(cs, ddl.OnExistIgnore)...) - if kv.ErrEntryTooLarge.Equal(err) { - log.Info("entry too large, split batch create table", zap.Int("num table", len(infos))) - if len(infos) == 1 { - return err - } - mid := len(infos) / 2 - err = gs.SplitBatchCreateTable(schema, infos[:mid], cs...) - if err != nil { - return err - } - err = gs.SplitBatchCreateTable(schema, infos[mid:], cs...) - if err != nil { - return err - } - return nil - } - return err -} - // CreateTables implements glue.BatchCreateTableSession. func (gs *tidbSession) CreateTables(_ context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { - var dbName model.CIStr - - // Disable foreign key check when batch create tables. - gs.se.GetSessionVars().ForeignKeyChecks = false - for db, tablesInDB := range tables { - dbName = model.NewCIStr(db) - queryBuilder := strings.Builder{} - cloneTables := make([]*model.TableInfo, 0, len(tablesInDB)) - for _, table := range tablesInDB { - query, err := gs.showCreateTable(table) - if err != nil { - return errors.Trace(err) - } - - queryBuilder.WriteString(query) - queryBuilder.WriteString(";") - - table = table.Clone() - // Clone() does not clone partitions yet :( - if table.Partition != nil { - newPartition := *table.Partition - newPartition.Definitions = append([]model.PartitionDefinition{}, table.Partition.Definitions...) - table.Partition = &newPartition - } - cloneTables = append(cloneTables, table) - } - gs.se.SetValue(sessionctx.QueryString, queryBuilder.String()) - if err := gs.SplitBatchCreateTable(dbName, cloneTables, cs...); err != nil { - //It is possible to failure when TiDB does not support model.ActionCreateTables. - //In this circumstance, BatchCreateTableWithInfo returns errno.ErrInvalidDDLJob, - //we fall back to old way that creating table one by one - log.Warn("batch create table from tidb failure", zap.Error(err)) - return err - } - } - - return nil + return errors.Trace(executor.BRIECreateTables(gs.se, tables, brComment, cs...)) } // CreateTable implements glue.Session. func (gs *tidbSession) CreateTable(_ context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { - d := domain.GetDomain(gs.se).DDL() - query, err := gs.showCreateTable(table) - if err != nil { - return errors.Trace(err) - } - gs.se.SetValue(sessionctx.QueryString, query) - // Disable foreign key check when batch create tables. - gs.se.GetSessionVars().ForeignKeyChecks = false - // Clone() does not clone partitions yet :( - table = table.Clone() - if table.Partition != nil { - newPartition := *table.Partition - newPartition.Definitions = append([]model.PartitionDefinition{}, table.Partition.Definitions...) - table.Partition = &newPartition - } - - return d.CreateTableWithInfo(gs.se, dbName, table, append(cs, ddl.OnExistIgnore)...) + return errors.Trace(executor.BRIECreateTable(gs.se, dbName, table, brComment, cs...)) } // Close implements glue.Session. @@ -328,30 +231,6 @@ func (gs *tidbSession) GetGlobalVariable(name string) (string, error) { return gs.se.GetSessionVars().GlobalVarsAccessor.GetTiDBTableValue(name) } -// showCreateTable shows the result of SHOW CREATE TABLE from a TableInfo. -func (gs *tidbSession) showCreateTable(tbl *model.TableInfo) (string, error) { - table := tbl.Clone() - table.AutoIncID = 0 - result := bytes.NewBuffer(make([]byte, 0, defaultCapOfCreateTable)) - // this can never fail. - _, _ = result.WriteString(brComment) - if err := executor.ConstructResultOfShowCreateTable(gs.se, tbl, autoid.Allocators{}, result); err != nil { - return "", errors.Trace(err) - } - return result.String(), nil -} - -// showCreateDatabase shows the result of SHOW CREATE DATABASE from a dbInfo. -func (gs *tidbSession) showCreateDatabase(db *model.DBInfo) (string, error) { - result := bytes.NewBuffer(make([]byte, 0, defaultCapOfCreateDatabase)) - // this can never fail. - _, _ = result.WriteString(brComment) - if err := executor.ConstructResultOfShowCreateDatabase(gs.se, db, true, result); err != nil { - return "", errors.Trace(err) - } - return result.String(), nil -} - func (gs *tidbSession) showCreatePlacementPolicy(policy *model.PolicyInfo) string { return executor.ConstructResultOfShowCreatePlacementPolicy(policy) } diff --git a/br/pkg/gluetidb/glue_test.go b/br/pkg/gluetidb/glue_test.go index 8c4b3420dc669..1813a7b357b16 100644 --- a/br/pkg/gluetidb/glue_test.go +++ b/br/pkg/gluetidb/glue_test.go @@ -16,206 +16,15 @@ package gluetidb import ( "context" - "strconv" "testing" - "github.com/pingcap/failpoint" "github.com/pingcap/tidb/br/pkg/glue" - "github.com/pingcap/tidb/pkg/ddl" - "github.com/pingcap/tidb/pkg/kv" - "github.com/pingcap/tidb/pkg/meta" "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/types" "github.com/stretchr/testify/require" ) -// batch create table with table id reused -func TestSplitBatchCreateTableWithTableId(t *testing.T) { - store, dom := testkit.CreateMockStoreAndDomain(t) - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("drop table if exists table_id_resued1") - tk.MustExec("drop table if exists table_id_resued2") - tk.MustExec("drop table if exists table_id_new") - - d := dom.DDL() - require.NotNil(t, d) - - infos1 := []*model.TableInfo{} - infos1 = append(infos1, &model.TableInfo{ - ID: 124, - Name: model.NewCIStr("table_id_resued1"), - }) - infos1 = append(infos1, &model.TableInfo{ - ID: 125, - Name: model.NewCIStr("table_id_resued2"), - }) - - se := &tidbSession{se: tk.Session()} - - // keep/reused table id verification - tk.Session().SetValue(sessionctx.QueryString, "skip") - err := se.SplitBatchCreateTable(model.NewCIStr("test"), infos1, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { - return false - })) - require.NoError(t, err) - - tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'table_id_resued1'"). - Check(testkit.Rows("124")) - tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'table_id_resued2'"). - Check(testkit.Rows("125")) - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers) - - // allocate new table id verification - // query the global id - var id int64 - err = kv.RunInNewTxn(ctx, store, true, func(_ context.Context, txn kv.Transaction) error { - m := meta.NewMeta(txn) - var err error - id, err = m.GenGlobalID() - return err - }) - - require.NoError(t, err) - - infos2 := []*model.TableInfo{} - infos2 = append(infos2, &model.TableInfo{ - ID: 124, - Name: model.NewCIStr("table_id_new"), - }) - - tk.Session().SetValue(sessionctx.QueryString, "skip") - err = se.SplitBatchCreateTable(model.NewCIStr("test"), infos2, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { - return true - })) - require.NoError(t, err) - - idGen, ok := tk.MustQuery( - "select tidb_table_id from information_schema.tables where table_name = 'table_id_new'"). - Rows()[0][0].(string) - require.True(t, ok) - idGenNum, err := strconv.ParseInt(idGen, 10, 64) - require.NoError(t, err) - require.Greater(t, idGenNum, id) - - // a empty table info with len(info3) = 0 - infos3 := []*model.TableInfo{} - - err = se.SplitBatchCreateTable(model.NewCIStr("test"), infos3, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { - return false - })) - require.NoError(t, err) -} - -// batch create table with table id reused -func TestSplitBatchCreateTable(t *testing.T) { - store, dom := testkit.CreateMockStoreAndDomain(t) - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("drop table if exists table_1") - tk.MustExec("drop table if exists table_2") - tk.MustExec("drop table if exists table_3") - - d := dom.DDL() - require.NotNil(t, d) - - infos := []*model.TableInfo{} - infos = append(infos, &model.TableInfo{ - ID: 1234, - Name: model.NewCIStr("tables_1"), - }) - infos = append(infos, &model.TableInfo{ - ID: 1235, - Name: model.NewCIStr("tables_2"), - }) - infos = append(infos, &model.TableInfo{ - ID: 1236, - Name: model.NewCIStr("tables_3"), - }) - - se := &tidbSession{se: tk.Session()} - - // keep/reused table id verification - tk.Session().SetValue(sessionctx.QueryString, "skip") - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/RestoreBatchCreateTableEntryTooLarge", "return(1)")) - err := se.SplitBatchCreateTable(model.NewCIStr("test"), infos, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { - return false - })) - - require.NoError(t, err) - tk.MustQuery("show tables like '%tables_%'").Check(testkit.Rows("tables_1", "tables_2", "tables_3")) - jobs := tk.MustQuery("admin show ddl jobs").Rows() - require.Greater(t, len(jobs), 3) - // check table_1 - job1 := jobs[0] - require.Equal(t, "test", job1[1]) - require.Equal(t, "tables_3", job1[2]) - require.Equal(t, "create tables", job1[3]) - require.Equal(t, "public", job1[4]) - - // check table_2 - job2 := jobs[1] - require.Equal(t, "test", job2[1]) - require.Equal(t, "tables_2", job2[2]) - require.Equal(t, "create tables", job2[3]) - require.Equal(t, "public", job2[4]) - - // check table_3 - job3 := jobs[2] - require.Equal(t, "test", job3[1]) - require.Equal(t, "tables_1", job3[2]) - require.Equal(t, "create tables", job3[3]) - require.Equal(t, "public", job3[4]) - - // check reused table id - tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_1'"). - Check(testkit.Rows("1234")) - tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_2'"). - Check(testkit.Rows("1235")) - tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_3'"). - Check(testkit.Rows("1236")) - - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/RestoreBatchCreateTableEntryTooLarge")) -} - -// batch create table with table id reused -func TestSplitBatchCreateTableFailWithEntryTooLarge(t *testing.T) { - store, dom := testkit.CreateMockStoreAndDomain(t) - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("drop table if exists table_1") - tk.MustExec("drop table if exists table_2") - tk.MustExec("drop table if exists table_3") - - d := dom.DDL() - require.NotNil(t, d) - - infos := []*model.TableInfo{} - infos = append(infos, &model.TableInfo{ - Name: model.NewCIStr("tables_1"), - }) - infos = append(infos, &model.TableInfo{ - Name: model.NewCIStr("tables_2"), - }) - infos = append(infos, &model.TableInfo{ - Name: model.NewCIStr("tables_3"), - }) - - se := &tidbSession{se: tk.Session()} - - tk.Session().SetValue(sessionctx.QueryString, "skip") - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/RestoreBatchCreateTableEntryTooLarge", "return(0)")) - err := se.SplitBatchCreateTable(model.NewCIStr("test"), infos, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { - return true - })) - - require.True(t, kv.ErrEntryTooLarge.Equal(err)) - - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/RestoreBatchCreateTableEntryTooLarge")) -} - func TestTheSessionIsoation(t *testing.T) { req := require.New(t) store := testkit.CreateMockStore(t) diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 7d351d12adb38..212f712b50c22 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -232,24 +232,22 @@ func (rc *Client) Init(g glue.Glue, store kv.Storage) error { rc.backupMeta = new(backuppb.BackupMeta) } - // Only in binary we can use multi-thread sessions to create tables. - // so use OwnStorage() to tell whether we are use binary or SQL. - if g.OwnsStorage() { - // Maybe allow user modify the DDL concurrency isn't necessary, - // because executing DDL is really I/O bound (or, algorithm bound?), - // and we cost most of time at waiting DDL jobs be enqueued. - // So these jobs won't be faster or slower when machine become faster or slower, - // hence make it a fixed value would be fine. - rc.dbPool, err = makeDBPool(defaultDDLConcurrency, func() (*DB, error) { - db, _, err := NewDB(g, store, rc.policyMode) - return db, err - }) - if err != nil { - log.Warn("create session pool failed, we will send DDLs only by created sessions", - zap.Error(err), - zap.Int("sessionCount", len(rc.dbPool)), - ) - } + // There are different ways to create session between in binary and in SQL. + // + // Maybe allow user modify the DDL concurrency isn't necessary, + // because executing DDL is really I/O bound (or, algorithm bound?), + // and we cost most of time at waiting DDL jobs be enqueued. + // So these jobs won't be faster or slower when machine become faster or slower, + // hence make it a fixed value would be fine. + rc.dbPool, err = makeDBPool(defaultDDLConcurrency, func() (*DB, error) { + db, _, err := NewDB(g, store, rc.policyMode) + return db, err + }) + if err != nil { + log.Warn("create session pool failed, we will send DDLs only by created sessions", + zap.Error(err), + zap.Int("sessionCount", len(rc.dbPool)), + ) } return errors.Trace(err) } @@ -489,12 +487,21 @@ func (rc *Client) GetRewriteMode() RewriteMode { return rc.rewriteMode } -// Close a client. -func (rc *Client) Close() { +func (rc *Client) closeConn() { // rc.db can be nil in raw kv mode. if rc.db != nil { rc.db.Close() } + for _, db := range rc.dbPool { + db.Close() + } +} + +// Close a client. +func (rc *Client) Close() { + // close the connection, and it must be succeed when in SQL mode. + rc.closeConn() + if rc.rawKVClient != nil { rc.rawKVClient.Close() } diff --git a/br/pkg/restore/util.go b/br/pkg/restore/util.go index e554f92d21afd..5304b47a6fb72 100644 --- a/br/pkg/restore/util.go +++ b/br/pkg/restore/util.go @@ -281,7 +281,9 @@ func makeDBPool(size uint, dbFactory func() (*DB, error)) ([]*DB, error) { if e != nil { return dbPool, e } - dbPool = append(dbPool, db) + if db != nil { + dbPool = append(dbPool, db) + } } return dbPool, nil } diff --git a/pkg/executor/BUILD.bazel b/pkg/executor/BUILD.bazel index ff85677e5182f..11f17ffce8e7f 100644 --- a/pkg/executor/BUILD.bazel +++ b/pkg/executor/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "batch_point_get.go", "bind.go", "brie.go", + "brie_utils.go", "builder.go", "change.go", "checksum.go", @@ -293,6 +294,7 @@ go_test( "batch_point_get_test.go", "benchmark_test.go", "brie_test.go", + "brie_utils_test.go", "charset_test.go", "chunk_size_control_test.go", "cluster_table_test.go", diff --git a/pkg/executor/brie.go b/pkg/executor/brie.go index 5e2a06fe10fe8..480305ead0498 100644 --- a/pkg/executor/brie.go +++ b/pkg/executor/brie.go @@ -38,7 +38,6 @@ import ( "github.com/pingcap/tidb/pkg/executor/internal/exec" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/kv" - "github.com/pingcap/tidb/pkg/meta/autoid" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/format" "github.com/pingcap/tidb/pkg/parser/model" @@ -101,7 +100,7 @@ func (p *brieTaskProgress) Close() { p.lock.Lock() current := atomic.LoadInt64(&p.current) if current < p.total { - p.cmd = fmt.Sprintf("%s Cacneled", p.cmd) + p.cmd = fmt.Sprintf("%s Canceled", p.cmd) } atomic.StoreInt64(&p.current, p.total) p.lock.Unlock() @@ -564,7 +563,7 @@ func (e *BRIEExec) Next(ctx context.Context, req *chunk.Chunk) error { defer bq.releaseTask() e.info.execTime = types.CurrentTime(mysql.TypeDatetime) - glue := &tidbGlueSession{se: e.Ctx(), progress: progress, info: e.info} + glue := &tidbGlue{se: e.Ctx(), progress: progress, info: e.info} switch e.info.kind { case ast.BRIEKindBackup: @@ -632,25 +631,82 @@ func (e *ShowExec) fetchShowBRIE(kind ast.BRIEKind) error { return nil } -type tidbGlueSession struct { +type tidbGlue struct { + // the session context of the brie task se sessionctx.Context progress *brieTaskProgress info *brieTaskInfo } -// GetSessionCtx implements glue.Glue -func (gs *tidbGlueSession) GetSessionCtx() sessionctx.Context { - return gs.se -} - // GetDomain implements glue.Glue -func (gs *tidbGlueSession) GetDomain(_ kv.Storage) (*domain.Domain, error) { +func (gs *tidbGlue) GetDomain(_ kv.Storage) (*domain.Domain, error) { return domain.GetDomain(gs.se), nil } // CreateSession implements glue.Glue -func (gs *tidbGlueSession) CreateSession(_ kv.Storage) (glue.Session, error) { - return gs, nil +func (gs *tidbGlue) CreateSession(_ kv.Storage) (glue.Session, error) { + newSCtx, err := CreateSession(gs.se) + if err != nil { + return nil, err + } + return &tidbGlueSession{se: newSCtx}, nil +} + +// Open implements glue.Glue +func (gs *tidbGlue) Open(string, pd.SecurityOption) (kv.Storage, error) { + return gs.se.GetStore(), nil +} + +// OwnsStorage implements glue.Glue +func (*tidbGlue) OwnsStorage() bool { + return false +} + +// StartProgress implements glue.Glue +func (gs *tidbGlue) StartProgress(_ context.Context, cmdName string, total int64, _ bool) glue.Progress { + gs.progress.lock.Lock() + gs.progress.cmd = cmdName + gs.progress.total = total + atomic.StoreInt64(&gs.progress.current, 0) + gs.progress.lock.Unlock() + return gs.progress +} + +// Record implements glue.Glue +func (gs *tidbGlue) Record(name string, value uint64) { + switch name { + case "BackupTS": + gs.info.backupTS = value + case "RestoreTS": + gs.info.restoreTS = value + case "Size": + gs.info.archiveSize = value + } +} + +func (*tidbGlue) GetVersion() string { + return "TiDB\n" + printer.GetTiDBInfo() +} + +// UseOneShotSession implements glue.Glue +func (gs *tidbGlue) UseOneShotSession(_ kv.Storage, _ bool, fn func(se glue.Session) error) error { + // In SQL backup, we don't need to close domain, + // but need to create an new session. + newSCtx, err := CreateSession(gs.se) + if err != nil { + return err + } + glueSession := &tidbGlueSession{se: newSCtx} + defer func() { + CloseSession(newSCtx) + log.Info("one shot session from brie closed") + }() + return fn(glueSession) +} + +type tidbGlueSession struct { + // the session context of the brie task's subtask, such as `CREATE TABLE`. + se sessionctx.Context } // Execute implements glue.Session @@ -672,46 +728,24 @@ func (gs *tidbGlueSession) ExecuteInternal(ctx context.Context, sql string, args // CreateDatabase implements glue.Session func (gs *tidbGlueSession) CreateDatabase(_ context.Context, schema *model.DBInfo) error { - d := domain.GetDomain(gs.se).DDL() - // 512 is defaultCapOfCreateTable. - result := bytes.NewBuffer(make([]byte, 0, 512)) - if err := ConstructResultOfShowCreateDatabase(gs.se, schema, true, result); err != nil { - return err - } - gs.se.SetValue(sessionctx.QueryString, result.String()) - schema = schema.Clone() - if len(schema.Charset) == 0 { - schema.Charset = mysql.DefaultCharset - } - return d.CreateSchemaWithInfo(gs.se, schema, ddl.OnExistIgnore) + return BRIECreateDatabase(gs.se, schema, "") } // CreateTable implements glue.Session func (gs *tidbGlueSession) CreateTable(_ context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { - d := domain.GetDomain(gs.se).DDL() - - // 512 is defaultCapOfCreateTable. - result := bytes.NewBuffer(make([]byte, 0, 512)) - if err := ConstructResultOfShowCreateTable(gs.se, table, autoid.Allocators{}, result); err != nil { - return err - } - gs.se.SetValue(sessionctx.QueryString, result.String()) - // Disable foreign key check when batch create tables. - gs.se.GetSessionVars().ForeignKeyChecks = false - - // Clone() does not clone partitions yet :( - table = table.Clone() - if table.Partition != nil { - newPartition := *table.Partition - newPartition.Definitions = append([]model.PartitionDefinition{}, table.Partition.Definitions...) - table.Partition = &newPartition - } + return BRIECreateTable(gs.se, dbName, table, "", cs...) +} - return d.CreateTableWithInfo(gs.se, dbName, table, append(cs, ddl.OnExistIgnore)...) +// CreateTables implements glue.BatchCreateTableSession. +func (gs *tidbGlueSession) CreateTables(_ context.Context, + tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { + return BRIECreateTables(gs.se, tables, "", cs...) } // CreatePlacementPolicy implements glue.Session func (gs *tidbGlueSession) CreatePlacementPolicy(_ context.Context, policy *model.PolicyInfo) error { + originQueryString := gs.se.Value(sessionctx.QueryString) + defer gs.se.SetValue(sessionctx.QueryString, originQueryString) gs.se.SetValue(sessionctx.QueryString, ConstructResultOfShowCreatePlacementPolicy(policy)) d := domain.GetDomain(gs.se).DDL() // the default behaviour is ignoring duplicated policy during restore. @@ -719,7 +753,8 @@ func (gs *tidbGlueSession) CreatePlacementPolicy(_ context.Context, policy *mode } // Close implements glue.Session -func (*tidbGlueSession) Close() { +func (gs *tidbGlueSession) Close() { + CloseSession(gs.se) } // GetGlobalVariables implements glue.Session. @@ -727,46 +762,9 @@ func (gs *tidbGlueSession) GetGlobalVariable(name string) (string, error) { return gs.se.GetSessionVars().GlobalVarsAccessor.GetTiDBTableValue(name) } -// Open implements glue.Glue -func (gs *tidbGlueSession) Open(string, pd.SecurityOption) (kv.Storage, error) { - return gs.se.GetStore(), nil -} - -// OwnsStorage implements glue.Glue -func (*tidbGlueSession) OwnsStorage() bool { - return false -} - -// StartProgress implements glue.Glue -func (gs *tidbGlueSession) StartProgress(_ context.Context, cmdName string, total int64, _ bool) glue.Progress { - gs.progress.lock.Lock() - gs.progress.cmd = cmdName - gs.progress.total = total - atomic.StoreInt64(&gs.progress.current, 0) - gs.progress.lock.Unlock() - return gs.progress -} - -// Record implements glue.Glue -func (gs *tidbGlueSession) Record(name string, value uint64) { - switch name { - case "BackupTS": - gs.info.backupTS = value - case "RestoreTS": - gs.info.restoreTS = value - case "Size": - gs.info.archiveSize = value - } -} - -func (*tidbGlueSession) GetVersion() string { - return "TiDB\n" + printer.GetTiDBInfo() -} - -// UseOneShotSession implements glue.Glue -func (gs *tidbGlueSession) UseOneShotSession(_ kv.Storage, _ bool, fn func(se glue.Session) error) error { - // in SQL backup. we don't need to close domain. - return fn(gs) +// GetSessionCtx implements glue.Glue +func (gs *tidbGlueSession) GetSessionCtx() sessionctx.Context { + return gs.se } func restoreQuery(stmt *ast.BRIEStmt) string { diff --git a/pkg/executor/brie_test.go b/pkg/executor/brie_test.go index 0fbe566d02422..220c4f46deafc 100644 --- a/pkg/executor/brie_test.go +++ b/pkg/executor/brie_test.go @@ -36,7 +36,7 @@ import ( ) func TestGlueGetVersion(t *testing.T) { - g := tidbGlueSession{} + g := tidbGlue{} version := g.GetVersion() require.Contains(t, version, `Release Version`) require.Contains(t, version, `Git Commit Hash`) diff --git a/pkg/executor/brie_utils.go b/pkg/executor/brie_utils.go new file mode 100644 index 0000000000000..81ee8251add6b --- /dev/null +++ b/pkg/executor/brie_utils.go @@ -0,0 +1,183 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor + +import ( + "bytes" + "strings" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tidb/pkg/ddl" + "github.com/pingcap/tidb/pkg/domain" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/meta/autoid" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/sessionctx" + "go.uber.org/zap" +) + +const ( + defaultCapOfCreateTable = 512 + defaultCapOfCreateDatabase = 64 +) + +// SplitBatchCreateTableForTest is only used for test. +var SplitBatchCreateTableForTest = splitBatchCreateTable + +// showRestoredCreateDatabase shows the result of SHOW CREATE DATABASE from a dbInfo. +func showRestoredCreateDatabase(sctx sessionctx.Context, db *model.DBInfo, brComment string) (string, error) { + result := bytes.NewBuffer(make([]byte, 0, defaultCapOfCreateDatabase)) + if len(brComment) > 0 { + // this can never fail. + _, _ = result.WriteString(brComment) + } + if err := ConstructResultOfShowCreateDatabase(sctx, db, true, result); err != nil { + return "", errors.Trace(err) + } + return result.String(), nil +} + +// BRIECreateDatabase creates the database with OnExistIgnore option +func BRIECreateDatabase(sctx sessionctx.Context, schema *model.DBInfo, brComment string) error { + d := domain.GetDomain(sctx).DDL() + query, err := showRestoredCreateDatabase(sctx, schema, brComment) + if err != nil { + return errors.Trace(err) + } + originQuery := sctx.Value(sessionctx.QueryString) + sctx.SetValue(sessionctx.QueryString, query) + defer func() { + sctx.SetValue(sessionctx.QueryString, originQuery) + }() + + schema = schema.Clone() + if len(schema.Charset) == 0 { + schema.Charset = mysql.DefaultCharset + } + return d.CreateSchemaWithInfo(sctx, schema, ddl.OnExistIgnore) +} + +// showRestoredCreateTable shows the result of SHOW CREATE TABLE from a tableInfo. +func showRestoredCreateTable(sctx sessionctx.Context, tbl *model.TableInfo, brComment string) (string, error) { + result := bytes.NewBuffer(make([]byte, 0, defaultCapOfCreateTable)) + if len(brComment) > 0 { + // this can never fail. + _, _ = result.WriteString(brComment) + } + if err := ConstructResultOfShowCreateTable(sctx, tbl, autoid.Allocators{}, result); err != nil { + return "", err + } + return result.String(), nil +} + +// BRIECreateTable creates the table with OnExistIgnore option +func BRIECreateTable( + sctx sessionctx.Context, + dbName model.CIStr, + table *model.TableInfo, + brComment string, + cs ...ddl.CreateTableWithInfoConfigurier, +) error { + d := domain.GetDomain(sctx).DDL() + query, err := showRestoredCreateTable(sctx, table, brComment) + if err != nil { + return err + } + originQuery := sctx.Value(sessionctx.QueryString) + sctx.SetValue(sessionctx.QueryString, query) + // Disable foreign key check when batch create tables. + originForeignKeyChecks := sctx.GetSessionVars().ForeignKeyChecks + sctx.GetSessionVars().ForeignKeyChecks = false + defer func() { + sctx.SetValue(sessionctx.QueryString, originQuery) + sctx.GetSessionVars().ForeignKeyChecks = originForeignKeyChecks + }() + + table = table.Clone() + + return d.CreateTableWithInfo(sctx, dbName, table, append(cs, ddl.OnExistIgnore)...) +} + +// BRIECreateTables creates the tables with OnExistIgnore option in batch +func BRIECreateTables( + sctx sessionctx.Context, + tables map[string][]*model.TableInfo, + brComment string, + cs ...ddl.CreateTableWithInfoConfigurier, +) error { + // Disable foreign key check when batch create tables. + originForeignKeyChecks := sctx.GetSessionVars().ForeignKeyChecks + sctx.GetSessionVars().ForeignKeyChecks = false + originQuery := sctx.Value(sessionctx.QueryString) + defer func() { + sctx.SetValue(sessionctx.QueryString, originQuery) + sctx.GetSessionVars().ForeignKeyChecks = originForeignKeyChecks + }() + for db, tablesInDB := range tables { + dbName := model.NewCIStr(db) + queryBuilder := strings.Builder{} + cloneTables := make([]*model.TableInfo, 0, len(tablesInDB)) + for _, table := range tablesInDB { + query, err := showRestoredCreateTable(sctx, table, brComment) + if err != nil { + return errors.Trace(err) + } + + queryBuilder.WriteString(query) + queryBuilder.WriteString(";") + + cloneTables = append(cloneTables, table.Clone()) + } + sctx.SetValue(sessionctx.QueryString, queryBuilder.String()) + if err := splitBatchCreateTable(sctx, dbName, cloneTables, cs...); err != nil { + //It is possible to failure when TiDB does not support model.ActionCreateTables. + //In this circumstance, BatchCreateTableWithInfo returns errno.ErrInvalidDDLJob, + //we fall back to old way that creating table one by one + log.Warn("batch create table from tidb failure", zap.Error(err)) + return err + } + } + + return nil +} + +// splitBatchCreateTable provide a way to split batch into small batch when batch size is large than 6 MB. +// The raft entry has limit size of 6 MB, a batch of CreateTables may hit this limitation +// TODO: shall query string be set for each split batch create, it looks does not matter if we set once for all. +func splitBatchCreateTable(sctx sessionctx.Context, schema model.CIStr, + infos []*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { + var err error + d := domain.GetDomain(sctx).DDL() + err = d.BatchCreateTableWithInfo(sctx, schema, infos, append(cs, ddl.OnExistIgnore)...) + if kv.ErrEntryTooLarge.Equal(err) { + log.Info("entry too large, split batch create table", zap.Int("num table", len(infos))) + if len(infos) == 1 { + return err + } + mid := len(infos) / 2 + err = splitBatchCreateTable(sctx, schema, infos[:mid], cs...) + if err != nil { + return err + } + err = splitBatchCreateTable(sctx, schema, infos[mid:], cs...) + if err != nil { + return err + } + return nil + } + return err +} diff --git a/pkg/executor/brie_utils_test.go b/pkg/executor/brie_utils_test.go new file mode 100644 index 0000000000000..4489c1d280158 --- /dev/null +++ b/pkg/executor/brie_utils_test.go @@ -0,0 +1,322 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor_test + +import ( + "context" + "fmt" + "strconv" + "testing" + + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/ddl" + "github.com/pingcap/tidb/pkg/executor" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/meta" + "github.com/pingcap/tidb/pkg/parser" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/testkit" + "github.com/stretchr/testify/require" +) + +// batch create table with table id reused +func TestSplitBatchCreateTableWithTableId(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists table_id_resued1") + tk.MustExec("drop table if exists table_id_resued2") + tk.MustExec("drop table if exists table_id_new") + + d := dom.DDL() + require.NotNil(t, d) + + infos1 := []*model.TableInfo{} + infos1 = append(infos1, &model.TableInfo{ + ID: 124, + Name: model.NewCIStr("table_id_resued1"), + }) + infos1 = append(infos1, &model.TableInfo{ + ID: 125, + Name: model.NewCIStr("table_id_resued2"), + }) + + sctx := tk.Session() + + // keep/reused table id verification + sctx.SetValue(sessionctx.QueryString, "skip") + err := executor.SplitBatchCreateTableForTest(sctx, model.NewCIStr("test"), infos1, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { + return false + })) + require.NoError(t, err) + require.Equal(t, "skip", sctx.Value(sessionctx.QueryString)) + + tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'table_id_resued1'"). + Check(testkit.Rows("124")) + tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'table_id_resued2'"). + Check(testkit.Rows("125")) + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers) + + // allocate new table id verification + // query the global id + var id int64 + err = kv.RunInNewTxn(ctx, store, true, func(_ context.Context, txn kv.Transaction) error { + m := meta.NewMeta(txn) + var err error + id, err = m.GenGlobalID() + return err + }) + + require.NoError(t, err) + + infos2 := []*model.TableInfo{} + infos2 = append(infos2, &model.TableInfo{ + ID: 124, + Name: model.NewCIStr("table_id_new"), + }) + + tk.Session().SetValue(sessionctx.QueryString, "skip") + err = executor.SplitBatchCreateTableForTest(sctx, model.NewCIStr("test"), infos2, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { + return true + })) + require.NoError(t, err) + require.Equal(t, "skip", sctx.Value(sessionctx.QueryString)) + + idGen, ok := tk.MustQuery( + "select tidb_table_id from information_schema.tables where table_name = 'table_id_new'"). + Rows()[0][0].(string) + require.True(t, ok) + idGenNum, err := strconv.ParseInt(idGen, 10, 64) + require.NoError(t, err) + require.Greater(t, idGenNum, id) + + // a empty table info with len(info3) = 0 + infos3 := []*model.TableInfo{} + + originQueryString := sctx.Value(sessionctx.QueryString) + err = executor.SplitBatchCreateTableForTest(sctx, model.NewCIStr("test"), infos3, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { + return false + })) + require.NoError(t, err) + require.Equal(t, originQueryString, sctx.Value(sessionctx.QueryString)) +} + +// batch create table with table id reused +func TestSplitBatchCreateTable(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists table_1") + tk.MustExec("drop table if exists table_2") + tk.MustExec("drop table if exists table_3") + + d := dom.DDL() + require.NotNil(t, d) + + infos := []*model.TableInfo{} + infos = append(infos, &model.TableInfo{ + ID: 1234, + Name: model.NewCIStr("tables_1"), + }) + infos = append(infos, &model.TableInfo{ + ID: 1235, + Name: model.NewCIStr("tables_2"), + }) + infos = append(infos, &model.TableInfo{ + ID: 1236, + Name: model.NewCIStr("tables_3"), + }) + + sctx := tk.Session() + + // keep/reused table id verification + tk.Session().SetValue(sessionctx.QueryString, "skip") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/RestoreBatchCreateTableEntryTooLarge", "return(1)")) + err := executor.SplitBatchCreateTableForTest(sctx, model.NewCIStr("test"), infos, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { + return false + })) + require.NoError(t, err) + require.Equal(t, "skip", sctx.Value(sessionctx.QueryString)) + + tk.MustQuery("show tables like '%tables_%'").Check(testkit.Rows("tables_1", "tables_2", "tables_3")) + jobs := tk.MustQuery("admin show ddl jobs").Rows() + require.Greater(t, len(jobs), 3) + // check table_1 + job1 := jobs[0] + require.Equal(t, "test", job1[1]) + require.Equal(t, "tables_3", job1[2]) + require.Equal(t, "create tables", job1[3]) + require.Equal(t, "public", job1[4]) + + // check table_2 + job2 := jobs[1] + require.Equal(t, "test", job2[1]) + require.Equal(t, "tables_2", job2[2]) + require.Equal(t, "create tables", job2[3]) + require.Equal(t, "public", job2[4]) + + // check table_3 + job3 := jobs[2] + require.Equal(t, "test", job3[1]) + require.Equal(t, "tables_1", job3[2]) + require.Equal(t, "create tables", job3[3]) + require.Equal(t, "public", job3[4]) + + // check reused table id + tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_1'"). + Check(testkit.Rows("1234")) + tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_2'"). + Check(testkit.Rows("1235")) + tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_3'"). + Check(testkit.Rows("1236")) + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/RestoreBatchCreateTableEntryTooLarge")) +} + +// batch create table with table id reused +func TestSplitBatchCreateTableFailWithEntryTooLarge(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists table_1") + tk.MustExec("drop table if exists table_2") + tk.MustExec("drop table if exists table_3") + + d := dom.DDL() + require.NotNil(t, d) + + infos := []*model.TableInfo{} + infos = append(infos, &model.TableInfo{ + Name: model.NewCIStr("tables_1"), + }) + infos = append(infos, &model.TableInfo{ + Name: model.NewCIStr("tables_2"), + }) + infos = append(infos, &model.TableInfo{ + Name: model.NewCIStr("tables_3"), + }) + + sctx := tk.Session() + + tk.Session().SetValue(sessionctx.QueryString, "skip") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/RestoreBatchCreateTableEntryTooLarge", "return(0)")) + err := executor.SplitBatchCreateTableForTest(sctx, model.NewCIStr("test"), infos, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { + return true + })) + require.Equal(t, "skip", sctx.Value(sessionctx.QueryString)) + require.True(t, kv.ErrEntryTooLarge.Equal(err)) + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/RestoreBatchCreateTableEntryTooLarge")) +} + +func TestBRIECreateDatabase(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("drop database if exists db_1") + tk.MustExec("drop database if exists db_1") + + d := dom.DDL() + require.NotNil(t, d) + + sctx := tk.Session() + originQueryString := sctx.Value(sessionctx.QueryString) + schema1 := &model.DBInfo{ + ID: 1230, + Name: model.NewCIStr("db_1"), + Charset: "utf8mb4", + Collate: "utf8mb4_bin", + State: model.StatePublic, + } + err := executor.BRIECreateDatabase(sctx, schema1, "/* from test */") + require.NoError(t, err) + + schema2 := &model.DBInfo{ + ID: 1240, + Name: model.NewCIStr("db_2"), + Charset: "utf8mb4", + Collate: "utf8mb4_bin", + State: model.StatePublic, + } + err = executor.BRIECreateDatabase(sctx, schema2, "") + require.NoError(t, err) + require.Equal(t, originQueryString, sctx.Value(sessionctx.QueryString)) + tk.MustExec("use db_1") + tk.MustExec("use db_2") +} + +func mockTableInfo(t *testing.T, sctx sessionctx.Context, createSQL string) *model.TableInfo { + node, err := parser.New().ParseOneStmt(createSQL, "", "") + require.NoError(t, err) + info, err := ddl.MockTableInfo(sctx, node.(*ast.CreateTableStmt), 1) + require.NoError(t, err) + info.State = model.StatePublic + return info +} + +func TestBRIECreateTable(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists table_1") + tk.MustExec("drop table if exists table_2") + + d := dom.DDL() + require.NotNil(t, d) + + sctx := tk.Session() + originQueryString := sctx.Value(sessionctx.QueryString) + dbName := model.NewCIStr("test") + tableInfo := mockTableInfo(t, sctx, "create table test.table_1 (a int primary key, b json, c varchar(20))") + tableInfo.ID = 1230 + err := executor.BRIECreateTable(sctx, dbName, tableInfo, "/* from test */") + require.NoError(t, err) + + tableInfo.ID = 1240 + tableInfo.Name = model.NewCIStr("table_2") + err = executor.BRIECreateTable(sctx, dbName, tableInfo, "") + require.NoError(t, err) + require.Equal(t, originQueryString, sctx.Value(sessionctx.QueryString)) + tk.MustExec("desc table_1") + tk.MustExec("desc table_2") +} + +func TestBRIECreateTables(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tableInfos := make([]*model.TableInfo, 100) + for i := range tableInfos { + tk.MustExec(fmt.Sprintf("drop table if exists table_%d", i)) + } + + d := dom.DDL() + require.NotNil(t, d) + + sctx := tk.Session() + originQueryString := sctx.Value(sessionctx.QueryString) + for i := range tableInfos { + tableInfos[i] = mockTableInfo(t, sctx, fmt.Sprintf("create table test.table_%d (a int primary key, b json, c varchar(20))", i)) + tableInfos[i].ID = 1230 + int64(i) + } + err := executor.BRIECreateTables(sctx, map[string][]*model.TableInfo{"test": tableInfos}, "/* from test */") + require.NoError(t, err) + + require.Equal(t, originQueryString, sctx.Value(sessionctx.QueryString)) + for i := range tableInfos { + tk.MustExec(fmt.Sprintf("desc table_%d", i)) + } +} diff --git a/tests/realtikvtest/brietest/backup_restore_test.go b/tests/realtikvtest/brietest/backup_restore_test.go index 370a49e4b6ea2..7b74b04f740be 100644 --- a/tests/realtikvtest/brietest/backup_restore_test.go +++ b/tests/realtikvtest/brietest/backup_restore_test.go @@ -15,8 +15,10 @@ package brietest import ( + "fmt" "os" "path" + "strings" "testing" "github.com/pingcap/tidb/pkg/config" @@ -70,3 +72,46 @@ func TestBackupAndRestore(t *testing.T) { tk.MustQuery("select count(*) from t1").Check(testkit.Rows("3")) tk.MustExec("drop database br") } + +func TestRestoreMultiTables(t *testing.T) { + tk := initTestKit(t) + tk.MustExec("create database if not exists br") + tk.MustExec("use br") + + tablesNameSet := make(map[string]struct{}) + tableNum := 1000 + for i := 0; i < tableNum; i += 1 { + tk.MustExec(fmt.Sprintf("create table table_%d (a int primary key, b json, c varchar(20))", i)) + tk.MustExec(fmt.Sprintf("insert into table_%d values (1, '{\"a\": 1, \"b\": 2}', '123')", i)) + tk.MustQuery(fmt.Sprintf("select count(*) from table_%d", i)).Check(testkit.Rows("1")) + tablesNameSet[fmt.Sprintf("table_%d", i)] = struct{}{} + } + + tmpDir := path.Join(os.TempDir(), "bk1") + require.NoError(t, os.RemoveAll(tmpDir)) + // backup database to tmp dir + tk.MustQuery("backup database br to 'local://" + tmpDir + "'") + + // remove database for recovery + tk.MustExec("drop database br") + + // restore database with backup data + tk.MustQuery("restore database * from 'local://" + tmpDir + "'") + tk.MustExec("use br") + ddlCreateTablesRows := tk.MustQuery("admin show ddl jobs where JOB_TYPE = 'create tables'").Rows() + cnt := 0 + for _, row := range ddlCreateTablesRows { + tables := row[2].(string) + require.NotEqual(t, "", tables) + for _, table := range strings.Split(tables, ",") { + _, ok := tablesNameSet[table] + require.True(t, ok) + cnt += 1 + } + } + require.Equal(t, tableNum, cnt) + for i := 0; i < tableNum; i += 1 { + tk.MustQuery(fmt.Sprintf("select count(*) from table_%d", i)).Check(testkit.Rows("1")) + } + tk.MustExec("drop database br") +}