Skip to content

Commit

Permalink
ddl: fix corrupted table meta after rollback a reorganize partition j…
Browse files Browse the repository at this point in the history
…ob (#42479) (#42498)

close #42448
  • Loading branch information
ti-chi-bot authored Mar 23, 2023
1 parent 56c5016 commit f277167
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 1 deletion.
4 changes: 3 additions & 1 deletion ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1742,7 +1742,9 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
return ver, errors.Trace(err)
}
if job.Type == model.ActionAddTablePartition || job.Type == model.ActionReorganizePartition {
// It is rollbacked from adding table partition, just remove addingDefinitions from tableInfo.
// It is rollback from reorganize partition, just remove DroppingDefinitions from tableInfo
tblInfo.Partition.DroppingDefinitions = nil
// It is rollback from adding table partition, just remove addingDefinitions from tableInfo.
physicalTableIDs, pNames, rollbackBundles := rollbackAddingPartitionInfo(tblInfo)
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), rollbackBundles)
if err != nil {
Expand Down
103 changes: 103 additions & 0 deletions ddl/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ package ddl_test

import (
"testing"
"time"

"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/internal/callback"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
Expand Down Expand Up @@ -139,3 +141,104 @@ func testTruncatePartition(t *testing.T, ctx sessionctx.Context, d ddl.DDL, dbIn
checkHistoryJobArgs(t, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
return job
}

func TestReorganizePartitionRollback(t *testing.T) {
// See issue: https://github.com/pingcap/tidb/issues/42448
store, do := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("CREATE TABLE `t1` (\n" +
" `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" +
" `k` int(11) NOT NULL DEFAULT '0',\n" +
" `c` char(120) NOT NULL DEFAULT '',\n" +
" `pad` char(60) NOT NULL DEFAULT '',\n" +
" PRIMARY KEY (`id`) /*T![clustered_index] CLUSTERED */,\n" +
" KEY `k_1` (`k`)\n" +
" ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" +
" PARTITION BY RANGE (`id`)\n" +
" (PARTITION `p0` VALUES LESS THAN (2000000),\n" +
" PARTITION `p1` VALUES LESS THAN (4000000),\n" +
" PARTITION `p2` VALUES LESS THAN (6000000),\n" +
" PARTITION `p3` VALUES LESS THAN (8000000),\n" +
" PARTITION `p4` VALUES LESS THAN (10000000),\n" +
" PARTITION `p5` VALUES LESS THAN (MAXVALUE))")
tk.MustExec("insert into t1(k, c, pad) values (1, 'a', 'beijing'), (2, 'b', 'chengdu')")

wait := make(chan struct{})
defer close(wait)
ddlDone := make(chan error)
defer close(ddlDone)
hook := &callback.TestDDLCallback{Do: do}
hook.OnJobRunAfterExported = func(job *model.Job) {
if job.Type == model.ActionReorganizePartition && job.SchemaState == model.StateWriteReorganization {
<-wait
<-wait
}
}
do.DDL().SetHook(hook)

go func() {
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
err := tk2.ExecToErr("alter table t1 reorganize partition p0, p1, p2, p3, p4 into( partition pnew values less than (10000000))")
ddlDone <- err
}()

jobID := ""

// wait DDL job reaches hook and then cancel
select {
case wait <- struct{}{}:
rows := tk.MustQuery("admin show ddl jobs where JOB_TYPE='alter table reorganize partition'").Rows()
require.Equal(t, 1, len(rows))
jobID = rows[0][0].(string)
tk.MustExec("admin cancel ddl jobs " + jobID)
case <-time.After(time.Minute):
require.FailNow(t, "timeout")
}

// continue to run DDL
select {
case wait <- struct{}{}:
case <-time.After(time.Minute):
require.FailNow(t, "timeout")
}

// wait ddl done
select {
case err := <-ddlDone:
require.Error(t, err)
case <-time.After(time.Minute):
require.FailNow(t, "wait ddl cancelled timeout")
}

// check job rollback finished
rows := tk.MustQuery("admin show ddl jobs where JOB_ID=" + jobID).Rows()
require.Equal(t, 1, len(rows))
require.Equal(t, "rollback done", rows[0][len(rows[0])-1])

// check table meta after rollback
tk.MustQuery("show create table t1").Check(testkit.Rows("t1 CREATE TABLE `t1` (\n" +
" `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" +
" `k` int(11) NOT NULL DEFAULT '0',\n" +
" `c` char(120) NOT NULL DEFAULT '',\n" +
" `pad` char(60) NOT NULL DEFAULT '',\n" +
" PRIMARY KEY (`id`) /*T![clustered_index] CLUSTERED */,\n" +
" KEY `k_1` (`k`)\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin AUTO_INCREMENT=5001\n" +
"PARTITION BY RANGE (`id`)\n" +
"(PARTITION `p0` VALUES LESS THAN (2000000),\n" +
" PARTITION `p1` VALUES LESS THAN (4000000),\n" +
" PARTITION `p2` VALUES LESS THAN (6000000),\n" +
" PARTITION `p3` VALUES LESS THAN (8000000),\n" +
" PARTITION `p4` VALUES LESS THAN (10000000),\n" +
" PARTITION `p5` VALUES LESS THAN (MAXVALUE))"))
tbl, err := do.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
require.NoError(t, err)
require.NotNil(t, tbl.Meta().Partition)
require.Nil(t, tbl.Meta().Partition.AddingDefinitions)
require.Nil(t, tbl.Meta().Partition.DroppingDefinitions)

// test then add index should success
tk.MustExec("alter table t1 add index idx_kc (k, c)")
}
17 changes: 17 additions & 0 deletions ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,21 @@ func rollingbackTruncateTable(t *meta.Meta, job *model.Job) (ver int64, err erro
return cancelOnlyNotHandledJob(job, model.StateNone)
}

func rollingbackReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
if job.SchemaState == model.StateNone {
job.State = model.JobStateCancelled
return ver, dbterror.ErrCancelledDDLJob
}

tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}

// addingDefinitions is also in tblInfo, here pass the tblInfo as parameter directly.
return convertAddTablePartitionJob2RollbackJob(d, t, job, dbterror.ErrCancelledDDLJob, tblInfo)
}

func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
switch job.Type {
case model.ActionAddColumn:
Expand All @@ -365,6 +380,8 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
ver, err = rollingbackAddIndex(w, d, t, job, true)
case model.ActionAddTablePartition:
ver, err = rollingbackAddTablePartition(d, t, job)
case model.ActionReorganizePartition:
ver, err = rollingbackReorganizePartition(d, t, job)
case model.ActionDropColumn:
ver, err = rollingbackDropColumn(d, t, job)
case model.ActionDropIndex, model.ActionDropPrimaryKey:
Expand Down

0 comments on commit f277167

Please sign in to comment.