Skip to content

Commit

Permalink
ddl: Support multiple table rename (pingcap#19962)
Browse files Browse the repository at this point in the history
  • Loading branch information
TszKitLo40 authored Oct 28, 2020
1 parent 2c8f2b7 commit 91db54e
Show file tree
Hide file tree
Showing 14 changed files with 391 additions and 80 deletions.
104 changes: 99 additions & 5 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3299,14 +3299,108 @@ func (s *testDBSuite1) TestRenameMultiTables(c *C) {
tk.MustExec("use test")
tk.MustExec("create table t1(id int)")
tk.MustExec("create table t2(id int)")
// Currently it will fail only.
sql := fmt.Sprintf("rename table t1 to t3, t2 to t4")
_, err := tk.Exec(sql)
c.Assert(err, NotNil)
originErr := errors.Cause(err)
c.Assert(originErr.Error(), Equals, "can't run multi schema change")
c.Assert(err, IsNil)

tk.MustExec("drop table t3, t4")

tk.MustExec("create table t1 (c1 int, c2 int)")
tk.MustExec("create table t2 (c1 int, c2 int)")
tk.MustExec("insert t1 values (1, 1), (2, 2)")
tk.MustExec("insert t2 values (1, 1), (2, 2)")
ctx := tk.Se.(sessionctx.Context)
is := domain.GetDomain(ctx).InfoSchema()
oldTblInfo1, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
c.Assert(err, IsNil)
oldTblID1 := oldTblInfo1.Meta().ID
oldTblInfo2, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t2"))
c.Assert(err, IsNil)
oldTblID2 := oldTblInfo2.Meta().ID
tk.MustExec("create database test1")
tk.MustExec("use test1")
tk.MustExec("rename table test.t1 to test1.t1, test.t2 to test1.t2")
is = domain.GetDomain(ctx).InfoSchema()
newTblInfo1, err := is.TableByName(model.NewCIStr("test1"), model.NewCIStr("t1"))
c.Assert(err, IsNil)
c.Assert(newTblInfo1.Meta().ID, Equals, oldTblID1)
newTblInfo2, err := is.TableByName(model.NewCIStr("test1"), model.NewCIStr("t2"))
c.Assert(err, IsNil)
c.Assert(newTblInfo2.Meta().ID, Equals, oldTblID2)
tk.MustQuery("select * from t1").Check(testkit.Rows("1 1", "2 2"))
tk.MustQuery("select * from t2").Check(testkit.Rows("1 1", "2 2"))

// Make sure t1,t2 doesn't exist.
isExist := is.TableExists(model.NewCIStr("test"), model.NewCIStr("t1"))
c.Assert(isExist, IsFalse)
isExist = is.TableExists(model.NewCIStr("test"), model.NewCIStr("t2"))
c.Assert(isExist, IsFalse)

// for the same database
tk.MustExec("use test1")
tk.MustExec("rename table test1.t1 to test1.t3, test1.t2 to test1.t4")
is = domain.GetDomain(ctx).InfoSchema()
newTblInfo1, err = is.TableByName(model.NewCIStr("test1"), model.NewCIStr("t3"))
c.Assert(err, IsNil)
c.Assert(newTblInfo1.Meta().ID, Equals, oldTblID1)
newTblInfo2, err = is.TableByName(model.NewCIStr("test1"), model.NewCIStr("t4"))
c.Assert(err, IsNil)
c.Assert(newTblInfo2.Meta().ID, Equals, oldTblID2)
tk.MustQuery("select * from t3").Check(testkit.Rows("1 1", "2 2"))
isExist = is.TableExists(model.NewCIStr("test1"), model.NewCIStr("t1"))
c.Assert(isExist, IsFalse)
tk.MustQuery("select * from t4").Check(testkit.Rows("1 1", "2 2"))
isExist = is.TableExists(model.NewCIStr("test1"), model.NewCIStr("t2"))
c.Assert(isExist, IsFalse)
tk.MustQuery("show tables").Check(testkit.Rows("t3", "t4"))

tk.MustExec("drop table t1, t2")
// for multi tables same database
tk.MustExec("create table t5 (c1 int, c2 int)")
tk.MustExec("insert t5 values (1, 1), (2, 2)")
is = domain.GetDomain(ctx).InfoSchema()
oldTblInfo3, err := is.TableByName(model.NewCIStr("test1"), model.NewCIStr("t5"))
c.Assert(err, IsNil)
oldTblID3 := oldTblInfo3.Meta().ID
tk.MustExec("rename table test1.t3 to test1.t1, test1.t4 to test1.t2, test1.t5 to test1.t3")
is = domain.GetDomain(ctx).InfoSchema()
newTblInfo1, err = is.TableByName(model.NewCIStr("test1"), model.NewCIStr("t1"))
c.Assert(err, IsNil)
c.Assert(newTblInfo1.Meta().ID, Equals, oldTblID1)
newTblInfo2, err = is.TableByName(model.NewCIStr("test1"), model.NewCIStr("t2"))
c.Assert(err, IsNil)
c.Assert(newTblInfo2.Meta().ID, Equals, oldTblID2)
newTblInfo3, err := is.TableByName(model.NewCIStr("test1"), model.NewCIStr("t3"))
c.Assert(err, IsNil)
c.Assert(newTblInfo3.Meta().ID, Equals, oldTblID3)
tk.MustQuery("show tables").Check(testkit.Rows("t1", "t2", "t3"))

// for multi tables different databases
tk.MustExec("use test")
tk.MustExec("rename table test1.t1 to test.t2, test1.t2 to test.t3, test1.t3 to test.t4")
is = domain.GetDomain(ctx).InfoSchema()
newTblInfo1, err = is.TableByName(model.NewCIStr("test"), model.NewCIStr("t2"))
c.Assert(err, IsNil)
c.Assert(newTblInfo1.Meta().ID, Equals, oldTblID1)
newTblInfo2, err = is.TableByName(model.NewCIStr("test"), model.NewCIStr("t3"))
c.Assert(err, IsNil)
c.Assert(newTblInfo2.Meta().ID, Equals, oldTblID2)
newTblInfo3, err = is.TableByName(model.NewCIStr("test"), model.NewCIStr("t4"))
c.Assert(err, IsNil)
c.Assert(newTblInfo3.Meta().ID, Equals, oldTblID3)
tk.MustQuery("show tables").Check(testkit.Rows("t2", "t3", "t4"))

// for failure case
failSQL := "rename table test_not_exist.t to test_not_exist.t, test_not_exist.t to test_not_exist.t"
tk.MustGetErrCode(failSQL, errno.ErrFileNotFound)
failSQL = "rename table test.test_not_exist to test.test_not_exist, test.test_not_exist to test.test_not_exist"
tk.MustGetErrCode(failSQL, errno.ErrFileNotFound)
failSQL = "rename table test.t_not_exist to test_not_exist.t, test.t_not_exist to test_not_exist.t"
tk.MustGetErrCode(failSQL, errno.ErrFileNotFound)
failSQL = "rename table test1.t2 to test_not_exist.t, test1.t2 to test_not_exist.t"
tk.MustGetErrCode(failSQL, errno.ErrFileNotFound)

tk.MustExec("drop database test1")
tk.MustExec("drop database test")
}

func (s *testDBSuite2) TestAddNotNullColumn(c *C) {
Expand Down
1 change: 1 addition & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ type DDL interface {
AlterTable(ctx sessionctx.Context, tableIdent ast.Ident, spec []*ast.AlterTableSpec) error
TruncateTable(ctx sessionctx.Context, tableIdent ast.Ident) error
RenameTable(ctx sessionctx.Context, oldTableIdent, newTableIdent ast.Ident, isAlterTable bool) error
RenameTables(ctx sessionctx.Context, oldTableIdent, newTableIdent []ast.Ident, isAlterTable bool) error
LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) error
UnlockTables(ctx sessionctx.Context, lockedTables []model.TableLockTpInfo) error
CleanupTableLock(ctx sessionctx.Context, tables []*ast.TableName) error
Expand Down
133 changes: 107 additions & 26 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ const (
expressionIndexPrefix = "_V$"
changingColumnPrefix = "_Col$_"
changingIndexPrefix = "_Idx$_"
tableNotExist = -1
)

func (d *ddl) CreateSchema(ctx sessionctx.Context, schema model.CIStr, charsetInfo *ast.CharsetOpt) error {
Expand Down Expand Up @@ -4480,57 +4481,137 @@ func (d *ddl) TruncateTable(ctx sessionctx.Context, ti ast.Ident) error {

func (d *ddl) RenameTable(ctx sessionctx.Context, oldIdent, newIdent ast.Ident, isAlterTable bool) error {
is := d.GetInfoSchemaWithInterceptor(ctx)
tables := make(map[string]int64)
schemas, tableID, err := extractTblInfos(is, oldIdent, newIdent, isAlterTable, tables)
if err != nil {
return err
}

if schemas == nil {
return nil
}

job := &model.Job{
SchemaID: schemas[1].ID,
TableID: tableID,
SchemaName: schemas[1].Name.L,
Type: model.ActionRenameTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{schemas[0].ID, newIdent.Name},
}

err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

func (d *ddl) RenameTables(ctx sessionctx.Context, oldIdents, newIdents []ast.Ident, isAlterTable bool) error {
is := d.GetInfoSchemaWithInterceptor(ctx)
tableNames := make([]*model.CIStr, 0, len(oldIdents))
oldSchemaIDs := make([]int64, 0, len(oldIdents))
newSchemaIDs := make([]int64, 0, len(oldIdents))
tableIDs := make([]int64, 0, len(oldIdents))

var schemas []*model.DBInfo
var tableID int64
var err error

tables := make(map[string]int64)
for i := 0; i < len(oldIdents); i++ {
schemas, tableID, err = extractTblInfos(is, oldIdents[i], newIdents[i], isAlterTable, tables)
if err != nil {
return err
}
tableIDs = append(tableIDs, tableID)
tableNames = append(tableNames, &newIdents[i].Name)
oldSchemaIDs = append(oldSchemaIDs, schemas[0].ID)
newSchemaIDs = append(newSchemaIDs, schemas[1].ID)
}

job := &model.Job{
SchemaID: schemas[1].ID,
TableID: tableIDs[0],
SchemaName: schemas[1].Name.L,
Type: model.ActionRenameTables,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{oldSchemaIDs, newSchemaIDs, tableNames, tableIDs},
}

err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

func extractTblInfos(is infoschema.InfoSchema, oldIdent, newIdent ast.Ident, isAlterTable bool, tables map[string]int64) ([]*model.DBInfo, int64, error) {
oldSchema, ok := is.SchemaByName(oldIdent.Schema)
if !ok {
if isAlterTable {
return infoschema.ErrTableNotExists.GenWithStackByArgs(oldIdent.Schema, oldIdent.Name)
return nil, 0, infoschema.ErrTableNotExists.GenWithStackByArgs(oldIdent.Schema, oldIdent.Name)
}
if is.TableExists(newIdent.Schema, newIdent.Name) {
return infoschema.ErrTableExists.GenWithStackByArgs(newIdent)
if tableExists(is, newIdent, tables) {
return nil, 0, infoschema.ErrTableExists.GenWithStackByArgs(newIdent)
}
return errFileNotFound.GenWithStackByArgs(oldIdent.Schema, oldIdent.Name)
return nil, 0, errFileNotFound.GenWithStackByArgs(oldIdent.Schema, oldIdent.Name)
}
oldTbl, err := is.TableByName(oldIdent.Schema, oldIdent.Name)
if err != nil {
if !tableExists(is, oldIdent, tables) {
if isAlterTable {
return infoschema.ErrTableNotExists.GenWithStackByArgs(oldIdent.Schema, oldIdent.Name)
return nil, 0, infoschema.ErrTableNotExists.GenWithStackByArgs(oldIdent.Schema, oldIdent.Name)
}
if is.TableExists(newIdent.Schema, newIdent.Name) {
return infoschema.ErrTableExists.GenWithStackByArgs(newIdent)
if tableExists(is, newIdent, tables) {
return nil, 0, infoschema.ErrTableExists.GenWithStackByArgs(newIdent)
}
return errFileNotFound.GenWithStackByArgs(oldIdent.Schema, oldIdent.Name)
return nil, 0, errFileNotFound.GenWithStackByArgs(oldIdent.Schema, oldIdent.Name)
}
if isAlterTable && newIdent.Schema.L == oldIdent.Schema.L && newIdent.Name.L == oldIdent.Name.L {
// oldIdent is equal to newIdent, do nothing
return nil
//oldIdent is equal to newIdent, do nothing
return nil, 0, nil
}
newSchema, ok := is.SchemaByName(newIdent.Schema)
if !ok {
return ErrErrorOnRename.GenWithStackByArgs(
return nil, 0, ErrErrorOnRename.GenWithStackByArgs(
fmt.Sprintf("%s.%s", oldIdent.Schema, oldIdent.Name),
fmt.Sprintf("%s.%s", newIdent.Schema, newIdent.Name),
168,
fmt.Sprintf("Database `%s` doesn't exist", newIdent.Schema))
}
if is.TableExists(newIdent.Schema, newIdent.Name) {
return infoschema.ErrTableExists.GenWithStackByArgs(newIdent)
if tableExists(is, newIdent, tables) {
return nil, 0, infoschema.ErrTableExists.GenWithStackByArgs(newIdent)
}
if err := checkTooLongTable(newIdent.Name); err != nil {
return errors.Trace(err)
return nil, 0, errors.Trace(err)
}
oldTableID := getTableID(is, oldIdent, tables)
oldIdentKey := getIdentKey(oldIdent)
tables[oldIdentKey] = tableNotExist
newIdentKey := getIdentKey(newIdent)
tables[newIdentKey] = oldTableID
return []*model.DBInfo{oldSchema, newSchema}, oldTableID, nil
}

job := &model.Job{
SchemaID: newSchema.ID,
TableID: oldTbl.Meta().ID,
SchemaName: newSchema.Name.L,
Type: model.ActionRenameTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{oldSchema.ID, newIdent.Name},
func tableExists(is infoschema.InfoSchema, ident ast.Ident, tables map[string]int64) bool {
identKey := getIdentKey(ident)
tableID, ok := tables[identKey]
if (ok && tableID != tableNotExist) || (!ok && is.TableExists(ident.Schema, ident.Name)) {
return true
}
return false
}

err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
func getTableID(is infoschema.InfoSchema, ident ast.Ident, tables map[string]int64) int64 {
identKey := getIdentKey(ident)
tableID, ok := tables[identKey]
if !ok {
oldTbl, err := is.TableByName(ident.Schema, ident.Name)
if err != nil {
return tableNotExist
}
tableID = oldTbl.Meta().ID
}
return tableID
}

func getIdentKey(ident ast.Ident) string {
return fmt.Sprintf("%s.%s", ident.Schema.L, ident.Name.L)
}

func getAnonymousIndex(t table.Table, colName model.CIStr) model.CIStr {
Expand Down
24 changes: 24 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,8 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
ver, err = onAlterTablePartition(t, job)
case model.ActionAlterSequence:
ver, err = onAlterSequence(t, job)
case model.ActionRenameTables:
ver, err = onRenameTables(d, t, job)
default:
// Invalid job, cancel it.
job.State = model.JobStateCancelled
Expand Down Expand Up @@ -851,6 +853,28 @@ func updateSchemaVersion(t *meta.Meta, job *model.Job) (int64, error) {
return 0, errors.Trace(err)
}
diff.TableID = job.TableID
case model.ActionRenameTables:
oldSchemaIDs := []int64{}
newSchemaIDs := []int64{}
tableNames := []*model.CIStr{}
tableIDs := []int64{}
err = job.DecodeArgs(&oldSchemaIDs, &newSchemaIDs, &tableNames, &tableIDs)
if err != nil {
return 0, errors.Trace(err)
}
affects := make([]*model.AffectedOption, len(newSchemaIDs))
for i, newSchemaID := range newSchemaIDs {
affects[i] = &model.AffectedOption{
SchemaID: newSchemaID,
TableID: tableIDs[i],
OldTableID: tableIDs[i],
OldSchemaID: oldSchemaIDs[i],
}
}
diff.TableID = tableIDs[0]
diff.SchemaID = newSchemaIDs[0]
diff.OldSchemaID = oldSchemaIDs[0]
diff.AffectedOpts = affects
case model.ActionExchangeTablePartition:
var (
ptSchemaID int64
Expand Down
11 changes: 10 additions & 1 deletion ddl/failtest/fail_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,23 @@ func (s *testFailDBSuite) TestHalfwayCancelOperations(c *C) {
// Test schema is correct.
tk.MustExec("select * from t")
// test for renaming table
c.Assert(failpoint.Enable("github.com/pingcap/tidb/ddl/renameTableErr", `return(true)`), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/ddl/renameTableErr", `return("ty")`), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/ddl/renameTableErr"), IsNil)
}()
tk.MustExec("create table tx(a int)")
tk.MustExec("insert into tx values(1)")
_, err = tk.Exec("rename table tx to ty")
c.Assert(err, NotNil)
tk.MustExec("create table ty(a int)")
tk.MustExec("insert into ty values(2)")
_, err = tk.Exec("rename table ty to tz, tx to ty")
c.Assert(err, NotNil)
_, err = tk.Exec("select * from tz")
c.Assert(err, NotNil)
_, err = tk.Exec("rename table tx to ty, ty to tz")
c.Assert(err, NotNil)
tk.MustQuery("select * from ty").Check(testkit.Rows("2"))
// Make sure that the table's data has not been deleted.
tk.MustQuery("select * from tx").Check(testkit.Rows("1"))
// Execute ddl statement reload schema.
Expand Down
2 changes: 1 addition & 1 deletion ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
case model.ActionModifyColumn:
ver, err = rollingbackModifyColumn(t, job)
case model.ActionRebaseAutoID, model.ActionShardRowID, model.ActionAddForeignKey,
model.ActionDropForeignKey, model.ActionRenameTable,
model.ActionDropForeignKey, model.ActionRenameTable, model.ActionRenameTables,
model.ActionModifyTableCharsetAndCollate, model.ActionTruncateTablePartition,
model.ActionModifySchemaCharsetAndCollate, model.ActionRepairTable,
model.ActionModifyTableAutoIdCache, model.ActionAlterIndexVisibility,
Expand Down
Loading

0 comments on commit 91db54e

Please sign in to comment.