Skip to content

Commit

Permalink
test: stabilize TestUpgradeWithPauseDDL (#54504)
Browse files Browse the repository at this point in the history
close #54452
  • Loading branch information
tangenta authored Jul 9, 2024
1 parent fa96096 commit d8098b9
Showing 1 changed file with 19 additions and 12 deletions.
31 changes: 19 additions & 12 deletions pkg/session/bootstraptest/bootstrap_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,12 @@ func TestUpgradeWithPauseDDL(t *testing.T) {
}

wg := sync.WaitGroup{}
execDDL := func(query string) {
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
_, err := tk.ExecWithContext(context.Background(), query)
require.NoError(t, err)
}
asyncExecDDL := func(query string) {
ch := make(chan struct{})
wg.Add(1)
Expand Down Expand Up @@ -729,12 +735,12 @@ func TestUpgradeWithPauseDDL(t *testing.T) {
query2 = fmt.Sprintf("alter table test.pause_user_ddl_t add column c_%d int", tc.Cnt.Load())
case 1:
// Make sure case0 and case1 use different table ID. Then case1's table won't be filtered because they use the same table ID.
query1 = fmt.Sprintf("alter table test.pause_user_ddl_t1 add index idx_%d(a)", tc.Cnt.Load())
query2 = fmt.Sprintf("alter table mysql.pause_user_ddl_t1 add column c_%d int", tc.Cnt.Load())
query1 = fmt.Sprintf("alter table mysql.pause_user_ddl_t1 add column c_%d int", tc.Cnt.Load())
query2 = fmt.Sprintf("alter table test.pause_user_ddl_t1 add index idx_%d(a)", tc.Cnt.Load())
}
tc.Cnt.Add(1)
asyncExecDDL(query1)
asyncExecDDL(query2)
execDDL(query1) // System table DDLs should not be blocked.
asyncExecDDL(query2) // User table DDLs should be blocked.

checkDDLJobState(s)
}
Expand Down Expand Up @@ -790,13 +796,14 @@ func TestUpgradeWithPauseDDL(t *testing.T) {
require.Equal(t, int64(0), rows[0].GetInt64(0))

// Check user DDLs are handled after system DDLs.
sql = fmt.Sprintf("select job_meta from mysql.tidb_ddl_history order by job_id desc limit %d", tc.Cnt.Load())
sql = fmt.Sprintf("select job_meta from mysql.tidb_ddl_history order by job_id desc limit %d", tc.Cnt.Load()*2)
rows, err = execute(context.Background(), tk.Session(), sql)
require.NoError(t, err)
require.Len(t, rows, int(tc.Cnt.Load()))
require.Len(t, rows, int(tc.Cnt.Load()*2))
type info struct {
ts uint64
sql string
ts uint64
jobID int64
sql string
}
mysqlOpInfos := make([]info, 0, len(rows))
testOpInfos := make([]info, 0, len(rows))
Expand All @@ -806,15 +813,15 @@ func TestUpgradeWithPauseDDL(t *testing.T) {
err := runJob.Decode(jobBinary)
require.NoError(t, err)
if strings.EqualFold(runJob.SchemaName, "mysql") {
mysqlOpInfos = append(mysqlOpInfos, info{runJob.BinlogInfo.FinishedTS, runJob.Query})
mysqlOpInfos = append(mysqlOpInfos, info{runJob.BinlogInfo.FinishedTS, runJob.ID, runJob.Query})
} else {
testOpInfos = append(testOpInfos, info{runJob.BinlogInfo.FinishedTS, runJob.Query})
testOpInfos = append(testOpInfos, info{runJob.BinlogInfo.FinishedTS, runJob.ID, runJob.Query})
}
}
for _, mysqlInfo := range mysqlOpInfos {
for _, testInfo := range testOpInfos {
cmt := fmt.Sprintf("test sql:%s, ts:%v, mysql sql:%s, ts:%v",
testInfo.sql, testInfo.ts, mysqlInfo.sql, mysqlInfo.ts)
cmt := fmt.Sprintf("test sql:%s, ts:%v, jobID: %d, mysql sql:%s, ts:%v, jobID: %d",
testInfo.sql, testInfo.ts, testInfo.jobID, mysqlInfo.sql, mysqlInfo.ts, mysqlInfo.jobID)
require.Greater(t, testInfo.ts, mysqlInfo.ts, cmt)
}
}
Expand Down

0 comments on commit d8098b9

Please sign in to comment.