Skip to content

Commit

Permalink
plan: optimize the filter condition push down logic when do index dou…
Browse files Browse the repository at this point in the history
…ble read (pingcap#2166)

Origin design will put index conditions into table scan, but it can
be pushed down into index scan.
  • Loading branch information
alivxxx authored and coocood committed Dec 7, 2016
1 parent 4f742c2 commit 3793632
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 31 deletions.
4 changes: 2 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ func (b *executorBuilder) buildTableScan(v *plan.PhysicalTableScan) Executor {
desc: v.Desc,
limitCount: v.LimitCount,
keepOrder: v.KeepOrder,
where: v.ConditionPBExpr,
where: v.TableConditionPBExpr,
aggregate: v.Aggregated,
aggFuncs: v.AggFuncsPB,
aggFields: v.AggFields,
Expand Down Expand Up @@ -563,7 +563,7 @@ func (b *executorBuilder) buildIndexScan(v *plan.PhysicalIndexScan) Executor {
indexPlan: v,
singleReadMode: !v.DoubleRead,
startTS: startTS,
where: v.ConditionPBExpr,
where: v.TableConditionPBExpr,
aggregate: v.Aggregated,
aggFuncs: v.AggFuncsPB,
aggFields: v.AggFields,
Expand Down
2 changes: 1 addition & 1 deletion executor/executor_distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,10 +569,10 @@ func (e *XSelectIndexExec) doIndexRequest() (distsql.SelectResult, error) {
selIdxReq.Limit = e.indexPlan.LimitCount
}
concurrency := e.scanConcurrency
selIdxReq.Where = e.indexPlan.IndexConditionPBExpr
if e.singleReadMode {
selIdxReq.Aggregates = e.aggFuncs
selIdxReq.GroupBy = e.byItems
selIdxReq.Where = e.where
} else if !e.indexPlan.OutOfOrder {
// The cost of index scan double-read is higher than single-read. Usually ordered index scan has a limit
// which may not have been pushed down, so we set concurrency to 1 to avoid fetching unnecessary data.
Expand Down
11 changes: 11 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,17 @@ func (s *testSuite) TestIndexScan(c *C) {
tk.MustExec("INSERT INTO tab1 VALUES(1,37,20.85,30,10.69)")
result = tk.MustQuery("SELECT pk FROM tab1 WHERE ((col3 <= 6 OR col3 < 29 AND (col0 < 41)) OR col3 > 42) AND col1 >= 96.1 AND col3 = 30 AND col3 > 17 AND (col0 BETWEEN 36 AND 42)")
result.Check(testkit.Rows())
tk.MustExec("drop table if exists tab1")
tk.MustExec("CREATE TABLE tab1(pk INTEGER PRIMARY KEY, a INTEGER, b INTEGER)")
tk.MustExec("CREATE INDEX idx_tab1_0 on tab1 (a)")
tk.MustExec("INSERT INTO tab1 VALUES(1,1,1)")
tk.MustExec("INSERT INTO tab1 VALUES(2,2,1)")
tk.MustExec("INSERT INTO tab1 VALUES(3,1,2)")
tk.MustExec("INSERT INTO tab1 VALUES(4,2,2)")
result = tk.MustQuery("SELECT * FROM tab1 WHERE pk <= 3 AND a = 1")
result.Check(testkit.Rows("1 1 1", "3 1 2"))
result = tk.MustQuery("SELECT * FROM tab1 WHERE pk <= 4 AND a = 1 AND b = 2")
result.Check(testkit.Rows("3 1 2"))
}

func (s *testSuite) TestSubquerySameTable(c *C) {
Expand Down
101 changes: 86 additions & 15 deletions executor/explain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (s *testSuite) TestExplain(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2")
tk.MustExec("create table t1 (c1 int primary key, c2 int, index c2 (c2))")
tk.MustExec("create table t1 (c1 int primary key, c2 int, c3 int, index c2 (c2))")
tk.MustExec("create table t2 (c1 int unique, c2 int)")

cases := []struct {
Expand All @@ -53,7 +53,8 @@ func (s *testSuite) TestExplain(c *C) {
"push down info": {
"limit": 0,
"access conditions": null,
"filter conditions": null
"index filter conditions": null,
"table filter conditions": null
}
}`,
},
Expand All @@ -74,11 +75,12 @@ func (s *testSuite) TestExplain(c *C) {
"ranges": "[[\u003cnil\u003e,+inf]]",
"desc": false,
"out of order": false,
"double read": false,
"double read": true,
"push down info": {
"limit": 0,
"access conditions": null,
"filter conditions": null
"index filter conditions": null,
"table filter conditions": null
}
}`,
},
Expand All @@ -100,7 +102,8 @@ func (s *testSuite) TestExplain(c *C) {
"push down info": {
"limit": 0,
"access conditions": null,
"filter conditions": null
"index filter conditions": null,
"table filter conditions": null
}
}`,
`{
Expand Down Expand Up @@ -133,13 +136,14 @@ func (s *testSuite) TestExplain(c *C) {
"access conditions": [
"gt(test.t1.c1, 0)"
],
"filter conditions": null
"index filter conditions": null,
"table filter conditions": null
}
}`,
},
},
{
"select * from t1 where t1.c2 = 1",
"select t1.c1, t1.c2 from t1 where t1.c2 = 1",
[]string{
"IndexScan_5",
},
Expand All @@ -159,7 +163,8 @@ func (s *testSuite) TestExplain(c *C) {
"access conditions": [
"eq(test.t1.c2, 1)"
],
"filter conditions": null
"index filter conditions": null,
"table filter conditions": null
}
}`,
},
Expand All @@ -183,7 +188,8 @@ func (s *testSuite) TestExplain(c *C) {
"access conditions": [
"gt(test.t1.c1, 1)"
],
"filter conditions": null
"index filter conditions": null,
"table filter conditions": null
}
}`,
`{
Expand All @@ -194,7 +200,8 @@ func (s *testSuite) TestExplain(c *C) {
"push down info": {
"limit": 0,
"access conditions": null,
"filter conditions": null
"index filter conditions": null,
"table filter conditions": null
}
}`,
`{
Expand Down Expand Up @@ -227,7 +234,8 @@ func (s *testSuite) TestExplain(c *C) {
"access conditions": [
"eq(test.t1.c1, 1)"
],
"filter conditions": null
"index filter conditions": null,
"table filter conditions": null
}
}`,
`{
Expand Down Expand Up @@ -258,7 +266,8 @@ func (s *testSuite) TestExplain(c *C) {
"access conditions": [
"eq(test.t1.c2, 1)"
],
"filter conditions": null
"index filter conditions": null,
"table filter conditions": null
}
}`,
`{
Expand All @@ -284,7 +293,8 @@ func (s *testSuite) TestExplain(c *C) {
"push down info": {
"limit": 0,
"access conditions": null,
"filter conditions": null
"index filter conditions": null,
"table filter conditions": null
}
}`,
`{
Expand All @@ -303,7 +313,8 @@ func (s *testSuite) TestExplain(c *C) {
"firstrow(b.c2)"
],
"access conditions": null,
"filter conditions": null
"index filter conditions": null,
"table filter conditions": null
}
}`,
`{
Expand Down Expand Up @@ -354,7 +365,8 @@ func (s *testSuite) TestExplain(c *C) {
"push down info": {
"limit": 0,
"access conditions": null,
"filter conditions": null
"index filter conditions": null,
"table filter conditions": null
}
}`,
`{
Expand All @@ -366,6 +378,65 @@ func (s *testSuite) TestExplain(c *C) {
],
"limit": 1,
"child": "TableScan_5"
}`,
},
},
{
"select * from t1 where c1 > 1 and c2 = 1 and c3 < 1",
[]string{
"IndexScan_5",
},
[]string{
"",
},
[]string{
`{
"db": "test",
"table": "t1",
"index": "c2",
"ranges": "[[1,1]]",
"desc": false,
"out of order": true,
"double read": true,
"push down info": {
"limit": 0,
"access conditions": [
"eq(test.t1.c2, 1)"
],
"index filter conditions": [
"gt(test.t1.c1, 1)"
],
"table filter conditions": [
"lt(test.t1.c3, 1)"
]
}
}`,
},
},
{
"select * from t1 where c1 =1 and c2 > 1",
[]string{
"TableScan_4",
},
[]string{
"",
},
[]string{
`{
"db": "test",
"table": "t1",
"desc": false,
"keep order": false,
"push down info": {
"limit": 0,
"access conditions": [
"eq(test.t1.c1, 1)"
],
"index filter conditions": null,
"table filter conditions": [
"gt(test.t1.c2, 1)"
]
}
}`,
},
},
Expand Down
9 changes: 6 additions & 3 deletions plan/physical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (p *DataSource) convert2TableScan(prop *requiredProperty) (*physicalPlanInf
if client != nil {
memDB := infoschema.IsMemoryDB(p.DBName.L)
if !memDB && client.SupportRequestType(kv.ReqTypeSelect, 0) {
ts.ConditionPBExpr, ts.conditions, newSel.Conditions = expressionsToPB(sc, newSel.Conditions, client)
ts.TableConditionPBExpr, ts.tableFilterConditions, newSel.Conditions = expressionsToPB(sc, newSel.Conditions, client)
}
}
err := buildTableRange(ts)
Expand Down Expand Up @@ -200,7 +200,7 @@ func (p *DataSource) convert2TableScan(prop *requiredProperty) (*physicalPlanInf
return nil, errors.Trace(err)
}
}
if ts.ConditionPBExpr != nil {
if ts.TableConditionPBExpr != nil {
rowCount = uint64(float64(rowCount) * selectionFactor)
}
return resultPlan.matchProperty(prop, &physicalPlanInfo{count: rowCount}), nil
Expand Down Expand Up @@ -245,7 +245,10 @@ func (p *DataSource) convert2IndexScan(prop *requiredProperty, index *model.Inde
if client != nil {
memDB := infoschema.IsMemoryDB(p.DBName.L)
if !memDB && client.SupportRequestType(kv.ReqTypeIndex, 0) {
is.ConditionPBExpr, is.conditions, newSel.Conditions = expressionsToPB(sc, newSel.Conditions, client)
indexConditions, tableConditions := detachIndexFilterConditions(newSel.Conditions, is.Index.Columns, is.Table)
is.IndexConditionPBExpr, is.indexFilterConditions, indexConditions = expressionsToPB(sc, indexConditions, client)
is.TableConditionPBExpr, is.tableFilterConditions, tableConditions = expressionsToPB(sc, tableConditions, client)
newSel.Conditions = append(indexConditions, tableConditions...)
}
}
err := buildIndexRange(p.ctx.GetSessionVars().StmtCtx, is)
Expand Down
79 changes: 78 additions & 1 deletion plan/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,8 @@ func (s *testPlanSuite) TestPushDownExpression(c *C) {
ts = &x.physicalTableSource
}
if ts != nil {
c.Assert(fmt.Sprintf("%s", expression.ComposeCNFCondition(ts.conditions).String()), Equals, ca.cond, Commentf("for %s", sql))
conditions := append(ts.indexFilterConditions, ts.tableFilterConditions...)
c.Assert(fmt.Sprintf("%s", expression.ComposeCNFCondition(conditions).String()), Equals, ca.cond, Commentf("for %s", sql))
break
}
p = p.GetChildByIndex(0)
Expand Down Expand Up @@ -644,3 +645,79 @@ func (s *testPlanSuite) TestCoveringIndex(c *C) {
c.Assert(covering, Equals, ca.isCovering)
}
}

func (s *testPlanSuite) TestFilterConditionPushDown(c *C) {
defer testleak.AfterTest(c)()
cases := []struct {
sql string
access string
indexFilter string
tableFilter string
}{
{
sql: "select * from t",
access: "[]",
indexFilter: "[]",
tableFilter: "[]",
},
{
sql: "select * from t where t.c < 10000 and t.d = 1 and t.g > 1",
access: "[lt(test.t.c, 10000)]",
indexFilter: "[eq(test.t.d, 1)]",
tableFilter: "[gt(test.t.g, 1)]",
},
{
sql: "select * from t where t.a < 1 and t.c < t.d",
access: "[lt(test.t.a, 1)]",
indexFilter: "[]",
tableFilter: "[lt(test.t.c, test.t.d)]",
},
{
sql: "select * from t use index(c_d_e) where t.a < 1 and t.c =1 and t.d < t.e and t.b > (t.a - t.d)",
access: "[eq(test.t.c, 1)]",
indexFilter: "[lt(test.t.a, 1) lt(test.t.d, test.t.e)]",
tableFilter: "[gt(test.t.b, minus(test.t.a, test.t.d))]",
},
}
for _, ca := range cases {
comment := Commentf("for %s", ca.sql)
stmt, err := s.ParseOneStmt(ca.sql, "", "")
c.Assert(err, IsNil, comment)
ast.SetFlag(stmt)

err = mockResolve(stmt)
c.Assert(err, IsNil)
builder := &planBuilder{
allocator: new(idAllocator),
ctx: mockContext(),
colMapper: make(map[*ast.ColumnNameExpr]int),
}
p := builder.build(stmt)
c.Assert(builder.err, IsNil)
lp := p.(LogicalPlan)

_, lp, err = lp.PredicatePushDown(nil)
c.Assert(err, IsNil)
lp.PruneColumns(lp.GetSchema())
lp.ResolveIndicesAndCorCols()
info, err := lp.convert2PhysicalPlan(&requiredProperty{})
c.Assert(err, IsNil)
p = info.p
for {
var ts *physicalTableSource
switch x := p.(type) {
case *PhysicalTableScan:
ts = &x.physicalTableSource
case *PhysicalIndexScan:
ts = &x.physicalTableSource
}
if ts != nil {
c.Assert(fmt.Sprintf("%s", ts.AccessCondition), Equals, ca.access, Commentf("for %s", ca.sql))
c.Assert(fmt.Sprintf("%s", ts.indexFilterConditions), Equals, ca.indexFilter, Commentf("for %s", ca.sql))
c.Assert(fmt.Sprintf("%s", ts.tableFilterConditions), Equals, ca.tableFilter, Commentf("for %s", ca.sql))
break
}
p = p.GetChildByIndex(0)
}
}
}
Loading

0 comments on commit 3793632

Please sign in to comment.