diff --git a/ddl/partition_test.go b/ddl/partition_test.go index c99a4356a1d55..dd4024770cf2d 100644 --- a/ddl/partition_test.go +++ b/ddl/partition_test.go @@ -18,8 +18,8 @@ import ( "testing" "time" - "github.com/pingcap/failpoint" "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/ddl/internal/callback" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" @@ -144,11 +144,6 @@ func testTruncatePartition(t *testing.T, ctx sessionctx.Context, d ddl.DDL, dbIn func TestReorganizePartitionRollback(t *testing.T) { // See issue: https://github.com/pingcap/tidb/issues/42448 - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/reorgPartWriteReorgReplacedPartIDsFail", "return(true)")) - defer func() { - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/reorgPartWriteReorgReplacedPartIDsFail")) - }() - store, do := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -168,8 +163,20 @@ func TestReorganizePartitionRollback(t *testing.T) { " PARTITION `p4` VALUES LESS THAN (10000000),\n" + " PARTITION `p5` VALUES LESS THAN (MAXVALUE))") tk.MustExec("insert into t1(k, c, pad) values (1, 'a', 'beijing'), (2, 'b', 'chengdu')") + + wait := make(chan struct{}) + defer close(wait) ddlDone := make(chan error) defer close(ddlDone) + hook := &callback.TestDDLCallback{Do: do} + hook.OnJobRunAfterExported = func(job *model.Job) { + if job.Type == model.ActionReorganizePartition && job.SchemaState == model.StateWriteReorganization { + <-wait + <-wait + } + } + do.DDL().SetHook(hook) + go func() { tk2 := testkit.NewTestKit(t, store) tk2.MustExec("use test") @@ -177,30 +184,27 @@ func TestReorganizePartitionRollback(t *testing.T) { ddlDone <- err }() - showDDLJobStart := time.Now() jobID := "" - schemaState := "" - for time.Since(showDDLJobStart) < time.Minute { - rows := tk.MustQuery("admin show ddl jobs where JOB_TYPE='alter table reorganize partition'").Rows() - if len(rows) == 0 { - time.Sleep(100 * time.Millisecond) - continue - } + // wait DDL job reaches hook and then cancel + select { + case wait <- struct{}{}: + rows := tk.MustQuery("admin show ddl jobs where JOB_TYPE='alter table reorganize partition'").Rows() require.Equal(t, 1, len(rows)) jobID = rows[0][0].(string) - schemaState = rows[0][4].(string) - if schemaState != "write reorganization" { - time.Sleep(500 * time.Millisecond) - continue - } - break + tk.MustExec("admin cancel ddl jobs " + jobID) + case <-time.After(time.Minute): + require.FailNow(t, "timeout") } - require.NotEqual(t, "", jobID) - require.Equal(t, "write reorganization", schemaState) - tk.MustExec("admin cancel ddl jobs " + jobID) + // continue to run DDL + select { + case wait <- struct{}{}: + case <-time.After(time.Minute): + require.FailNow(t, "timeout") + } + // wait ddl done select { case err := <-ddlDone: require.Error(t, err)