Skip to content

Commit

Permalink
ddl: fix rollback reorganize partition left intermediate state (#51631)…
Browse files Browse the repository at this point in the history
… (#53469)

close #51090
  • Loading branch information
ti-chi-bot authored May 22, 2024
1 parent d4ec23a commit 99478c8
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 14 deletions.
21 changes: 10 additions & 11 deletions pkg/ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1961,18 +1961,20 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
job.State = model.JobStateCancelled
return ver, err
}
if partInfo.DDLType != model.PartitionTypeNone {
// ALTER TABLE ... PARTITION BY
if partInfo.Type != model.PartitionTypeNone {
// Also remove anything with the new table id
physicalTableIDs = append(physicalTableIDs, tblInfo.Partition.NewTableID)
physicalTableIDs = append(physicalTableIDs, partInfo.NewTableID)
// Reset if it was normal table before
if tblInfo.Partition.Type == model.PartitionTypeNone {
if tblInfo.Partition.Type == model.PartitionTypeNone ||
tblInfo.Partition.DDLType == model.PartitionTypeNone {
tblInfo.Partition = nil
} else {
tblInfo.Partition.NewTableID = 0
tblInfo.Partition.DDLExpr = ""
tblInfo.Partition.DDLColumns = nil
tblInfo.Partition.DDLType = model.PartitionTypeNone
tblInfo.Partition.ClearReorgIntermediateInfo()
}
} else {
// REMOVE PARTITIONING
tblInfo.Partition.ClearReorgIntermediateInfo()
}

ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true)
Expand Down Expand Up @@ -3017,10 +3019,7 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job)
tblInfo.Partition = nil
} else {
// ALTER TABLE ... PARTITION BY
tblInfo.Partition.DDLType = model.PartitionTypeNone
tblInfo.Partition.DDLExpr = ""
tblInfo.Partition.DDLColumns = nil
tblInfo.Partition.NewTableID = 0
tblInfo.Partition.ClearReorgIntermediateInfo()
}
err = t.GetAutoIDAccessors(job.SchemaID, tblInfo.ID).Put(autoIDs)
if err != nil {
Expand Down
14 changes: 13 additions & 1 deletion pkg/ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,19 @@ func convertAddTablePartitionJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model
for _, pd := range addingDefinitions {
partNames = append(partNames, pd.Name.L)
}
job.Args = []interface{}{partNames}
if job.Type == model.ActionReorganizePartition ||
job.Type == model.ActionAlterTablePartitioning ||
job.Type == model.ActionRemovePartitioning {
partInfo := &model.PartitionInfo{}
var pNames []string
err = job.DecodeArgs(&pNames, &partInfo)
if err != nil {
return ver, err
}
job.Args = []any{partNames, partInfo}
} else {
job.Args = []any{partNames}
}
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/parser/model/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ go_test(
],
embed = [":model"],
flaky = True,
shard_count = 20,
shard_count = 21,
deps = [
"//pkg/parser/charset",
"//pkg/parser/mysql",
Expand Down
8 changes: 8 additions & 0 deletions pkg/parser/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -1308,6 +1308,14 @@ func (pi *PartitionInfo) HasTruncatingPartitionID(pid int64) bool {
return false
}

// ClearReorgIntermediateInfo remove intermediate information used during reorganize partition.
func (pi *PartitionInfo) ClearReorgIntermediateInfo() {
pi.DDLType = PartitionTypeNone
pi.DDLExpr = ""
pi.DDLColumns = nil
pi.NewTableID = 0
}

// PartitionState is the state of the partition.
type PartitionState struct {
ID int64 `json:"id"`
Expand Down
13 changes: 13 additions & 0 deletions pkg/parser/model/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,3 +818,16 @@ func TestTTLJobInterval(t *testing.T) {
require.NoError(t, err)
require.Equal(t, time.Hour*200, interval)
}

func TestClearReorgIntermediateInfo(t *testing.T) {
ptInfo := &PartitionInfo{}
ptInfo.DDLType = PartitionTypeHash
ptInfo.DDLExpr = "Test DDL Expr"
ptInfo.NewTableID = 1111

ptInfo.ClearReorgIntermediateInfo()
require.Equal(t, PartitionTypeNone, ptInfo.DDLType)
require.Equal(t, "", ptInfo.DDLExpr)
require.Equal(t, true, ptInfo.DDLColumns == nil)
require.Equal(t, int64(0), ptInfo.NewTableID)
}
2 changes: 1 addition & 1 deletion pkg/table/tables/test/partition/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"partition_test.go",
],
flaky = True,
shard_count = 18,
shard_count = 19,
deps = [
"//pkg/ddl",
"//pkg/domain",
Expand Down
81 changes: 81 additions & 0 deletions pkg/table/tables/test/partition/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3057,3 +3057,84 @@ func TestPointGetKeyPartitioning(t *testing.T) {
tk.MustExec(`INSERT INTO t VALUES ('Aa', 'Ab', 'Ac'), ('Ba', 'Bb', 'Bc')`)
tk.MustQuery(`SELECT * FROM t WHERE b = 'Ab'`).Check(testkit.Rows("Aa Ab Ac"))
}

// Issue TiDB #51090.
func TestAlterTablePartitionRollback(t *testing.T) {
store := testkit.CreateMockStore(t)

tk := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk3 := testkit.NewTestKit(t, store)
tk4 := testkit.NewTestKit(t, store)
tk5 := testkit.NewTestKit(t, store)
tk.MustExec(`use test;`)
tk2.MustExec(`use test;`)
tk3.MustExec(`use test;`)
tk4.MustExec(`use test;`)
tk5.MustExec(`use test;`)
tk.MustExec(`create table t(a int);`)
tk.MustExec(`insert into t values(1), (2), (3);`)

alterChan := make(chan error)
alterPartition := func() {
err := tk4.ExecToErr(`alter table t partition by hash(a) partitions 3;`)
alterChan <- err
}
waitFor := func(s string) {
for {
select {
case alterErr := <-alterChan:
require.Fail(t, "Alter completed unexpectedly", "With error %v", alterErr)
default:
// Alter still running
}
res := tk5.MustQuery(`admin show ddl jobs where db_name = 'test' and table_name = 't' and job_type = 'alter table partition by'`).Rows()
if len(res) > 0 && res[0][4] == s {
logutil.BgLogger().Info("Got state", zap.String("State", s))
break
}
gotime.Sleep(10 * gotime.Millisecond)
}
dom := domain.GetDomain(tk5.Session())
// Make sure the table schema is the new schema.
require.NoError(t, dom.Reload())
}

testFunc := func(states []string) {
for i, s := range states {
if i%2 == 0 {
tk2.MustExec(`begin;`)
tk2.MustExec(`select 1 from t;`)
if i > 0 {
tk3.MustExec(`commit;`)
}
} else {
tk3.MustExec(`begin;`)
tk3.MustExec(`select 1 from t;`)
tk2.MustExec(`commit;`)
}
if i == 0 {
go alterPartition()
}
waitFor(s)
if i == len(states)-1 {
break
}
}
res := tk.MustQuery(`admin show ddl jobs where table_name = 't' and job_type = 'alter table partition by'`).Rows()
tk.MustExec(fmt.Sprintf("admin cancel ddl jobs %v", res[0][0]))
tk2.MustExec(`commit;`)
tk3.MustExec(`commit;`)
require.ErrorContains(t, <-alterChan, "[ddl:8214]Cancelled DDL job")
tk.MustQuery(`show create table t;`).Check(testkit.Rows(
"t CREATE TABLE `t` (\n" +
" `a` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
tk.MustQuery(`select a from t order by a;`).Check(testkit.Rows("1", "2", "3"))
}

states := []string{"delete only", "write only", "write reorganization", "delete reorganization"}
for i := range states {
testFunc(states[:i+1])
}
}

0 comments on commit 99478c8

Please sign in to comment.