diff --git a/pkg/ddl/partition.go b/pkg/ddl/partition.go index 44fabc0780458..e6f200fba716b 100644 --- a/pkg/ddl/partition.go +++ b/pkg/ddl/partition.go @@ -1961,18 +1961,20 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) ( job.State = model.JobStateCancelled return ver, err } - if partInfo.DDLType != model.PartitionTypeNone { + // ALTER TABLE ... PARTITION BY + if partInfo.Type != model.PartitionTypeNone { // Also remove anything with the new table id - physicalTableIDs = append(physicalTableIDs, tblInfo.Partition.NewTableID) + physicalTableIDs = append(physicalTableIDs, partInfo.NewTableID) // Reset if it was normal table before - if tblInfo.Partition.Type == model.PartitionTypeNone { + if tblInfo.Partition.Type == model.PartitionTypeNone || + tblInfo.Partition.DDLType == model.PartitionTypeNone { tblInfo.Partition = nil } else { - tblInfo.Partition.NewTableID = 0 - tblInfo.Partition.DDLExpr = "" - tblInfo.Partition.DDLColumns = nil - tblInfo.Partition.DDLType = model.PartitionTypeNone + tblInfo.Partition.ClearReorgIntermediateInfo() } + } else { + // REMOVE PARTITIONING + tblInfo.Partition.ClearReorgIntermediateInfo() } ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true) @@ -3017,10 +3019,7 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job) tblInfo.Partition = nil } else { // ALTER TABLE ... PARTITION BY - tblInfo.Partition.DDLType = model.PartitionTypeNone - tblInfo.Partition.DDLExpr = "" - tblInfo.Partition.DDLColumns = nil - tblInfo.Partition.NewTableID = 0 + tblInfo.Partition.ClearReorgIntermediateInfo() } err = t.GetAutoIDAccessors(job.SchemaID, tblInfo.ID).Put(autoIDs) if err != nil { diff --git a/pkg/ddl/rollingback.go b/pkg/ddl/rollingback.go index 44ee0f2d0b452..bcfbb7ffb413a 100644 --- a/pkg/ddl/rollingback.go +++ b/pkg/ddl/rollingback.go @@ -339,7 +339,19 @@ func convertAddTablePartitionJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model for _, pd := range addingDefinitions { partNames = append(partNames, pd.Name.L) } - job.Args = []interface{}{partNames} + if job.Type == model.ActionReorganizePartition || + job.Type == model.ActionAlterTablePartitioning || + job.Type == model.ActionRemovePartitioning { + partInfo := &model.PartitionInfo{} + var pNames []string + err = job.DecodeArgs(&pNames, &partInfo) + if err != nil { + return ver, err + } + job.Args = []any{partNames, partInfo} + } else { + job.Args = []any{partNames} + } ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true) if err != nil { return ver, errors.Trace(err) diff --git a/pkg/parser/model/BUILD.bazel b/pkg/parser/model/BUILD.bazel index e1c9fab9fea24..7078d14a5670c 100644 --- a/pkg/parser/model/BUILD.bazel +++ b/pkg/parser/model/BUILD.bazel @@ -30,7 +30,7 @@ go_test( ], embed = [":model"], flaky = True, - shard_count = 20, + shard_count = 21, deps = [ "//pkg/parser/charset", "//pkg/parser/mysql", diff --git a/pkg/parser/model/model.go b/pkg/parser/model/model.go index 61532f9df0bb1..7c8e2dd1605a0 100644 --- a/pkg/parser/model/model.go +++ b/pkg/parser/model/model.go @@ -1308,6 +1308,14 @@ func (pi *PartitionInfo) HasTruncatingPartitionID(pid int64) bool { return false } +// ClearReorgIntermediateInfo remove intermediate information used during reorganize partition. +func (pi *PartitionInfo) ClearReorgIntermediateInfo() { + pi.DDLType = PartitionTypeNone + pi.DDLExpr = "" + pi.DDLColumns = nil + pi.NewTableID = 0 +} + // PartitionState is the state of the partition. type PartitionState struct { ID int64 `json:"id"` diff --git a/pkg/parser/model/model_test.go b/pkg/parser/model/model_test.go index e5cc71034fb25..0e608539b0dcd 100644 --- a/pkg/parser/model/model_test.go +++ b/pkg/parser/model/model_test.go @@ -818,3 +818,16 @@ func TestTTLJobInterval(t *testing.T) { require.NoError(t, err) require.Equal(t, time.Hour*200, interval) } + +func TestClearReorgIntermediateInfo(t *testing.T) { + ptInfo := &PartitionInfo{} + ptInfo.DDLType = PartitionTypeHash + ptInfo.DDLExpr = "Test DDL Expr" + ptInfo.NewTableID = 1111 + + ptInfo.ClearReorgIntermediateInfo() + require.Equal(t, PartitionTypeNone, ptInfo.DDLType) + require.Equal(t, "", ptInfo.DDLExpr) + require.Equal(t, true, ptInfo.DDLColumns == nil) + require.Equal(t, int64(0), ptInfo.NewTableID) +} diff --git a/pkg/table/tables/test/partition/BUILD.bazel b/pkg/table/tables/test/partition/BUILD.bazel index 132db36c4ad49..a41306e4e8399 100644 --- a/pkg/table/tables/test/partition/BUILD.bazel +++ b/pkg/table/tables/test/partition/BUILD.bazel @@ -8,7 +8,7 @@ go_test( "partition_test.go", ], flaky = True, - shard_count = 18, + shard_count = 19, deps = [ "//pkg/ddl", "//pkg/domain", diff --git a/pkg/table/tables/test/partition/partition_test.go b/pkg/table/tables/test/partition/partition_test.go index 5edf6ed4b77bf..c9b0bb1054ede 100644 --- a/pkg/table/tables/test/partition/partition_test.go +++ b/pkg/table/tables/test/partition/partition_test.go @@ -3057,3 +3057,84 @@ func TestPointGetKeyPartitioning(t *testing.T) { tk.MustExec(`INSERT INTO t VALUES ('Aa', 'Ab', 'Ac'), ('Ba', 'Bb', 'Bc')`) tk.MustQuery(`SELECT * FROM t WHERE b = 'Ab'`).Check(testkit.Rows("Aa Ab Ac")) } + +// Issue TiDB #51090. +func TestAlterTablePartitionRollback(t *testing.T) { + store := testkit.CreateMockStore(t) + + tk := testkit.NewTestKit(t, store) + tk2 := testkit.NewTestKit(t, store) + tk3 := testkit.NewTestKit(t, store) + tk4 := testkit.NewTestKit(t, store) + tk5 := testkit.NewTestKit(t, store) + tk.MustExec(`use test;`) + tk2.MustExec(`use test;`) + tk3.MustExec(`use test;`) + tk4.MustExec(`use test;`) + tk5.MustExec(`use test;`) + tk.MustExec(`create table t(a int);`) + tk.MustExec(`insert into t values(1), (2), (3);`) + + alterChan := make(chan error) + alterPartition := func() { + err := tk4.ExecToErr(`alter table t partition by hash(a) partitions 3;`) + alterChan <- err + } + waitFor := func(s string) { + for { + select { + case alterErr := <-alterChan: + require.Fail(t, "Alter completed unexpectedly", "With error %v", alterErr) + default: + // Alter still running + } + res := tk5.MustQuery(`admin show ddl jobs where db_name = 'test' and table_name = 't' and job_type = 'alter table partition by'`).Rows() + if len(res) > 0 && res[0][4] == s { + logutil.BgLogger().Info("Got state", zap.String("State", s)) + break + } + gotime.Sleep(10 * gotime.Millisecond) + } + dom := domain.GetDomain(tk5.Session()) + // Make sure the table schema is the new schema. + require.NoError(t, dom.Reload()) + } + + testFunc := func(states []string) { + for i, s := range states { + if i%2 == 0 { + tk2.MustExec(`begin;`) + tk2.MustExec(`select 1 from t;`) + if i > 0 { + tk3.MustExec(`commit;`) + } + } else { + tk3.MustExec(`begin;`) + tk3.MustExec(`select 1 from t;`) + tk2.MustExec(`commit;`) + } + if i == 0 { + go alterPartition() + } + waitFor(s) + if i == len(states)-1 { + break + } + } + res := tk.MustQuery(`admin show ddl jobs where table_name = 't' and job_type = 'alter table partition by'`).Rows() + tk.MustExec(fmt.Sprintf("admin cancel ddl jobs %v", res[0][0])) + tk2.MustExec(`commit;`) + tk3.MustExec(`commit;`) + require.ErrorContains(t, <-alterChan, "[ddl:8214]Cancelled DDL job") + tk.MustQuery(`show create table t;`).Check(testkit.Rows( + "t CREATE TABLE `t` (\n" + + " `a` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + tk.MustQuery(`select a from t order by a;`).Check(testkit.Rows("1", "2", "3")) + } + + states := []string{"delete only", "write only", "write reorganization", "delete reorganization"} + for i := range states { + testFunc(states[:i+1]) + } +}