Skip to content

Commit

Permalink
planner, CTE, view: Fix default inline CTE which contains orderby/lim…
Browse files Browse the repository at this point in the history
…it/distinct and inside of view (#56609)

close #56582, close #56603
  • Loading branch information
elsa0520 authored Oct 15, 2024
1 parent 83c4a0f commit fa723c3
Show file tree
Hide file tree
Showing 14 changed files with 307 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@
{
"Depth": 2,
"Label": 0,
"IsRoot": false,
"StoreType": 0,
"IsRoot": true,
"StoreType": 2,
"ReqType": 0,
"IsPhysicalPlan": true,
"TextTreeIndent": "│ │ ",
Expand All @@ -232,15 +232,80 @@
{
"Depth": 2,
"Label": 0,
"IsRoot": false,
"StoreType": 0,
"IsRoot": true,
"StoreType": 2,
"ReqType": 0,
"IsPhysicalPlan": true,
"TextTreeIndent": "",
"IsLastChild": true
}
],
"CTEs": null
"CTEs": [
[
{
"Depth": 0,
"Label": 0,
"IsRoot": true,
"StoreType": 2,
"ReqType": 0,
"IsPhysicalPlan": true,
"TextTreeIndent": "",
"IsLastChild": true
},
{
"Depth": 1,
"Label": 3,
"IsRoot": true,
"StoreType": 2,
"ReqType": 0,
"IsPhysicalPlan": true,
"TextTreeIndent": "",
"IsLastChild": true
},
{
"Depth": 2,
"Label": 0,
"IsRoot": false,
"StoreType": 0,
"ReqType": 0,
"IsPhysicalPlan": true,
"TextTreeIndent": "",
"IsLastChild": true
}
],
[
{
"Depth": 0,
"Label": 0,
"IsRoot": true,
"StoreType": 2,
"ReqType": 0,
"IsPhysicalPlan": true,
"TextTreeIndent": "",
"IsLastChild": true
},
{
"Depth": 1,
"Label": 3,
"IsRoot": true,
"StoreType": 2,
"ReqType": 0,
"IsPhysicalPlan": true,
"TextTreeIndent": "",
"IsLastChild": true
},
{
"Depth": 2,
"Label": 0,
"IsRoot": false,
"StoreType": 0,
"ReqType": 0,
"IsPhysicalPlan": true,
"TextTreeIndent": "",
"IsLastChild": true
}
]
]
},
{
"SQL": "WITH RECURSIVE cte (n) AS( SELECT 1 UNION ALL SELECT n + 1 FROM cte WHERE n < 5)SELECT * FROM cte;",
Expand Down
24 changes: 15 additions & 9 deletions pkg/planner/core/casetest/hint/testdata/integration_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -796,18 +796,24 @@
{
"SQL": "explain format = 'brief' select /*+ qb_name(qb_v8, v8), merge(@qb_v8) */ * from v8;",
"Plan": [
"HashAgg 16000.00 root group by:Column#41, funcs:firstrow(Column#41)->Column#41",
"HashAgg 16000.00 root group by:Column#21, funcs:firstrow(Column#21)->Column#21",
"└─Union 1000000010000.00 root ",
" ├─HashJoin 1000000000000.00 root CARTESIAN inner join",
" │ ├─IndexReader(Build) 10000.00 root index:IndexFullScan",
" │ │ └─IndexFullScan 10000.00 cop[tikv] table:t3, index:idx_a(a) keep order:false, stats:pseudo",
" │ └─HashJoin(Probe) 100000000.00 root CARTESIAN inner join",
" │ ├─IndexReader(Build) 10000.00 root index:IndexFullScan",
" │ │ └─IndexFullScan 10000.00 cop[tikv] table:t2, index:idx_a(a) keep order:false, stats:pseudo",
" │ └─TableReader(Probe) 10000.00 root data:TableFullScan",
" │ └─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo",
" │ ├─TableReader(Build) 10000.00 root data:TableFullScan",
" │ │ └─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo",
" │ └─CTEFullScan(Probe) 100000000.00 root CTE:cte2 data:CTE_1",
" └─TableReader 10000.00 root data:TableFullScan",
" └─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo"
" └─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo",
"CTE_1 100000000.00 root Non-Recursive CTE",
"└─HashJoin(Seed Part) 100000000.00 root CARTESIAN inner join",
" ├─CTEFullScan(Build) 10000.00 root CTE:cte4 data:CTE_3",
" └─CTEFullScan(Probe) 10000.00 root CTE:cte3 data:CTE_2",
"CTE_3 10000.00 root Non-Recursive CTE",
"└─IndexReader(Seed Part) 10000.00 root index:IndexFullScan",
" └─IndexFullScan 10000.00 cop[tikv] table:t3, index:idx_a(a) keep order:false, stats:pseudo",
"CTE_2 10000.00 root Non-Recursive CTE",
"└─IndexReader(Seed Part) 10000.00 root index:IndexFullScan",
" └─IndexFullScan 10000.00 cop[tikv] table:t2, index:idx_a(a) keep order:false, stats:pseudo"
],
"Warn": null
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/casetest/planstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
],
data = glob(["testdata/**"]),
flaky = True,
shard_count = 5,
shard_count = 6,
deps = [
"//pkg/config",
"//pkg/domain",
Expand Down
55 changes: 41 additions & 14 deletions pkg/planner/core/casetest/planstats/plan_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,20 +167,6 @@ func TestPlanStatsLoad(t *testing.T) {
require.Greater(t, countFullStats(ptr.StatsInfo().HistColl, tableInfo.Columns[2].ID), 0)
},
},
{ // CTE
sql: "with cte(x, y) as (select d + 1, b from t where c > 1) select * from cte where x < 3",
check: func(p base.Plan, tableInfo *model.TableInfo) {
ps, ok := p.(*plannercore.PhysicalProjection)
require.True(t, ok)
pc, ok := ps.Children()[0].(*plannercore.PhysicalTableReader)
require.True(t, ok)
pp, ok := pc.GetTablePlan().(*plannercore.PhysicalSelection)
require.True(t, ok)
reader, ok := pp.Children()[0].(*plannercore.PhysicalTableScan)
require.True(t, ok)
require.Greater(t, countFullStats(reader.StatsInfo().HistColl, tableInfo.Columns[2].ID), 0)
},
},
{ // recursive CTE
sql: "with recursive cte(x, y) as (select a, b from t where c > 1 union select x + 1, y from cte where x < 5) select * from cte",
check: func(p base.Plan, tableInfo *model.TableInfo) {
Expand Down Expand Up @@ -225,6 +211,47 @@ func TestPlanStatsLoad(t *testing.T) {
}
}

// TestPlanStatsLoadForCTE runs every SQL statement listed for this test in the
// plan-stats test data and checks the returned rows against the recorded
// output. In record mode the output is regenerated from the live result first.
func TestPlanStatsLoadForCTE(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)

	// Session configuration, schema and data; the statements run in this order.
	for _, stmt := range []string{
		"use test",
		"drop table if exists t",
		"set @@session.tidb_analyze_version=2",
		"set @@session.tidb_partition_prune_mode = 'static'",
		"set @@session.tidb_stats_load_sync_wait = 60000",
		"set tidb_opt_projection_push_down = 0",
		"create table t(a int, b int, c int, d int, primary key(a), key idx(b))",
		"insert into t values (1,1,1,1),(2,2,2,2),(3,3,3,3)",
		"create table pt(a int, b int, c int) partition by range(a) (partition p0 values less than (10), partition p1 values less than (20), partition p2 values less than maxvalue)",
		"insert into pt values (1,1,1),(2,2,2),(13,13,13),(14,14,14),(25,25,25),(36,36,36)",
	} {
		tk.MustExec(stmt)
	}

	// Override the stats lease for the duration of the test and restore the
	// previous value on exit.
	prevLease := dom.StatsHandle().Lease()
	dom.StatsHandle().SetLease(1)
	defer func() {
		dom.StatsHandle().SetLease(prevLease)
	}()

	// Collect statistics for both tables before running the cases.
	for _, stmt := range []string{
		"analyze table t all columns",
		"analyze table pt all columns",
	} {
		tk.MustExec(stmt)
	}

	var input []string
	var output []struct {
		Query  string
		Result []string
	}
	suite := GetPlanStatsData()
	suite.LoadTestCases(t, &input, &output)

	for idx, query := range input {
		// Only executed when the test data is being (re)recorded.
		testdata.OnRecord(func() {
			output[idx].Query = query
			output[idx].Result = testdata.ConvertRowsToStrings(tk.MustQuery(query).Rows())
		})
		got := tk.MustQuery(query)
		got.Check(testkit.Rows(output[idx].Result...))
	}
}

func countFullStats(stats *statistics.HistColl, colID int64) int {
cnt := -1
stats.ForEachColumnImmutable(func(_ int64, col *statistics.Column) bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,11 @@
"explain format = brief select * from t join tp where tp.a = 10 and t.b = tp.c",
"explain format = brief select * from t join tp partition (p0) join t2 where t.a < 10 and t.b = tp.c and t2.a > 10 and t2.a = tp.c"
]
},
{
"name": "TestPlanStatsLoadForCTE",
"cases": [
"explain format= brief with cte(x, y) as (select d + 1, b from t where c > 1) select * from cte where x < 3"
]
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -143,5 +143,19 @@
]
}
]
},
{
"Name": "TestPlanStatsLoadForCTE",
"Cases": [
{
"Query": "explain format= brief with cte(x, y) as (select d + 1, b from t where c > 1) select * from cte where x < 3",
"Result": [
"Projection 1.60 root plus(test.t.d, 1)->Column#12, test.t.b",
"└─TableReader 1.60 root data:Selection",
" └─Selection 1.60 cop[tikv] gt(test.t.c, 1), lt(plus(test.t.d, 1), 3)",
" └─TableFullScan 3.00 cop[tikv] table:t keep order:false"
]
}
]
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"select * from ((t1 left join t2 on a1=a2) left join t3 on b2=b3) join t4 on b3=b4 -- nested and propagation of null filtering",
"select * from t1 right join t2 on a1=a2 where exists (select 1 from t3 where b1=b3) -- semi join is null filtering on the outer join",
"select sum(l_extendedprice) / 7.0 as avg_yearly from lineitem, part where p_partkey = l_partkey and p_brand = 'Brand#44' and p_container = 'WRAP PKG' and l_quantity < ( select 0.2 * avg(l_quantity) from lineitem where l_partkey = p_partkey) -- Q17 in TPCH. null filter on derived outer join",
"WITH cte AS ( SELECT alias1.col_date AS field1 FROM d AS alias1 LEFT JOIN dd AS alias2 ON alias1.col_blob_key=alias2.col_blob_key WHERE alias1.col_varchar_key IS NULL OR alias1.col_blob_key >= 'a') DELETE FROM outr1.*, outr2.* USING d AS outr1 LEFT OUTER JOIN dd AS outr2 ON (outr1.col_date=outr2.col_date) JOIN cte AS outrcte ON outr2.col_blob_key=outrcte.field1 -- nested complex case",
"WITH cte AS ( SELECT alias1.col_date AS field1 FROM d AS alias1 LEFT JOIN dd AS alias2 ON alias1.col_blob_key=alias2.col_blob_key WHERE alias1.col_varchar_key IS NULL OR alias1.col_blob_key >= 'a') SELECT * FROM d AS outr1 LEFT OUTER JOIN dd AS outr2 ON (outr1.col_date=outr2.col_date) JOIN cte AS outrcte ON outr2.col_blob_key=outrcte.field1 -- nested complex case",
"with cte as (select count(a2) as cnt,b2-5 as b3 from t1 left outer join t2 on a1=a2 group by b3) select * from cte where b3 > 1 -- aggregate case.",
"select * from dd as outr1 WHERE outr1.col_blob IN (SELECT DISTINCT innr1.col_blob_key AS y FROM d AS innrcte left outer join dd AS innr1 ON innr1.pk = innrcte.col_date WHERE outr1.col_int_key > 6)",
"select * from t0 left outer join t11 on a0=a1 where t0.b0 in (t11.b1, t11.c1) -- each = in the in list is null filtering",
Expand Down
39 changes: 19 additions & 20 deletions pkg/planner/core/casetest/rule/testdata/outer2inner_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -279,26 +279,25 @@
]
},
{
"SQL": "WITH cte AS ( SELECT alias1.col_date AS field1 FROM d AS alias1 LEFT JOIN dd AS alias2 ON alias1.col_blob_key=alias2.col_blob_key WHERE alias1.col_varchar_key IS NULL OR alias1.col_blob_key >= 'a') DELETE FROM outr1.*, outr2.* USING d AS outr1 LEFT OUTER JOIN dd AS outr2 ON (outr1.col_date=outr2.col_date) JOIN cte AS outrcte ON outr2.col_blob_key=outrcte.field1 -- nested complex case",
"Plan": [
"Delete N/A root N/A",
"└─Projection 6523.44 root test.d._tidb_rowid, test.dd._tidb_rowid, test.d.col_date",
" └─HashJoin 6523.44 root inner join, equal:[eq(test.d.col_date, Column#41)]",
" ├─HashJoin(Build) 4175.00 root left outer join, equal:[eq(test.d.col_blob_key, test.dd.col_blob_key)]",
" │ ├─TableReader(Build) 3340.00 root data:Selection",
" │ │ └─Selection 3340.00 cop[tikv] or(isnull(test.d.col_varchar_key), ge(test.d.col_blob_key, \"a\"))",
" │ │ └─TableFullScan 10000.00 cop[tikv] table:alias1 keep order:false, stats:pseudo",
" │ └─TableReader(Probe) 9990.00 root data:Selection",
" │ └─Selection 9990.00 cop[tikv] not(isnull(test.dd.col_blob_key))",
" │ └─TableFullScan 10000.00 cop[tikv] table:alias2 keep order:false, stats:pseudo",
" └─Projection(Probe) 12487.50 root test.d._tidb_rowid, test.dd._tidb_rowid, cast(test.dd.col_blob_key, datetime(6) BINARY)->Column#41",
" └─HashJoin 12487.50 root inner join, equal:[eq(test.d.col_date, test.dd.col_date)]",
" ├─TableReader(Build) 9990.00 root data:Selection",
" │ └─Selection 9990.00 cop[tikv] not(isnull(test.d.col_date))",
" │ └─TableFullScan 10000.00 cop[tikv] table:outr1 keep order:false, stats:pseudo",
" └─TableReader(Probe) 9990.00 root data:Selection",
" └─Selection 9990.00 cop[tikv] not(isnull(test.dd.col_date))",
" └─TableFullScan 10000.00 cop[tikv] table:outr2 keep order:false, stats:pseudo"
"SQL": "WITH cte AS ( SELECT alias1.col_date AS field1 FROM d AS alias1 LEFT JOIN dd AS alias2 ON alias1.col_blob_key=alias2.col_blob_key WHERE alias1.col_varchar_key IS NULL OR alias1.col_blob_key >= 'a') SELECT * FROM d AS outr1 LEFT OUTER JOIN dd AS outr2 ON (outr1.col_date=outr2.col_date) JOIN cte AS outrcte ON outr2.col_blob_key=outrcte.field1 -- nested complex case",
"Plan": [
"Projection 6523.44 root test.d.pk, test.d.col_blob, test.d.col_blob_key, test.d.col_varchar_key, test.d.col_date, test.d.col_int_key, test.dd.pk, test.dd.col_blob, test.dd.col_blob_key, test.dd.col_date, test.dd.col_int_key, test.d.col_date",
"└─HashJoin 6523.44 root inner join, equal:[eq(test.d.col_date, Column#41)]",
" ├─HashJoin(Build) 4175.00 root left outer join, equal:[eq(test.d.col_blob_key, test.dd.col_blob_key)]",
" │ ├─TableReader(Build) 3340.00 root data:Selection",
" │ │ └─Selection 3340.00 cop[tikv] or(isnull(test.d.col_varchar_key), ge(test.d.col_blob_key, \"a\"))",
" │ │ └─TableFullScan 10000.00 cop[tikv] table:alias1 keep order:false, stats:pseudo",
" │ └─TableReader(Probe) 9990.00 root data:Selection",
" │ └─Selection 9990.00 cop[tikv] not(isnull(test.dd.col_blob_key))",
" │ └─TableFullScan 10000.00 cop[tikv] table:alias2 keep order:false, stats:pseudo",
" └─Projection(Probe) 12487.50 root test.d.pk, test.d.col_blob, test.d.col_blob_key, test.d.col_varchar_key, test.d.col_date, test.d.col_int_key, test.dd.pk, test.dd.col_blob, test.dd.col_blob_key, test.dd.col_date, test.dd.col_int_key, cast(test.dd.col_blob_key, datetime(6) BINARY)->Column#41",
" └─HashJoin 12487.50 root inner join, equal:[eq(test.d.col_date, test.dd.col_date)]",
" ├─TableReader(Build) 9990.00 root data:Selection",
" │ └─Selection 9990.00 cop[tikv] not(isnull(test.dd.col_date))",
" │ └─TableFullScan 10000.00 cop[tikv] table:outr2 keep order:false, stats:pseudo",
" └─TableReader(Probe) 9990.00 root data:Selection",
" └─Selection 9990.00 cop[tikv] not(isnull(test.d.col_date))",
" └─TableFullScan 10000.00 cop[tikv] table:outr1 keep order:false, stats:pseudo"
]
},
{
Expand Down
33 changes: 27 additions & 6 deletions pkg/planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (b *PlanBuilder) buildAggregation(ctx context.Context, p base.LogicalPlan,
}
// Flag the CTE being built if it contains an aggregation.
if b.buildingCTE {
b.outerCTEs[len(b.outerCTEs)-1].containAggOrWindow = true
b.outerCTEs[len(b.outerCTEs)-1].containRecursiveForbiddenOperator = true
}
var rollupExpand *logicalop.LogicalExpand
if expand, ok := p.(*logicalop.LogicalExpand); ok {
Expand Down Expand Up @@ -1496,6 +1496,10 @@ func (b *PlanBuilder) buildProjection(ctx context.Context, p base.LogicalPlan, f
func (b *PlanBuilder) buildDistinct(child base.LogicalPlan, length int) (*logicalop.LogicalAggregation, error) {
b.optFlag = b.optFlag | rule.FlagBuildKeyInfo
b.optFlag = b.optFlag | rule.FlagPushDownAgg
// Flag the CTE being built if it contains DISTINCT.
if b.buildingCTE {
b.outerCTEs[len(b.outerCTEs)-1].containRecursiveForbiddenOperator = true
}
plan4Agg := logicalop.LogicalAggregation{
AggFuncs: make([]*aggregation.AggFuncDesc, 0, child.Schema().Len()),
GroupByItems: expression.Column2Exprs(child.Schema().Clone().Columns[:length]),
Expand Down Expand Up @@ -2091,6 +2095,10 @@ func extractLimitCountOffset(ctx expression.BuildContext, limit *ast.Limit) (cou

func (b *PlanBuilder) buildLimit(src base.LogicalPlan, limit *ast.Limit) (base.LogicalPlan, error) {
b.optFlag = b.optFlag | rule.FlagPushDownTopN
// Flag the CTE being built if it contains LIMIT.
if b.buildingCTE {
b.outerCTEs[len(b.outerCTEs)-1].containRecursiveForbiddenOperator = true
}
var (
offset, count uint64
err error
Expand Down Expand Up @@ -3921,6 +3929,10 @@ func (b *PlanBuilder) buildSelect(ctx context.Context, sel *ast.SelectStmt) (p b
}

if sel.OrderBy != nil {
// Flag the CTE being built if it contains ORDER BY.
if b.buildingCTE {
b.outerCTEs[len(b.outerCTEs)-1].containRecursiveForbiddenOperator = true
}
// We need to keep the ORDER BY clause for the following cases:
// 1. The select is top level query, order should be honored
// 2. The query has LIMIT clause
Expand Down Expand Up @@ -4227,9 +4239,9 @@ func (b *PlanBuilder) tryBuildCTE(ctx context.Context, tn *ast.TableName, asName
prevSchema := cte.seedLP.Schema().Clone()
lp.SetSchema(getResultCTESchema(cte.seedLP.Schema(), b.ctx.GetSessionVars()))

// If current CTE query contain another CTE which 'containAggOrWindow' is true, current CTE 'containAggOrWindow' will be true
// If the current CTE query references another CTE whose 'containRecursiveForbiddenOperator' is true, the current CTE's 'containRecursiveForbiddenOperator' becomes true as well.
if b.buildingCTE {
b.outerCTEs[len(b.outerCTEs)-1].containAggOrWindow = cte.containAggOrWindow || b.outerCTEs[len(b.outerCTEs)-1].containAggOrWindow
b.outerCTEs[len(b.outerCTEs)-1].containRecursiveForbiddenOperator = cte.containRecursiveForbiddenOperator || b.outerCTEs[len(b.outerCTEs)-1].containRecursiveForbiddenOperator
}
// Compute cte inline
b.computeCTEInlineFlag(cte)
Expand Down Expand Up @@ -4287,13 +4299,22 @@ func (b *PlanBuilder) computeCTEInlineFlag(cte *cteInfo) {
b.ctx.GetSessionVars().StmtCtx.SetHintWarning(
fmt.Sprintf("Recursive CTE %s can not be inlined by merge() or tidb_opt_force_inline_cte.", cte.def.Name))
}
} else if cte.containAggOrWindow && b.buildingRecursivePartForCTE {
cte.isInline = false
} else if cte.containRecursiveForbiddenOperator && b.buildingRecursivePartForCTE {
if cte.forceInlineByHintOrVar {
b.ctx.GetSessionVars().StmtCtx.AppendWarning(plannererrors.ErrCTERecursiveForbidsAggregation.FastGenByArgs(cte.def.Name))
}
} else if cte.consumerCount > 1 {
cte.isInline = false
} else if cte.consumerCount != 1 {
// If inlining is forced by a hint or session variable, the user's choice is honored and the CTE is inlined.
if cte.forceInlineByHintOrVar {
cte.isInline = true
} else {
// The consumer count is either > 1 or 0, so the CTE is not inlined by default.
// A consumer count of 0 (issue #56582) means the CTE may be inside a view,
// where UpdateCTEConsumerCount (preprocess phase) is skipped.
// In that case no CTE.consumerCount has been updated, so it cannot be used to determine whether the CTE can be inlined.
cte.isInline = false
}
} else {
cte.isInline = true
Expand Down Expand Up @@ -6455,7 +6476,7 @@ func sortWindowSpecs(groupedFuncs map[*ast.WindowSpec][]*ast.WindowFuncExpr, ord

func (b *PlanBuilder) buildWindowFunctions(ctx context.Context, p base.LogicalPlan, groupedFuncs map[*ast.WindowSpec][]*ast.WindowFuncExpr, orderedSpec []*ast.WindowSpec, aggMap map[*ast.AggregateFuncExpr]int) (base.LogicalPlan, map[*ast.WindowFuncExpr]int, error) {
if b.buildingCTE {
b.outerCTEs[len(b.outerCTEs)-1].containAggOrWindow = true
b.outerCTEs[len(b.outerCTEs)-1].containRecursiveForbiddenOperator = true
}
args := make([]ast.ExprNode, 0, 4)
windowMap := make(map[*ast.WindowFuncExpr]int)
Expand Down
Loading

0 comments on commit fa723c3

Please sign in to comment.