From 20e467a0469af63cf253be2e8fccfcd8e8851c5f Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Thu, 20 May 2021 03:29:41 +0800 Subject: [PATCH] executor: add some test cases about dynamic-mode and apply operator (#24683) --- executor/partition_table_test.go | 144 ++++++++++++++++++++++++++++++- 1 file changed, 142 insertions(+), 2 deletions(-) diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index 101f429475748..446e689184086 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -240,8 +240,8 @@ func (s *partitionTableSuite) TestOrderByandLimit(c *C) { // range partition table tk.MustExec(`create table trange(a int, b int, index idx_a(a)) partition by range(a) ( - partition p0 values less than(300), - partition p1 values less than (500), + partition p0 values less than(300), + partition p1 values less than (500), partition p2 values less than(1100));`) // hash partition table @@ -1298,6 +1298,146 @@ func (s *partitionTableSuite) TestSplitRegion(c *C) { tk.MustPartition(`select * from thash where a in (1, 10001, 20001)`, "p1").Sort().Check(result) } +func (s *partitionTableSuite) TestParallelApply(c *C) { + if israce.RaceEnabled { + c.Skip("exhaustive types test, skip race test") + } + + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create database test_parallel_apply") + tk.MustExec("use test_parallel_apply") + tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'") + tk.MustExec("set tidb_enable_parallel_apply=true") + + tk.MustExec(`create table touter (a int, b int)`) + tk.MustExec(`create table tinner (a int, b int, key(a))`) + tk.MustExec(`create table thash (a int, b int, key(a)) partition by hash(a) partitions 4`) + tk.MustExec(`create table trange (a int, b int, key(a)) partition by range(a) ( + partition p0 values less than(10000), + partition p1 values less than(20000), + partition p2 values less than(30000), + partition p3 values less than(40000))`) + + vouter := make([]string, 0, 100) + for i := 0; i < 100; i++ { + vouter = append(vouter, fmt.Sprintf("(%v, %v)", rand.Intn(40000), rand.Intn(40000))) + } + tk.MustExec("insert into touter values " + strings.Join(vouter, ", ")) + + vals := make([]string, 0, 2000) + for i := 0; i < 100; i++ { + vals = append(vals, fmt.Sprintf("(%v, %v)", rand.Intn(40000), rand.Intn(40000))) + } + tk.MustExec("insert into tinner values " + strings.Join(vals, ", ")) + tk.MustExec("insert into thash values " + strings.Join(vals, ", ")) + tk.MustExec("insert into trange values " + strings.Join(vals, ", ")) + + // parallel apply + hash partition + IndexReader as its inner child + tk.MustQuery(`explain format='brief' select * from touter where touter.a > (select sum(thash.a) from thash use index(a) where thash.a>touter.b)`).Check(testkit.Rows( + `Projection 10000.00 root test_parallel_apply.touter.a, test_parallel_apply.touter.b`, + `└─Apply 10000.00 root CARTESIAN inner join, other cond:gt(cast(test_parallel_apply.touter.a, decimal(20,0) BINARY), Column#7)`, + ` ├─TableReader(Build) 10000.00 root data:TableFullScan`, + ` │ └─TableFullScan 10000.00 cop[tikv] table:touter keep order:false, stats:pseudo`, + ` └─StreamAgg(Probe) 1.00 root funcs:sum(Column#9)->Column#7`, + ` └─IndexReader 1.00 root partition:all index:StreamAgg`, // IndexReader is a inner child of Apply + ` └─StreamAgg 1.00 cop[tikv] funcs:sum(test_parallel_apply.thash.a)->Column#9`, + ` └─Selection 8000.00 cop[tikv] gt(test_parallel_apply.thash.a, test_parallel_apply.touter.b)`, + ` └─IndexFullScan 10000.00 cop[tikv] table:thash, index:a(a) keep order:false, stats:pseudo`)) + tk.MustQuery(`select * from touter where touter.a > (select sum(thash.a) from thash use index(a) where thash.a>touter.b)`).Sort().Check( + tk.MustQuery(`select * from touter where touter.a > (select sum(tinner.a) from tinner use index(a) where tinner.a>touter.b)`).Sort().Rows()) + + // parallel apply + hash partition + TableReader as its inner child + tk.MustQuery(`explain format='brief' select * from touter where touter.a > (select sum(thash.b) from thash ignore index(a) where thash.a>touter.b)`).Check(testkit.Rows( + `Projection 10000.00 root test_parallel_apply.touter.a, test_parallel_apply.touter.b`, + `└─Apply 10000.00 root CARTESIAN inner join, other cond:gt(cast(test_parallel_apply.touter.a, decimal(20,0) BINARY), Column#7)`, + ` ├─TableReader(Build) 10000.00 root data:TableFullScan`, + ` │ └─TableFullScan 10000.00 cop[tikv] table:touter keep order:false, stats:pseudo`, + ` └─StreamAgg(Probe) 1.00 root funcs:sum(Column#9)->Column#7`, + ` └─TableReader 1.00 root partition:all data:StreamAgg`, // TableReader is a inner child of Apply + ` └─StreamAgg 1.00 cop[tikv] funcs:sum(test_parallel_apply.thash.b)->Column#9`, + ` └─Selection 8000.00 cop[tikv] gt(test_parallel_apply.thash.a, test_parallel_apply.touter.b)`, + ` └─TableFullScan 10000.00 cop[tikv] table:thash keep order:false, stats:pseudo`)) + tk.MustQuery(`select * from touter where touter.a > (select sum(thash.b) from thash ignore index(a) where thash.a>touter.b)`).Sort().Check( + tk.MustQuery(`select * from touter where touter.a > (select sum(tinner.b) from tinner ignore index(a) where tinner.a>touter.b)`).Sort().Rows()) + + // parallel apply + hash partition + IndexLookUp as its inner child + tk.MustQuery(`explain format='brief' select * from touter where touter.a > (select sum(tinner.b) from tinner use index(a) where tinner.a>touter.b)`).Check(testkit.Rows( + `Projection 10000.00 root test_parallel_apply.touter.a, test_parallel_apply.touter.b`, + `└─Apply 10000.00 root CARTESIAN inner join, other cond:gt(cast(test_parallel_apply.touter.a, decimal(20,0) BINARY), Column#7)`, + ` ├─TableReader(Build) 10000.00 root data:TableFullScan`, + ` │ └─TableFullScan 10000.00 cop[tikv] table:touter keep order:false, stats:pseudo`, + ` └─HashAgg(Probe) 1.00 root funcs:sum(Column#9)->Column#7`, + ` └─IndexLookUp 1.00 root `, // IndexLookUp is a inner child of Apply + ` ├─Selection(Build) 8000.00 cop[tikv] gt(test_parallel_apply.tinner.a, test_parallel_apply.touter.b)`, + ` │ └─IndexFullScan 10000.00 cop[tikv] table:tinner, index:a(a) keep order:false, stats:pseudo`, + ` └─HashAgg(Probe) 1.00 cop[tikv] funcs:sum(test_parallel_apply.tinner.b)->Column#9`, + ` └─TableRowIDScan 8000.00 cop[tikv] table:tinner keep order:false, stats:pseudo`)) + tk.MustQuery(`select * from touter where touter.a > (select sum(thash.b) from thash use index(a) where thash.a>touter.b)`).Sort().Check( + tk.MustQuery(`select * from touter where touter.a > (select sum(tinner.b) from tinner use index(a) where tinner.a>touter.b)`).Sort().Rows()) + + // parallel apply + range partition + IndexReader as its inner child + tk.MustQuery(`explain format='brief' select * from touter where touter.a > (select sum(trange.a) from trange use index(a) where trange.a>touter.b)`).Check(testkit.Rows( + `Projection 10000.00 root test_parallel_apply.touter.a, test_parallel_apply.touter.b`, + `└─Apply 10000.00 root CARTESIAN inner join, other cond:gt(cast(test_parallel_apply.touter.a, decimal(20,0) BINARY), Column#7)`, + ` ├─TableReader(Build) 10000.00 root data:TableFullScan`, + ` │ └─TableFullScan 10000.00 cop[tikv] table:touter keep order:false, stats:pseudo`, + ` └─StreamAgg(Probe) 1.00 root funcs:sum(Column#9)->Column#7`, + ` └─IndexReader 1.00 root partition:all index:StreamAgg`, // IndexReader is a inner child of Apply + ` └─StreamAgg 1.00 cop[tikv] funcs:sum(test_parallel_apply.trange.a)->Column#9`, + ` └─Selection 8000.00 cop[tikv] gt(test_parallel_apply.trange.a, test_parallel_apply.touter.b)`, + ` └─IndexFullScan 10000.00 cop[tikv] table:trange, index:a(a) keep order:false, stats:pseudo`)) + tk.MustQuery(`select * from touter where touter.a > (select sum(trange.a) from trange use index(a) where trange.a>touter.b)`).Sort().Check( + tk.MustQuery(`select * from touter where touter.a > (select sum(tinner.a) from tinner use index(a) where tinner.a>touter.b)`).Sort().Rows()) + + // parallel apply + range partition + TableReader as its inner child + tk.MustQuery(`explain format='brief' select * from touter where touter.a > (select sum(trange.b) from trange ignore index(a) where trange.a>touter.b)`).Check(testkit.Rows( + `Projection 10000.00 root test_parallel_apply.touter.a, test_parallel_apply.touter.b`, + `└─Apply 10000.00 root CARTESIAN inner join, other cond:gt(cast(test_parallel_apply.touter.a, decimal(20,0) BINARY), Column#7)`, + ` ├─TableReader(Build) 10000.00 root data:TableFullScan`, + ` │ └─TableFullScan 10000.00 cop[tikv] table:touter keep order:false, stats:pseudo`, + ` └─StreamAgg(Probe) 1.00 root funcs:sum(Column#9)->Column#7`, + ` └─TableReader 1.00 root partition:all data:StreamAgg`, // TableReader is a inner child of Apply + ` └─StreamAgg 1.00 cop[tikv] funcs:sum(test_parallel_apply.trange.b)->Column#9`, + ` └─Selection 8000.00 cop[tikv] gt(test_parallel_apply.trange.a, test_parallel_apply.touter.b)`, + ` └─TableFullScan 10000.00 cop[tikv] table:trange keep order:false, stats:pseudo`)) + tk.MustQuery(`select * from touter where touter.a > (select sum(trange.b) from trange ignore index(a) where trange.a>touter.b)`).Sort().Check( + tk.MustQuery(`select * from touter where touter.a > (select sum(tinner.b) from tinner ignore index(a) where tinner.a>touter.b)`).Sort().Rows()) + + // parallel apply + range partition + IndexLookUp as its inner child + tk.MustQuery(`explain format='brief' select * from touter where touter.a > (select sum(tinner.b) from tinner use index(a) where tinner.a>touter.b)`).Check(testkit.Rows( + `Projection 10000.00 root test_parallel_apply.touter.a, test_parallel_apply.touter.b`, + `└─Apply 10000.00 root CARTESIAN inner join, other cond:gt(cast(test_parallel_apply.touter.a, decimal(20,0) BINARY), Column#7)`, + ` ├─TableReader(Build) 10000.00 root data:TableFullScan`, + ` │ └─TableFullScan 10000.00 cop[tikv] table:touter keep order:false, stats:pseudo`, + ` └─HashAgg(Probe) 1.00 root funcs:sum(Column#9)->Column#7`, + ` └─IndexLookUp 1.00 root `, // IndexLookUp is a inner child of Apply + ` ├─Selection(Build) 8000.00 cop[tikv] gt(test_parallel_apply.tinner.a, test_parallel_apply.touter.b)`, + ` │ └─IndexFullScan 10000.00 cop[tikv] table:tinner, index:a(a) keep order:false, stats:pseudo`, + ` └─HashAgg(Probe) 1.00 cop[tikv] funcs:sum(test_parallel_apply.tinner.b)->Column#9`, + ` └─TableRowIDScan 8000.00 cop[tikv] table:tinner keep order:false, stats:pseudo`)) + tk.MustQuery(`select * from touter where touter.a > (select sum(trange.b) from trange use index(a) where trange.a>touter.b)`).Sort().Check( + tk.MustQuery(`select * from touter where touter.a > (select sum(tinner.b) from tinner use index(a) where tinner.a>touter.b)`).Sort().Rows()) + + // random queries + ops := []string{"!=", ">", "<", ">=", "<="} + aggFuncs := []string{"sum", "count", "max", "min"} + tbls := []string{"tinner", "thash", "trange"} + for i := 0; i < 50; i++ { + var r [][]interface{} + op := ops[rand.Intn(len(ops))] + agg := aggFuncs[rand.Intn(len(aggFuncs))] + x := rand.Intn(10000) + for _, tbl := range tbls { + q := fmt.Sprintf(`select * from touter where touter.a > (select %v(%v.b) from %v where %v.a%vtouter.b-%v)`, agg, tbl, tbl, tbl, op, x) + if r == nil { + r = tk.MustQuery(q).Sort().Rows() + } else { + tk.MustQuery(q).Sort().Check(r) + } + } + } +} + func (s *partitionTableSuite) TestDirectReadingWithAgg(c *C) { if israce.RaceEnabled { c.Skip("exhaustive types test, skip race test")