Skip to content

Commit

Permalink
planner: eliminate useless scalar subqueries in some scenarios of aggregate queries (#47550)
Browse files Browse the repository at this point in the history

close #45822
  • Loading branch information
King-Dylan authored Nov 7, 2023
1 parent 8a8e0c0 commit 286e852
Show file tree
Hide file tree
Showing 12 changed files with 166 additions and 70 deletions.
4 changes: 2 additions & 2 deletions pkg/executor/test/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2352,7 +2352,7 @@ func TestApplyCache(t *testing.T) {
tk.MustExec("create table t(a int);")
tk.MustExec("insert into t values (1),(1),(1),(1),(1),(1),(1),(1),(1);")
tk.MustExec("analyze table t;")
result := tk.MustQuery("explain analyze SELECT count(1) FROM (SELECT (SELECT min(a) FROM t as t2 WHERE t2.a > t1.a) AS a from t as t1) t;")
result := tk.MustQuery("explain analyze SELECT count(a) FROM (SELECT (SELECT min(a) FROM t as t2 WHERE t2.a > t1.a) AS a from t as t1) t;")
require.Contains(t, result.Rows()[1][0], "Apply")
var (
ind int
Expand All @@ -2372,7 +2372,7 @@ func TestApplyCache(t *testing.T) {
tk.MustExec("create table t(a int);")
tk.MustExec("insert into t values (1),(2),(3),(4),(5),(6),(7),(8),(9);")
tk.MustExec("analyze table t;")
result = tk.MustQuery("explain analyze SELECT count(1) FROM (SELECT (SELECT min(a) FROM t as t2 WHERE t2.a > t1.a) AS a from t as t1) t;")
result = tk.MustQuery("explain analyze SELECT count(a) FROM (SELECT (SELECT min(a) FROM t as t2 WHERE t2.a > t1.a) AS a from t as t1) t;")
require.Contains(t, result.Rows()[1][0], "Apply")
flag = false
value = (result.Rows()[1][5]).(string)
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/cascades/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (opt *Optimizer) FindBestPlan(sctx sessionctx.Context, logical plannercore.
}

func (*Optimizer) onPhasePreprocessing(_ sessionctx.Context, plan plannercore.LogicalPlan) (plannercore.LogicalPlan, error) {
err := plan.PruneColumns(plan.Schema().Columns, nil)
err := plan.PruneColumns(plan.Schema().Columns, nil, plan)
if err != nil {
return nil, err
}
Expand Down
29 changes: 29 additions & 0 deletions pkg/planner/core/casetest/physicalplantest/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,35 @@ func TestAggEliminator(t *testing.T) {
}
}

// TestRuleColumnPruningLogicalApply is the regression test for issue #45822.
// It loads its SQL cases from the plan suite test data, optimizes each one with
// optimizer fix control 45822 switched on, and compares the string form of the
// resulting plan with the recorded "Best" plan.
func TestRuleColumnPruningLogicalApply(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	var input []string
	var output []struct {
		SQL  string
		Best string
	}
	planSuiteData := GetPlanSuiteData()
	planSuiteData.LoadTestCases(t, &input, &output)
	// Use distinct names for the SQL parser and the optimized plan so the loop
	// body never shadows the parser.
	sqlParser := parser.New()
	is := infoschema.MockInfoSchema([]*model.TableInfo{core.MockSignedTable(), core.MockUnsignedTable()})
	tk.MustExec("use test")
	tk.MustExec("set @@tidb_opt_fix_control = '45822:ON';")
	for i, tt := range input {
		// comment is attached to every assertion so a failure names the query.
		comment := fmt.Sprintf("input: %s", tt)
		stmt, err := sqlParser.ParseOneStmt(tt, "", "")
		require.NoError(t, err, comment)
		plan, _, err := planner.Optimize(context.TODO(), tk.Session(), stmt, is)
		require.NoError(t, err, comment)
		best := core.ToString(plan)
		// In record mode the expected output is refreshed from the current plan.
		testdata.OnRecord(func() {
			output[i].SQL = tt
			output[i].Best = best
		})
		require.Equal(t, output[i].Best, best, comment)
	}
}

func TestINMJHint(t *testing.T) {
var (
input []string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,21 @@
"select max(a) from (select t1.a from t t1 join t t2 on t1.a=t2.a) t"
]
},
{
"name": "TestRuleColumnPruningLogicalApply",
"cases": [
"SELECT COUNT(*) FROM (SELECT a, (SELECT t2.b FROM t t2, t t3 WHERE t2.a = t3.a AND t1.a = t2.a LIMIT 1) t FROM t t1) t",
"SELECT COUNT(a) FROM (SELECT a, (SELECT t2.b FROM t t2, t t3 WHERE t2.a = t3.a AND t1.a = t2.a LIMIT 1) t FROM t t1) t",
"SELECT COUNT(t) FROM (SELECT a, (SELECT t2.b FROM t t2, t t3 WHERE t2.a = t3.a AND t1.a = t2.a LIMIT 1) t FROM t t1) t",
"SELECT COUNT(a) FROM t t1 WHERE t1.a IN (SELECT t2.a FROM t t2, t t3 WHERE t2.b = t3.b)",
"SELECT a FROM (SELECT a, (SELECT t2.b FROM t t2, t t3 WHERE t2.a = t3.a AND t1.a = t2.a LIMIT 1) t FROM t t1) t",
"SELECT a FROM t WHERE b IN (SELECT t2.b FROM t t2, t t3 WHERE t2.a = t3.a)",
"SELECT a FROM t WHERE EXISTS (SELECT t2.b FROM t t2, t t3 WHERE t2.a = t3.a AND t2.b=t.b)",
"SELECT a FROM t WHERE NOT EXISTS (SELECT t2.b FROM t t2, t t3 WHERE t2.a = t3.a AND t2.b=t.b)",
"SELECT a FROM t WHERE b IN (SELECT b FROM t WHERE b = 1 AND a IN (SELECT a FROM t WHERE a > 0))",
"SELECT a FROM t WHERE b IN (SELECT b FROM t WHERE b = 1 AND a IN (SELECT t2.a FROM (SELECT t1.a, (SELECT t2.b FROM t t2, t t3 WHERE t2.a = t3.a AND t1.a = t2.a LIMIT 1) t FROM t t1) t2))"
]
},
{
"name": "TestUnmatchedTableInHint",
"cases": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2320,6 +2320,51 @@
}
]
},
{
"Name": "TestRuleColumnPruningLogicalApply",
"Cases": [
{
"SQL": "SELECT COUNT(*) FROM (SELECT a, (SELECT t2.b FROM t t2, t t3 WHERE t2.a = t3.a AND t1.a = t2.a LIMIT 1) t FROM t t1) t",
"Best": "IndexReader(Index(t.f)[[NULL,+inf]]->HashAgg)->HashAgg"
},
{
"SQL": "SELECT COUNT(a) FROM (SELECT a, (SELECT t2.b FROM t t2, t t3 WHERE t2.a = t3.a AND t1.a = t2.a LIMIT 1) t FROM t t1) t",
"Best": "IndexReader(Index(t.f)[[NULL,+inf]]->HashAgg)->HashAgg"
},
{
"SQL": "SELECT COUNT(t) FROM (SELECT a, (SELECT t2.b FROM t t2, t t3 WHERE t2.a = t3.a AND t1.a = t2.a LIMIT 1) t FROM t t1) t",
"Best": "Apply{IndexReader(Index(t.f)[[NULL,+inf]])->MergeInnerJoin{TableReader(Table(t))->TableReader(Table(t))}(test.t.a,test.t.a)->Limit}->HashAgg"
},
{
"SQL": "SELECT COUNT(a) FROM t t1 WHERE t1.a IN (SELECT t2.a FROM t t2, t t3 WHERE t2.b = t3.b)",
"Best": "LeftHashJoin{IndexReader(Index(t.f)[[NULL,+inf]])->LeftHashJoin{TableReader(Table(t))->TableReader(Table(t))}(test.t.b,test.t.b)->HashAgg}(test.t.a,test.t.a)->HashAgg"
},
{
"SQL": "SELECT a FROM (SELECT a, (SELECT t2.b FROM t t2, t t3 WHERE t2.a = t3.a AND t1.a = t2.a LIMIT 1) t FROM t t1) t",
"Best": "IndexReader(Index(t.f)[[NULL,+inf]])"
},
{
"SQL": "SELECT a FROM t WHERE b IN (SELECT t2.b FROM t t2, t t3 WHERE t2.a = t3.a)",
"Best": "LeftHashJoin{TableReader(Table(t))->MergeInnerJoin{TableReader(Table(t))->TableReader(Table(t))}(test.t.a,test.t.a)->HashAgg}(test.t.b,test.t.b)"
},
{
"SQL": "SELECT a FROM t WHERE EXISTS (SELECT t2.b FROM t t2, t t3 WHERE t2.a = t3.a AND t2.b=t.b)",
"Best": "LeftHashJoin{TableReader(Table(t))->MergeInnerJoin{TableReader(Table(t))->TableReader(Table(t))}(test.t.a,test.t.a)}(test.t.b,test.t.b)"
},
{
"SQL": "SELECT a FROM t WHERE NOT EXISTS (SELECT t2.b FROM t t2, t t3 WHERE t2.a = t3.a AND t2.b=t.b)",
"Best": "LeftHashJoin{TableReader(Table(t))->MergeInnerJoin{TableReader(Table(t))->TableReader(Table(t))}(test.t.a,test.t.a)}(test.t.b,test.t.b)"
},
{
"SQL": "SELECT a FROM t WHERE b IN (SELECT b FROM t WHERE b = 1 AND a IN (SELECT a FROM t WHERE a > 0))",
"Best": "RightHashJoin{IndexJoin{TableReader(Table(t)->Sel([eq(test.t.b, 1)]))->TableReader(Table(t)->Sel([gt(test.t.a, 0)]))}(test.t.a,test.t.a)->HashAgg->TableReader(Table(t))}(test.t.b,test.t.b)"
},
{
"SQL": "SELECT a FROM t WHERE b IN (SELECT b FROM t WHERE b = 1 AND a IN (SELECT t2.a FROM (SELECT t1.a, (SELECT t2.b FROM t t2, t t3 WHERE t2.a = t3.a AND t1.a = t2.a LIMIT 1) t FROM t t1) t2))",
"Best": "LeftHashJoin{TableReader(Table(t))->IndexJoin{TableReader(Table(t)->Sel([eq(test.t.b, 1)]))->TableReader(Table(t))}(test.t.a,test.t.a)->HashAgg}(test.t.b,test.t.b)"
}
]
},
{
"Name": "TestUnmatchedTableInHint",
"Cases": [
Expand Down
25 changes: 9 additions & 16 deletions pkg/planner/core/casetest/testdata/integration_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -356,22 +356,15 @@
{
"SQL": "explain format = 'verbose' select (2) in (select /*+ read_from_storage(tiflash[t1]) */ count(*) from t1) from (select t.b < (select /*+ read_from_storage(tiflash[t2]) */ t.b from t2 limit 1 ) from t3 t) t; -- we do generate the agg pushed-down plan of mpp, but cost-cmp failed",
"Plan": [
"HashJoin_19 3.00 162366.01 root CARTESIAN left outer semi join",
"├─Selection_36(Build) 0.80 31149.25 root eq(2, Column#18)",
"│ └─StreamAgg_43 1.00 31099.35 root funcs:count(1)->Column#18",
"│ └─TableReader_55 3.00 30949.65 root MppVersion: 2, data:ExchangeSender_54",
"│ └─ExchangeSender_54 3.00 464139.20 mpp[tiflash] ExchangeType: PassThrough",
"│ └─TableFullScan_53 3.00 464139.20 mpp[tiflash] table:t1 keep order:false",
"└─Projection_20(Probe) 3.00 129648.62 root 1->Column#28",
" └─Apply_22 3.00 129648.32 root CARTESIAN left outer join",
" ├─IndexReader_26(Build) 3.00 53.37 root index:IndexFullScan_25",
" │ └─IndexFullScan_25 3.00 610.50 cop[tikv] table:t, index:c(b) keep order:false",
" └─Projection_27(Probe) 3.00 43198.32 root 1->Column#26",
" └─Limit_30 3.00 43198.22 root offset:0, count:1",
" └─TableReader_35 3.00 43198.22 root MppVersion: 2, data:ExchangeSender_34",
" └─ExchangeSender_34 3.00 647920.44 mpp[tiflash] ExchangeType: PassThrough",
" └─Limit_33 3.00 647920.44 mpp[tiflash] offset:0, count:1",
" └─TableFullScan_32 3.00 647920.44 mpp[tiflash] table:t2 keep order:false"
"HashJoin_17 3.00 32771.06 root CARTESIAN left outer semi join",
"├─Selection_23(Build) 0.80 31149.25 root eq(2, Column#18)",
"│ └─StreamAgg_30 1.00 31099.35 root funcs:count(1)->Column#18",
"│ └─TableReader_42 3.00 30949.65 root MppVersion: 2, data:ExchangeSender_41",
"│ └─ExchangeSender_41 3.00 464139.20 mpp[tiflash] ExchangeType: PassThrough",
"│ └─TableFullScan_40 3.00 464139.20 mpp[tiflash] table:t1 keep order:false",
"└─Projection_18(Probe) 3.00 53.67 root 1->Column#24",
" └─IndexReader_22 3.00 53.37 root index:IndexFullScan_21",
" └─IndexFullScan_21 3.00 610.50 cop[tikv] table:t, index:c(b) keep order:false"
]
},
{
Expand Down
6 changes: 3 additions & 3 deletions pkg/planner/core/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ type LogicalPlan interface {
PredicatePushDown([]expression.Expression, *logicalOptimizeOp) ([]expression.Expression, LogicalPlan)

// PruneColumns prunes the unused columns.
PruneColumns([]*expression.Column, *logicalOptimizeOp) error
PruneColumns([]*expression.Column, *logicalOptimizeOp, LogicalPlan) error

// findBestTask converts the logical plan to the physical plan. It's a new interface.
// It is called recursively from the parent to the children to create the result physical plan.
Expand Down Expand Up @@ -756,11 +756,11 @@ func (*baseLogicalPlan) ExtractCorrelatedCols() []*expression.CorrelatedColumn {
}

// PruneColumns implements LogicalPlan interface.
// The base implementation does no pruning of its own: when the plan has no
// children it returns nil, otherwise it forwards parentUsedCols and opt to the
// first child only and returns that child's error. In the three-argument form
// the incoming LogicalPlan argument is ignored (it is bound to `_`) and p is
// passed to the child in that position.
func (p *baseLogicalPlan) PruneColumns(parentUsedCols []*expression.Column, opt *logicalOptimizeOp) error {
func (p *baseLogicalPlan) PruneColumns(parentUsedCols []*expression.Column, opt *logicalOptimizeOp, _ LogicalPlan) error {
if len(p.children) == 0 {
// Nothing below this plan to prune.
return nil
}
return p.children[0].PruneColumns(parentUsedCols, opt)
return p.children[0].PruneColumns(parentUsedCols, opt, p)
}

// Schema implements Plan Schema interface.
Expand Down
Loading

0 comments on commit 286e852

Please sign in to comment.