Skip to content

Commit

Permalink
plan, executor: only set DefaultValues in agg push down. (#5383)
Browse files Browse the repository at this point in the history
  • Loading branch information
winoros authored and coocood committed Dec 12, 2017
1 parent 2edc8dc commit 581002b
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 71 deletions.
46 changes: 35 additions & 11 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,9 +532,13 @@ func (b *executorBuilder) buildMergeJoin(v *plan.PhysicalMergeJoin) Executor {
joinKeys: rightKeys,
}

defaultValues := v.DefaultValues
if defaultValues == nil {
defaultValues = make([]types.Datum, rightExec.Schema().Len())
}
e := &MergeJoinExec{
baseExecutor: newBaseExecutor(v.Schema(), b.ctx, leftExec, rightExec),
resultGenerator: newJoinResultGenerator(b.ctx, v.JoinType, false, v.DefaultValues, v.OtherConditions, nil, nil),
resultGenerator: newJoinResultGenerator(b.ctx, v.JoinType, false, defaultValues, v.OtherConditions, nil, nil),
stmtCtx: b.ctx.GetSessionVars().StmtCtx,
// left is the outer side by default.
outerKeys: leftKeys,
Expand Down Expand Up @@ -571,27 +575,36 @@ func (b *executorBuilder) buildHashJoin(v *plan.PhysicalHashJoin) Executor {

// for hash join, inner table is always the smaller one.
e := &HashJoinExec{
baseExecutor: newBaseExecutor(v.Schema(), b.ctx, leftExec, rightExec),
resultGenerator: newJoinResultGenerator(b.ctx, v.JoinType, v.SmallChildIdx == 0, v.DefaultValues, v.OtherConditions, nil, nil),
concurrency: v.Concurrency,
defaultInners: v.DefaultValues,
joinType: v.JoinType,
baseExecutor: newBaseExecutor(v.Schema(), b.ctx, leftExec, rightExec),
concurrency: v.Concurrency,
joinType: v.JoinType,
}

defaultValues := v.DefaultValues
if v.SmallChildIdx == 0 {
e.innerlExec = leftExec
e.innerExec = leftExec
e.outerExec = rightExec
e.innerFilter = v.LeftConditions
e.outerFilter = v.RightConditions
e.innerKeys = leftHashKey
e.outerKeys = rightHashKey
if defaultValues == nil {
defaultValues = make([]types.Datum, e.innerExec.Schema().Len())
}
e.resultGenerator = newJoinResultGenerator(b.ctx, v.JoinType, v.SmallChildIdx == 0, defaultValues,
v.OtherConditions, nil, nil)
} else {
e.innerlExec = rightExec
e.innerExec = rightExec
e.outerExec = leftExec
e.innerFilter = v.RightConditions
e.outerFilter = v.LeftConditions
e.innerKeys = rightHashKey
e.outerKeys = leftHashKey
if defaultValues == nil {
defaultValues = make([]types.Datum, e.innerExec.Schema().Len())
}
e.resultGenerator = newJoinResultGenerator(b.ctx, v.JoinType, v.SmallChildIdx == 0,
defaultValues, v.OtherConditions, nil, nil)
}

return e
Expand Down Expand Up @@ -750,7 +763,11 @@ func (b *executorBuilder) buildNestedLoopJoin(v *plan.PhysicalHashJoin) *NestedL
cond.GetArgs()[0].(*expression.Column).ResolveIndices(v.Schema())
cond.GetArgs()[1].(*expression.Column).ResolveIndices(v.Schema())
}
defaultValues := v.DefaultValues
if v.SmallChildIdx == 1 {
if defaultValues == nil {
defaultValues = make([]types.Datum, v.Children()[1].Schema().Len())
}
return &NestedLoopJoinExec{
baseExecutor: newBaseExecutor(v.Schema(), b.ctx),
SmallExec: b.build(v.Children()[1]),
Expand All @@ -759,9 +776,12 @@ func (b *executorBuilder) buildNestedLoopJoin(v *plan.PhysicalHashJoin) *NestedL
SmallFilter: v.RightConditions,
OtherFilter: append(expression.ScalarFuncs2Exprs(v.EqualConditions), v.OtherConditions...),
outer: v.JoinType != plan.InnerJoin,
defaultValues: v.DefaultValues,
defaultValues: defaultValues,
}
}
if defaultValues == nil {
defaultValues = make([]types.Datum, v.Children()[0].Schema().Len())
}
return &NestedLoopJoinExec{
baseExecutor: newBaseExecutor(v.Schema(), b.ctx),
SmallExec: b.build(v.Children()[0]),
Expand All @@ -771,7 +791,7 @@ func (b *executorBuilder) buildNestedLoopJoin(v *plan.PhysicalHashJoin) *NestedL
SmallFilter: v.LeftConditions,
OtherFilter: append(expression.ScalarFuncs2Exprs(v.EqualConditions), v.OtherConditions...),
outer: v.JoinType != plan.InnerJoin,
defaultValues: v.DefaultValues,
defaultValues: defaultValues,
}
}

Expand Down Expand Up @@ -1021,6 +1041,10 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plan.PhysicalIndexJoin) Execut
return nil
}
innerExecBuilder := &dataReaderBuilder{v.Children()[1-v.OuterIndex], b}
defaultValues := v.DefaultValues
if defaultValues == nil {
defaultValues = make([]types.Datum, innerExecBuilder.Schema().Len())
}
return &IndexLookUpJoin{
baseExecutor: newBaseExecutor(v.Schema(), b.ctx, outerExec),
outerExec: outerExec,
Expand All @@ -1029,7 +1053,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plan.PhysicalIndexJoin) Execut
innerKeys: v.InnerJoinKeys,
outerFilter: v.LeftConditions,
innerFilter: v.RightConditions,
resultGenerator: newJoinResultGenerator(b.ctx, v.JoinType, v.OuterIndex == 1, v.DefaultValues, v.OtherConditions, nil, nil),
resultGenerator: newJoinResultGenerator(b.ctx, v.JoinType, v.OuterIndex == 1, defaultValues, v.OtherConditions, nil, nil),
maxBatchSize: batchSize,
}
}
Expand Down
9 changes: 4 additions & 5 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type HashJoinExec struct {
baseExecutor

outerExec Executor
innerlExec Executor
innerExec Executor
outerFilter expression.CNFExprs
innerFilter expression.CNFExprs
outerKeys []*expression.Column
Expand All @@ -55,7 +55,6 @@ type HashJoinExec struct {
workerWaitGroup sync.WaitGroup // workerWaitGroup is for sync multiple join workers.
finished atomic.Value
closeCh chan struct{} // closeCh add a lock for closing executor.
defaultInners []types.Datum
joinType plan.JoinType

resultGenerator joinResultGenerator
Expand Down Expand Up @@ -133,12 +132,12 @@ func (e *HashJoinExec) encodeRow(b []byte, row Row) ([]byte, error) {
}

func (e *HashJoinExec) decodeRow(data []byte) (Row, error) {
values := make([]types.Datum, e.innerlExec.Schema().Len())
values := make([]types.Datum, e.innerExec.Schema().Len())
err := codec.SetRawValues(data, values)
if err != nil {
return nil, errors.Trace(err)
}
err = decodeRawValues(values, e.innerlExec.Schema(), e.ctx.GetSessionVars().GetTimeZone())
err = decodeRawValues(values, e.innerExec.Schema(), e.ctx.GetSessionVars().GetTimeZone())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -216,7 +215,7 @@ func (e *HashJoinExec) prepare(goCtx goctx.Context) error {
e.hashTable = mvmap.NewMVMap()
var buffer []byte
for {
innerRow, err := e.innerlExec.Next(goCtx)
innerRow, err := e.innerExec.Next(goCtx)
if err != nil {
return errors.Trace(err)
}
Expand Down
10 changes: 10 additions & 0 deletions executor/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,3 +735,13 @@ func (s *testSuite) TestIssue5255(c *C) {
tk.MustExec("insert into t2 values(1)")
tk.MustQuery("select /*+ TIDB_INLJ(t2) */ * from t1 join t2 on t1.a=t2.a").Check(testkit.Rows("1 2017-11-29 2.2 1"))
}

// TestIssue5278 is a regression test for chained left joins: with a single
// row in t and no rows in tt, the columns coming from tt are expected to be
// padded with NULL while the second join against t (aliased ttt) still matches.
func (s *testSuite) TestIssue5278(c *C) {
	tk := testkit.NewTestKit(c, s.store)

	// Schema and data setup, executed in order.
	setup := []string{
		"use test",
		"drop table if exists t, tt",
		"create table t(a int, b int)",
		"create table tt(a varchar(10), b int)",
		"insert into t values(1, 1)",
	}
	for _, stmt := range setup {
		tk.MustExec(stmt)
	}

	result := tk.MustQuery("select * from t left join tt on t.a=tt.a left join t ttt on t.a=ttt.a")
	result.Check(testkit.Rows("1 1 <nil> <nil> 1 1"))
}
7 changes: 0 additions & 7 deletions plan/column_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/types"
)

type columnPruner struct {
Expand Down Expand Up @@ -245,12 +244,6 @@ func (p *LogicalJoin) PruneColumns(parentUsedCols []*expression.Column) {
rChild := p.children[1].(LogicalPlan)
lChild.PruneColumns(leftCols)
rChild.PruneColumns(rightCols)
// After column pruning, the size of schema may change, so we should also change the len of default value.
if p.JoinType == LeftOuterJoin {
p.DefaultValues = make([]types.Datum, p.children[1].Schema().Len())
} else if p.JoinType == RightOuterJoin {
p.DefaultValues = make([]types.Datum, p.children[0].Schema().Len())
}
p.mergeSchema()
}

Expand Down
41 changes: 0 additions & 41 deletions plan/eliminate_projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package plan
import (
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/types"
)

// canProjectionBeEliminatedLoose checks whether a projection can be eliminated, returns true if
Expand Down Expand Up @@ -158,9 +157,6 @@ func (pe *projectionEliminater) eliminate(p LogicalPlan, replace map[string]*exp
if !(isProj && canEliminate && canProjectionBeEliminatedLoose(proj)) {
return p
}
if join, ok := p.Parents()[0].(*LogicalJoin); ok {
pe.resetDefaultValues(join, p)
}
exprs := proj.Exprs
for i, col := range proj.Schema().Columns {
replace[string(col.HashCode())] = exprs[i].(*expression.Column)
Expand All @@ -169,43 +165,6 @@ func (pe *projectionEliminater) eliminate(p LogicalPlan, replace map[string]*exp
return p.Children()[0].(LogicalPlan)
}

// resetDefaultValues rebuilds join.DefaultValues when the inner child of the
// Join is a Projection that is being eliminated. The schema of the
// Projection's child may not be consistent with the schema of the Projection,
// so each default value is moved from its position in the Projection's schema
// to the position of the equal column in the child's schema.
//
// It is a no-op unless join is a LeftOuterJoin or RightOuterJoin and prj is
// the join's inner child (Children()[1] for a left outer join, Children()[0]
// for a right outer join). Child columns without an equal Projection column
// keep the zero-value Datum.
func (pe *projectionEliminater) resetDefaultValues(join *LogicalJoin, prj Plan) {
	prjChild := prj.Children()[0]
	var joinInnerChild Plan
	switch join.JoinType {
	case LeftOuterJoin:
		joinInnerChild = join.Children()[1]
	case RightOuterJoin:
		joinInnerChild = join.Children()[0]
	default:
		return
	}
	if joinInnerChild != prj {
		return
	}
	prjSchema := prj.Schema().Columns
	childOfPrjSchema := prjChild.Schema().Columns
	// The map has to be allocated before it is written to: assigning to an
	// entry of a nil map panics at runtime.
	schemaIdxMap := make(map[int]int, len(prjSchema))
	for i := 0; i < len(prjSchema); i++ {
		for j := 0; j < len(childOfPrjSchema); j++ {
			if prjSchema[i].Equal(childOfPrjSchema[j], nil) {
				// Projection column i corresponds to child column j.
				schemaIdxMap[i] = j
			}
		}
	}
	newDefaultValues := make([]types.Datum, len(childOfPrjSchema))
	for i := range prjSchema {
		if j, ok := schemaIdxMap[i]; ok {
			newDefaultValues[j] = join.DefaultValues[i]
		}
	}
	join.DefaultValues = newDefaultValues
}

func (p *LogicalJoin) replaceExprColumns(replace map[string]*expression.Column) {
for _, equalExpr := range p.EqualConditions {
resolveExprAndReplace(equalExpr, replace)
Expand Down
5 changes: 0 additions & 5 deletions plan/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,8 @@ func (b *planBuilder) buildJoin(join *ast.Join) LogicalPlan {
}
if join.Tp == ast.LeftJoin {
joinPlan.JoinType = LeftOuterJoin
joinPlan.DefaultValues = make([]types.Datum, rightPlan.Schema().Len())
} else if join.Tp == ast.RightJoin {
joinPlan.JoinType = RightOuterJoin
joinPlan.DefaultValues = make([]types.Datum, leftPlan.Schema().Len())
} else {
joinPlan.JoinType = InnerJoin
}
Expand Down Expand Up @@ -1717,9 +1715,6 @@ func (b *planBuilder) buildApplyWithJoinType(outerPlan, innerPlan LogicalPlan, t
b.optFlag = b.optFlag | flagBuildKeyInfo
b.optFlag = b.optFlag | flagDecorrelate
ap := LogicalApply{LogicalJoin: LogicalJoin{JoinType: tp}}.init(b.ctx)
if tp == LeftOuterJoin {
ap.DefaultValues = make([]types.Datum, innerPlan.Schema().Len())
}
setParentAndChildren(ap, outerPlan, innerPlan)
ap.SetSchema(expression.MergeSchema(outerPlan.Schema(), innerPlan.Schema()))
for i := outerPlan.Schema().Len(); i < ap.Schema().Len(); i++ {
Expand Down
6 changes: 4 additions & 2 deletions plan/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,10 @@ type LogicalJoin struct {
leftProperties [][]*expression.Column
rightProperties [][]*expression.Column

// DefaultValues is only used for outer join, which stands for the default values when the outer table cannot find join partner
// instead of null padding.
// DefaultValues is only used for left/right outer joins. It holds the values that the inner row's columns
// take when an outer row doesn't match any row of the inner table.
// A nil DefaultValues just means that the default values are a slice of NULLs.
// Currently, only the `aggregation push down` phase sets this.
DefaultValues []types.Datum

// redundantSchema contains columns which are eliminated in join.
Expand Down

0 comments on commit 581002b

Please sign in to comment.