From f27716750aeed7547c7382c78238c3620c2a3b22 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 23 Mar 2023 18:20:42 +0800 Subject: [PATCH] ddl: fix corrupted table meta after rollback a reorganize partition job (#42479) (#42498) close pingcap/tidb#42448 --- ddl/partition.go | 4 +- ddl/partition_test.go | 103 ++++++++++++++++++++++++++++++++++++++++++ ddl/rollingback.go | 17 +++++++ 3 files changed, 123 insertions(+), 1 deletion(-) diff --git a/ddl/partition.go b/ddl/partition.go index 955757808f779..a0d7fad1b4d60 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1742,7 +1742,9 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) ( return ver, errors.Trace(err) } if job.Type == model.ActionAddTablePartition || job.Type == model.ActionReorganizePartition { - // It is rollbacked from adding table partition, just remove addingDefinitions from tableInfo. + // It is rolled back from reorganize partition, just remove DroppingDefinitions from tableInfo. + tblInfo.Partition.DroppingDefinitions = nil + // It is rolled back from adding table partition, just remove addingDefinitions from tableInfo. 
physicalTableIDs, pNames, rollbackBundles := rollbackAddingPartitionInfo(tblInfo) err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), rollbackBundles) if err != nil { diff --git a/ddl/partition_test.go b/ddl/partition_test.go index 1a25545d86bed..dd4024770cf2d 100644 --- a/ddl/partition_test.go +++ b/ddl/partition_test.go @@ -16,8 +16,10 @@ package ddl_test import ( "testing" + "time" "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/ddl/internal/callback" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" @@ -139,3 +141,104 @@ func testTruncatePartition(t *testing.T, ctx sessionctx.Context, d ddl.DDL, dbIn checkHistoryJobArgs(t, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo}) return job } + +func TestReorganizePartitionRollback(t *testing.T) { + // See issue: https://github.com/pingcap/tidb/issues/42448 + store, do := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("CREATE TABLE `t1` (\n" + + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" + + " `k` int(11) NOT NULL DEFAULT '0',\n" + + " `c` char(120) NOT NULL DEFAULT '',\n" + + " `pad` char(60) NOT NULL DEFAULT '',\n" + + " PRIMARY KEY (`id`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `k_1` (`k`)\n" + + " ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + " PARTITION BY RANGE (`id`)\n" + + " (PARTITION `p0` VALUES LESS THAN (2000000),\n" + + " PARTITION `p1` VALUES LESS THAN (4000000),\n" + + " PARTITION `p2` VALUES LESS THAN (6000000),\n" + + " PARTITION `p3` VALUES LESS THAN (8000000),\n" + + " PARTITION `p4` VALUES LESS THAN (10000000),\n" + + " PARTITION `p5` VALUES LESS THAN (MAXVALUE))") + tk.MustExec("insert into t1(k, c, pad) values (1, 'a', 'beijing'), (2, 'b', 'chengdu')") + + wait := make(chan struct{}) + defer close(wait) + ddlDone := make(chan error) + defer close(ddlDone) + hook := &callback.TestDDLCallback{Do: do} + 
hook.OnJobRunAfterExported = func(job *model.Job) { + if job.Type == model.ActionReorganizePartition && job.SchemaState == model.StateWriteReorganization { + <-wait + <-wait + } + } + do.DDL().SetHook(hook) + + go func() { + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test") + err := tk2.ExecToErr("alter table t1 reorganize partition p0, p1, p2, p3, p4 into( partition pnew values less than (10000000))") + ddlDone <- err + }() + + jobID := "" + + // wait DDL job reaches hook and then cancel + select { + case wait <- struct{}{}: + rows := tk.MustQuery("admin show ddl jobs where JOB_TYPE='alter table reorganize partition'").Rows() + require.Equal(t, 1, len(rows)) + jobID = rows[0][0].(string) + tk.MustExec("admin cancel ddl jobs " + jobID) + case <-time.After(time.Minute): + require.FailNow(t, "timeout") + } + + // continue to run DDL + select { + case wait <- struct{}{}: + case <-time.After(time.Minute): + require.FailNow(t, "timeout") + } + + // wait ddl done + select { + case err := <-ddlDone: + require.Error(t, err) + case <-time.After(time.Minute): + require.FailNow(t, "wait ddl cancelled timeout") + } + + // check job rollback finished + rows := tk.MustQuery("admin show ddl jobs where JOB_ID=" + jobID).Rows() + require.Equal(t, 1, len(rows)) + require.Equal(t, "rollback done", rows[0][len(rows[0])-1]) + + // check table meta after rollback + tk.MustQuery("show create table t1").Check(testkit.Rows("t1 CREATE TABLE `t1` (\n" + + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" + + " `k` int(11) NOT NULL DEFAULT '0',\n" + + " `c` char(120) NOT NULL DEFAULT '',\n" + + " `pad` char(60) NOT NULL DEFAULT '',\n" + + " PRIMARY KEY (`id`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `k_1` (`k`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin AUTO_INCREMENT=5001\n" + + "PARTITION BY RANGE (`id`)\n" + + "(PARTITION `p0` VALUES LESS THAN (2000000),\n" + + " PARTITION `p1` VALUES LESS THAN (4000000),\n" + + " PARTITION `p2` VALUES LESS THAN 
(6000000),\n" + + " PARTITION `p3` VALUES LESS THAN (8000000),\n" + + " PARTITION `p4` VALUES LESS THAN (10000000),\n" + + " PARTITION `p5` VALUES LESS THAN (MAXVALUE))")) + tbl, err := do.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) + require.NoError(t, err) + require.NotNil(t, tbl.Meta().Partition) + require.Nil(t, tbl.Meta().Partition.AddingDefinitions) + require.Nil(t, tbl.Meta().Partition.DroppingDefinitions) + + // adding an index after the rollback should succeed + tk.MustExec("alter table t1 add index idx_kc (k, c)") +} diff --git a/ddl/rollingback.go b/ddl/rollingback.go index c6f75442479b6..279093ed7ba20 100644 --- a/ddl/rollingback.go +++ b/ddl/rollingback.go @@ -355,6 +355,21 @@ func rollingbackTruncateTable(t *meta.Meta, job *model.Job) (ver int64, err erro return cancelOnlyNotHandledJob(job, model.StateNone) } +func rollingbackReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) { + if job.SchemaState == model.StateNone { + job.State = model.JobStateCancelled + return ver, dbterror.ErrCancelledDDLJob + } + + tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, job.SchemaID) + if err != nil { + return ver, errors.Trace(err) + } + + // addingDefinitions is also in tblInfo, here pass the tblInfo as parameter directly. 
+ return convertAddTablePartitionJob2RollbackJob(d, t, job, dbterror.ErrCancelledDDLJob, tblInfo) +} + func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) { switch job.Type { case model.ActionAddColumn: @@ -365,6 +380,8 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) ver, err = rollingbackAddIndex(w, d, t, job, true) case model.ActionAddTablePartition: ver, err = rollingbackAddTablePartition(d, t, job) + case model.ActionReorganizePartition: + ver, err = rollingbackReorganizePartition(d, t, job) case model.ActionDropColumn: ver, err = rollingbackDropColumn(d, t, job) case model.ActionDropIndex, model.ActionDropPrimaryKey: