Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao committed Mar 23, 2023
1 parent 9b8fb14 commit 061851f
Showing 1 changed file with 27 additions and 23 deletions.
50 changes: 27 additions & 23 deletions ddl/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/internal/callback"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
Expand Down Expand Up @@ -144,11 +144,6 @@ func testTruncatePartition(t *testing.T, ctx sessionctx.Context, d ddl.DDL, dbIn

func TestReorganizePartitionRollback(t *testing.T) {
// See issue: https://github.com/pingcap/tidb/issues/42448
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/reorgPartWriteReorgReplacedPartIDsFail", "return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/reorgPartWriteReorgReplacedPartIDsFail"))
}()

store, do := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand All @@ -168,39 +163,48 @@ func TestReorganizePartitionRollback(t *testing.T) {
" PARTITION `p4` VALUES LESS THAN (10000000),\n" +
" PARTITION `p5` VALUES LESS THAN (MAXVALUE))")
tk.MustExec("insert into t1(k, c, pad) values (1, 'a', 'beijing'), (2, 'b', 'chengdu')")

wait := make(chan struct{})
defer close(wait)
ddlDone := make(chan error)
defer close(ddlDone)
hook := &callback.TestDDLCallback{Do: do}
hook.OnJobRunAfterExported = func(job *model.Job) {
if job.Type == model.ActionReorganizePartition && job.SchemaState == model.StateWriteReorganization {
<-wait
<-wait
}
}
do.DDL().SetHook(hook)

go func() {
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
err := tk2.ExecToErr("alter table t1 reorganize partition p0, p1, p2, p3, p4 into( partition pnew values less than (10000000))")
ddlDone <- err
}()

showDDLJobStart := time.Now()
jobID := ""
schemaState := ""
for time.Since(showDDLJobStart) < time.Minute {
rows := tk.MustQuery("admin show ddl jobs where JOB_TYPE='alter table reorganize partition'").Rows()
if len(rows) == 0 {
time.Sleep(100 * time.Millisecond)
continue
}

// wait DDL job reaches hook and then cancel
select {
case wait <- struct{}{}:
rows := tk.MustQuery("admin show ddl jobs where JOB_TYPE='alter table reorganize partition'").Rows()
require.Equal(t, 1, len(rows))
jobID = rows[0][0].(string)
schemaState = rows[0][4].(string)
if schemaState != "write reorganization" {
time.Sleep(500 * time.Millisecond)
continue
}
break
tk.MustExec("admin cancel ddl jobs " + jobID)
case <-time.After(time.Minute):
require.FailNow(t, "timeout")
}

require.NotEqual(t, "", jobID)
require.Equal(t, "write reorganization", schemaState)
tk.MustExec("admin cancel ddl jobs " + jobID)
// continue to run DDL
select {
case wait <- struct{}{}:
case <-time.After(time.Minute):
require.FailNow(t, "timeout")
}

// wait ddl done
select {
case err := <-ddlDone:
require.Error(t, err)
Expand Down

0 comments on commit 061851f

Please sign in to comment.