Skip to content

Commit

Permalink
planner: reset NotNull flag for schema of Apply and MaxOneRow properly (
Browse files Browse the repository at this point in the history
  • Loading branch information
eurekaka authored Oct 16, 2019
1 parent 1274082 commit 9539aa5
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 23 deletions.
2 changes: 1 addition & 1 deletion cmd/explaintest/r/tpch.result
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ Projection_37 100.00 root tpch.supplier.s_acctbal, tpch.supplier.s_name, tpch.na
│ └─TableReader_74 155496.00 root data:Selection_73
│ └─Selection_73 155496.00 cop eq(tpch.part.p_size, 30), like(tpch.part.p_type, "%STEEL", 92)
│ └─TableScan_72 10000000.00 cop table:part, range:[-inf,+inf], keep order:false
└─Selection_75 6524008.35 root not(isnull(min(ps_supplycost)))
└─Selection_75 6524008.35 root not(isnull(19_col_0))
└─HashAgg_78 8155010.44 root group by:tpch.partsupp.ps_partkey, funcs:min(tpch.partsupp.ps_supplycost), firstrow(tpch.partsupp.ps_partkey)
└─HashRightJoin_82 8155010.44 root inner join, inner:HashRightJoin_84, equal:[eq(tpch.supplier.s_suppkey, tpch.partsupp.ps_suppkey)]
├─HashRightJoin_84 100000.00 root inner join, inner:HashRightJoin_90, equal:[eq(tpch.nation.n_nationkey, tpch.supplier.s_nationkey)]
Expand Down
17 changes: 17 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,20 @@ func (s *testIntegrationSuite) runTestsWithTestData(caseName string, tk *testkit
tk.MustQuery(tt).Check(testkit.Rows(output[i].Plan...))
}
}

func (s *testIntegrationSuite) TestApplyNotNullFlag(c *C) {
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, store)
defer func() {
dom.Close()
store.Close()
}()
tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2")
tk.MustExec("create table t1(x int not null)")
tk.MustExec("create table t2(x int)")
tk.MustExec("insert into t2 values (1)")

tk.MustQuery("select IFNULL((select t1.x from t1 where t1.x = t2.x), 'xxx') as col1 from t2").Check(testkit.Rows("xxx"))
}
39 changes: 22 additions & 17 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,14 +247,14 @@ func (p *LogicalJoin) extractOnCondition(conditions []expression.Expression, der
arg0, lOK := binop.GetArgs()[0].(*expression.Column)
arg1, rOK := binop.GetArgs()[1].(*expression.Column)
if lOK && rOK {
var leftCol, rightCol *expression.Column
if left.Schema().Contains(arg0) && right.Schema().Contains(arg1) {
leftCol, rightCol = arg0, arg1
leftCol := left.Schema().RetrieveColumn(arg0)
rightCol := right.Schema().RetrieveColumn(arg1)
if leftCol == nil || rightCol == nil {
leftCol = left.Schema().RetrieveColumn(arg1)
rightCol = right.Schema().RetrieveColumn(arg0)
arg0, arg1 = arg1, arg0
}
if leftCol == nil && left.Schema().Contains(arg1) && right.Schema().Contains(arg0) {
leftCol, rightCol = arg1, arg0
}
if leftCol != nil {
if leftCol != nil && rightCol != nil {
// Do not derive `is not null` for anti join, since it may cause wrong results.
// For example:
// `select * from t t1 where t1.a not in (select b from t t2)` does not imply `t2.b is not null`,
Expand All @@ -272,16 +272,16 @@ func (p *LogicalJoin) extractOnCondition(conditions []expression.Expression, der
rightCond = append(rightCond, notNullExpr)
}
}
}
// For quries like `select a in (select a from s where s.b = t.b) from t`,
// if subquery is empty caused by `s.b = t.b`, the result should always be
// false even if t.a is null or s.a is null. To make this join "empty aware",
// we should differentiate `t.a = s.a` from other column equal conditions, so
// we put it into OtherConditions instead of EqualConditions of join.
if leftCol != nil && binop.FuncName.L == ast.EQ && !leftCol.InOperand && !rightCol.InOperand {
cond := expression.NewFunctionInternal(ctx, ast.EQ, types.NewFieldType(mysql.TypeTiny), leftCol, rightCol)
eqCond = append(eqCond, cond.(*expression.ScalarFunction))
continue
// For queries like `select a in (select a from s where s.b = t.b) from t`,
// if subquery is empty caused by `s.b = t.b`, the result should always be
// false even if t.a is null or s.a is null. To make this join "empty aware",
// we should differentiate `t.a = s.a` from other column equal conditions, so
// we put it into OtherConditions instead of EqualConditions of join.
if binop.FuncName.L == ast.EQ && !arg0.InOperand && !arg1.InOperand {
cond := expression.NewFunctionInternal(ctx, ast.EQ, types.NewFieldType(mysql.TypeTiny), arg0, arg1)
eqCond = append(eqCond, cond.(*expression.ScalarFunction))
continue
}
}
}
}
Expand Down Expand Up @@ -2519,6 +2519,11 @@ func (b *PlanBuilder) buildApplyWithJoinType(outerPlan, innerPlan LogicalPlan, t
ap := LogicalApply{LogicalJoin: LogicalJoin{JoinType: tp}}.Init(b.ctx)
ap.SetChildren(outerPlan, innerPlan)
ap.SetSchema(expression.MergeSchema(outerPlan.Schema(), innerPlan.Schema()))
// Note that, tp can only be LeftOuterJoin or InnerJoin, so we don't consider other outer joins.
if tp == LeftOuterJoin {
b.optFlag = b.optFlag | flagEliminateOuterJoin
resetNotNullFlag(ap.schema, outerPlan.Schema().Len(), ap.schema.Len())
}
for i := outerPlan.Schema().Len(); i < ap.Schema().Len(); i++ {
ap.schema.Columns[i].IsReferenced = true
}
Expand Down
4 changes: 2 additions & 2 deletions planner/core/testdata/plan_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -1025,11 +1025,11 @@
},
{
"SQL": "select /*+ HASH_AGG(@sel_1), STREAM_AGG(@sel_2) */ count(*) from t t1 where t1.a < (select count(*) from t t2 where t1.a > t2.a)",
"Plan": "Apply{IndexReader(Index(t.c_d_e)[[NULL,+inf]])->IndexReader(Index(t.c_d_e)[[NULL,+inf]]->Sel([gt(test.t1.a, test.t2.a)])->StreamAgg)->StreamAgg->Sel([not(isnull(count(*)))])}->HashAgg"
"Plan": "Apply{IndexReader(Index(t.c_d_e)[[NULL,+inf]])->IndexReader(Index(t.c_d_e)[[NULL,+inf]]->Sel([gt(test.t1.a, test.t2.a)])->StreamAgg)->StreamAgg->Sel([not(isnull(5_col_0))])}->HashAgg"
},
{
"SQL": "select /*+ STREAM_AGG(@sel_1), HASH_AGG(@qb) */ count(*) from t t1 where t1.a < (select /*+ QB_NAME(qb) */ count(*) from t t2 where t1.a > t2.a)",
"Plan": "Apply{IndexReader(Index(t.c_d_e)[[NULL,+inf]])->IndexReader(Index(t.c_d_e)[[NULL,+inf]]->Sel([gt(test.t1.a, test.t2.a)])->HashAgg)->HashAgg->Sel([not(isnull(count(*)))])}->StreamAgg"
"Plan": "Apply{IndexReader(Index(t.c_d_e)[[NULL,+inf]])->IndexReader(Index(t.c_d_e)[[NULL,+inf]]->Sel([gt(test.t1.a, test.t2.a)])->HashAgg)->HashAgg->Sel([not(isnull(5_col_0))])}->StreamAgg"
},
{
"SQL": "select /*+ HASH_AGG(@sel_2) */ a, (select count(*) from t t1 where t1.b > t.a) from t where b > (select b from t t2 where t2.b = t.a limit 1)",
Expand Down
20 changes: 17 additions & 3 deletions planner/core/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,30 @@ func (s *baseSchemaProducer) SetSchema(schema *expression.Schema) {
s.schema = schema
}

// Schema implements the Plan.Schema interface.
func (p *LogicalMaxOneRow) Schema() *expression.Schema {
s := p.Children()[0].Schema().Clone()
resetNotNullFlag(s, 0, s.Len())
return s
}

func buildLogicalJoinSchema(joinType JoinType, join LogicalPlan) *expression.Schema {
leftSchema := join.Children()[0].Schema()
switch joinType {
case SemiJoin, AntiSemiJoin:
return join.Children()[0].Schema().Clone()
return leftSchema.Clone()
case LeftOuterSemiJoin, AntiLeftOuterSemiJoin:
newSchema := join.Children()[0].Schema().Clone()
newSchema := leftSchema.Clone()
newSchema.Append(join.Schema().Columns[join.Schema().Len()-1])
return newSchema
}
return expression.MergeSchema(join.Children()[0].Schema(), join.Children()[1].Schema())
newSchema := expression.MergeSchema(leftSchema, join.Children()[1].Schema())
if joinType == LeftOuterJoin {
resetNotNullFlag(newSchema, leftSchema.Len(), newSchema.Len())
} else if joinType == RightOuterJoin {
resetNotNullFlag(newSchema, 0, leftSchema.Len())
}
return newSchema
}

func buildPhysicalJoinSchema(joinType JoinType, join PhysicalPlan) *expression.Schema {
Expand Down

0 comments on commit 9539aa5

Please sign in to comment.