Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: fix rollback reorganize partition left intermediate state #51631

Merged
merged 14 commits into from
Mar 29, 2024
21 changes: 10 additions & 11 deletions pkg/ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -2003,18 +2003,20 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
job.State = model.JobStateCancelled
return ver, err
}
if partInfo.DDLType != model.PartitionTypeNone {
// ALTER TABLE ... PARTITION BY
if partInfo.Type != model.PartitionTypeNone {
// Also remove anything with the new table id
physicalTableIDs = append(physicalTableIDs, tblInfo.Partition.NewTableID)
physicalTableIDs = append(physicalTableIDs, partInfo.NewTableID)
// Reset if it was normal table before
if tblInfo.Partition.Type == model.PartitionTypeNone {
if tblInfo.Partition.Type == model.PartitionTypeNone ||
tblInfo.Partition.DDLType == model.PartitionTypeNone {
tblInfo.Partition = nil
} else {
tblInfo.Partition.NewTableID = 0
tblInfo.Partition.DDLExpr = ""
tblInfo.Partition.DDLColumns = nil
tblInfo.Partition.DDLType = model.PartitionTypeNone
tblInfo.Partition.ClearReorgIntermediateInfo()
}
} else {
// REMOVE PARTITIONING
tblInfo.Partition.ClearReorgIntermediateInfo()
}

ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true)
Expand Down Expand Up @@ -3081,10 +3083,7 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job)
tblInfo.Partition = nil
} else {
// ALTER TABLE ... PARTITION BY
tblInfo.Partition.DDLType = model.PartitionTypeNone
tblInfo.Partition.DDLExpr = ""
tblInfo.Partition.DDLColumns = nil
tblInfo.Partition.NewTableID = 0
tblInfo.Partition.ClearReorgIntermediateInfo()
}
err = t.GetAutoIDAccessors(job.SchemaID, tblInfo.ID).Put(autoIDs)
if err != nil {
Expand Down
14 changes: 13 additions & 1 deletion pkg/ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,19 @@ func convertAddTablePartitionJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model
for _, pd := range addingDefinitions {
partNames = append(partNames, pd.Name.L)
}
job.Args = []any{partNames}
if job.Type == model.ActionReorganizePartition ||
job.Type == model.ActionAlterTablePartitioning ||
job.Type == model.ActionRemovePartitioning {
partInfo := &model.PartitionInfo{}
var pNames []string
err = job.DecodeArgs(&pNames, &partInfo)
if err != nil {
return ver, err
}
job.Args = []any{partNames, partInfo}
} else {
job.Args = []any{partNames}
}
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/parser/model/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ go_test(
],
embed = [":model"],
flaky = True,
shard_count = 21,
shard_count = 22,
deps = [
"//pkg/parser/charset",
"//pkg/parser/mysql",
Expand Down
8 changes: 8 additions & 0 deletions pkg/parser/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -1319,6 +1319,14 @@ func (pi *PartitionInfo) HasTruncatingPartitionID(pid int64) bool {
return false
}

// ClearReorgIntermediateInfo remove intermediate information used during reorganize partition.
func (pi *PartitionInfo) ClearReorgIntermediateInfo() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a simple UT to model_test.go.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test is added. 34b8c3a

pi.DDLType = PartitionTypeNone
pi.DDLExpr = ""
pi.DDLColumns = nil
pi.NewTableID = 0
}

// PartitionState is the state of the partition.
type PartitionState struct {
ID int64 `json:"id"`
Expand Down
13 changes: 13 additions & 0 deletions pkg/parser/model/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,3 +818,16 @@ func TestTTLJobInterval(t *testing.T) {
require.NoError(t, err)
require.Equal(t, time.Hour*200, interval)
}

func TestClearReorgIntermediateInfo(t *testing.T) {
ptInfo := &PartitionInfo{}
ptInfo.DDLType = PartitionTypeHash
ptInfo.DDLExpr = "Test DDL Expr"
ptInfo.NewTableID = 1111

ptInfo.ClearReorgIntermediateInfo()
require.Equal(t, PartitionTypeNone, ptInfo.DDLType)
require.Equal(t, "", ptInfo.DDLExpr)
require.Equal(t, true, ptInfo.DDLColumns == nil)
require.Equal(t, int64(0), ptInfo.NewTableID)
}
2 changes: 1 addition & 1 deletion pkg/table/tables/test/partition/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"partition_test.go",
],
flaky = True,
shard_count = 21,
shard_count = 22,
deps = [
"//pkg/ddl",
"//pkg/domain",
Expand Down
81 changes: 81 additions & 0 deletions pkg/table/tables/test/partition/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3184,3 +3184,84 @@ func TestPartitionCoverage(t *testing.T) {
tk.MustExec(`delete from t19141 partition (p0) where c_int in (2,3)`)
tk.MustQuery(`select * from t19141 order by c_int`).Sort().Check(testkit.Rows("1", "2", "3", "4"))
}

// Issue TiDB #51090.
func TestAlterTablePartitionRollback(t *testing.T) {
store := testkit.CreateMockStore(t)

tk := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk3 := testkit.NewTestKit(t, store)
tk4 := testkit.NewTestKit(t, store)
tk5 := testkit.NewTestKit(t, store)
tk.MustExec(`use test;`)
tk2.MustExec(`use test;`)
tk3.MustExec(`use test;`)
tk4.MustExec(`use test;`)
tk5.MustExec(`use test;`)
tk.MustExec(`create table t(a int);`)
tk.MustExec(`insert into t values(1), (2), (3);`)

alterChan := make(chan error)
alterPartition := func() {
err := tk4.ExecToErr(`alter table t partition by hash(a) partitions 3;`)
alterChan <- err
}
waitFor := func(s string) {
for {
select {
case alterErr := <-alterChan:
require.Fail(t, "Alter completed unexpectedly", "With error %v", alterErr)
default:
// Alter still running
}
res := tk5.MustQuery(`admin show ddl jobs where db_name = 'test' and table_name = 't' and job_type = 'alter table partition by'`).Rows()
if len(res) > 0 && res[0][4] == s {
logutil.BgLogger().Info("Got state", zap.String("State", s))
break
}
gotime.Sleep(10 * gotime.Millisecond)
}
dom := domain.GetDomain(tk5.Session())
// Make sure the table schema is the new schema.
require.NoError(t, dom.Reload())
}

testFunc := func(states []string) {
for i, s := range states {
if i%2 == 0 {
tk2.MustExec(`begin;`)
tk2.MustExec(`select 1 from t;`)
if i > 0 {
tk3.MustExec(`commit;`)
}
} else {
tk3.MustExec(`begin;`)
tk3.MustExec(`select 1 from t;`)
tk2.MustExec(`commit;`)
}
if i == 0 {
go alterPartition()
}
waitFor(s)
if i == len(states)-1 {
break
}
}
res := tk.MustQuery(`admin show ddl jobs where table_name = 't' and job_type = 'alter table partition by'`).Rows()
tk.MustExec(fmt.Sprintf("admin cancel ddl jobs %v", res[0][0]))
tk2.MustExec(`commit;`)
tk3.MustExec(`commit;`)
require.ErrorContains(t, <-alterChan, "[ddl:8214]Cancelled DDL job")
tk.MustQuery(`show create table t;`).Check(testkit.Rows(
"t CREATE TABLE `t` (\n" +
" `a` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
tk.MustQuery(`select a from t order by a;`).Check(testkit.Rows("1", "2", "3"))
}

states := []string{"delete only", "write only", "write reorganization", "delete reorganization"}
for i := range states {
testFunc(states[:i+1])
}
}