[SPARK-31809][SQL] Infer IsNotNull from some special equality join keys#28642
[SPARK-31809][SQL] Infer IsNotNull from some special equality join keys#28642wangyum wants to merge 4 commits intoapache:masterfrom wangyum:SPARK-31809
Conversation
|
Test build #123119 has finished for PR 28642 at commit
|
| testConstraintsAfterJoin( | ||
| testRelation.subquery('left), | ||
| testRelation.subquery('right), | ||
| testRelation.where(IsNotNull(Coalesce(Seq('a, 'b)))).subquery('left), |
There was a problem hiding this comment.
hive> EXPLAIN SELECT t1.* FROM t1 JOIN t2 ON coalesce(t1.a, t1.b)=t2.a;
OK
STAGE DEPENDENCIES:
Stage-4 is a root stage
Stage-3 depends on stages: Stage-4
Stage-0 depends on stages: Stage-3
STAGE PLANS:
Stage: Stage-4
Map Reduce Local Work
Alias -> Map Local Tables:
$hdt$_0:t1
Fetch Operator
limit: -1
Alias -> Map Local Operator Tree:
$hdt$_0:t1
TableScan
alias: t1
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
Filter Operator
predicate: COALESCE(a,b) is not null (type: boolean)
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
Select Operator
expressions: a (type: string), b (type: string), c (type: string)
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
HashTable Sink Operator
keys:
0 COALESCE(_col0,_col1) (type: string)
1 _col0 (type: string)
Stage: Stage-3
Map Reduce
Map Operator Tree:
TableScan
alias: t2
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
Filter Operator
predicate: a is not null (type: boolean)
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
Select Operator
expressions: a (type: string)
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
Map Join Operator
condition map:
Inner Join 0 to 1
keys:
0 COALESCE(_col0,_col1) (type: string)
1 _col0 (type: string)
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Execution mode: vectorized
Local Work:
Map Reduce Local Work
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
|
|
||
| test("Should not infer IsNotNull for non null-intolerant child from same table") { | ||
| comparePlans(Optimize.execute(testRelation.where(Coalesce(Seq('a, 'b)) === 'c).analyze), | ||
| testRelation.where(Coalesce(Seq('a, 'b)) === 'c && IsNotNull('c)).analyze) |
There was a problem hiding this comment.
hive> EXPLAIN SELECT t1.* FROM t1 WHERE coalesce(t1.a, t1.b)=t1.c;
OK
STAGE DEPENDENCIES:
Stage-0 is a root stage
STAGE PLANS:
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
TableScan
alias: t1
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
Filter Operator
predicate: (COALESCE(a,b) = c) (type: boolean)
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
Select Operator
expressions: a (type: string), b (type: string), c (type: string)
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
ListSink
Time taken: 4.026 seconds, Fetched: 20 row(s)
|
retest this please |
|
Test build #123553 has finished for PR 28642 at commit
|
|
Test build #123607 has finished for PR 28642 at commit
|
| case p: BatchEvalPythonExec => p | ||
| } | ||
| assert(pythonEvals.size == 2) | ||
| assert(pythonEvals.size == 4) |
There was a problem hiding this comment.
@HyukjinKwon I'm not sure if this change can optimize python udf?
There was a problem hiding this comment.
Yeah, I don't think it's more efficient to have BatchEvalPythonExec more. It will require more Python executions which aren't trivial.
There was a problem hiding this comment.
I quickly checked:
== Physical Plan ==
*(3) Project [a#225, b#226, c#236, d#237]
+- *(3) BroadcastHashJoin [cast(pythonUDF0#256 as int)], [cast(pythonUDF0#257 as int)], Inner, BuildRight
:- BatchEvalPython [udf(cast(a#225 as string))], [pythonUDF0#256]
: +- *(1) Project [_1#220 AS a#225, _2#221 AS b#226]
: +- *(1) Project [_1#220, _2#221]
: +- *(1) Filter isnotnull(cast(pythonUDF0#254 as int))
: +- BatchEvalPython [udf(cast(_1#220 as string))], [pythonUDF0#254]
: +- LocalTableScan [_1#220, _2#221]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(cast(input[2, string, true] as int) as bigint))), [id=#140]
+- BatchEvalPython [udf(cast(c#236 as string))], [pythonUDF0#257]
+- *(2) Project [_1#231 AS c#236, _2#232 AS d#237]
+- *(2) Project [_1#231, _2#232]
+- *(2) Filter isnotnull(cast(pythonUDF0#255 as int))
+- BatchEvalPython [udf(cast(_1#231 as string))], [pythonUDF0#255]
+- LocalTableScan [_1#231, _2#232]
We should probably avoid inferring the is-not-null filter in this case.
|
retest this please |
|
Test build #124032 has finished for PR 28642 at commit
|
|
retest this please |
|
cc @cloud-fan FYI |
|
Test build #124038 has finished for PR 28642 at commit
|
|
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. |
|
Test build #142271 has finished for PR 28642 at commit
|
|
Kubernetes integration test starting |
|
Kubernetes integration test status failure |
|
Kubernetes integration test starting |
|
Test build #142276 has finished for PR 28642 at commit
|
|
Kubernetes integration test starting |
|
Kubernetes integration test status success |
|
Test build #142299 has finished for PR 28642 at commit
|
|
Kubernetes integration test starting |
|
Kubernetes integration test status success |
|
Kubernetes integration test starting |
|
Kubernetes integration test status failure |
| private def resultMayBeNull(e: Expression): Boolean = e match { | ||
| case Cast(child, dataType, _, _) => !Cast.canUpCast(child.dataType, dataType) | ||
| case _: Coalesce => true | ||
| case _ => false | ||
| } |
There was a problem hiding this comment.
@cloud-fan @HyukjinKwon It will not infer all equality join keys. For example:
| Infer | Will not infer |
|---|---|
| cast(strCol AS double) = doubleCol | upper(strCol) = upperStrCol |
|
Kubernetes integration test starting |
|
Kubernetes integration test status failure |
|
Test build #144659 has finished for PR 28642 at commit
|
|
Test build #144661 has finished for PR 28642 at commit
|
|
Kubernetes integration test starting |
|
Kubernetes integration test status failure |
|
Test build #144680 has finished for PR 28642 at commit
|
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
Show resolved
Hide resolved
|
Test build #144703 has finished for PR 28642 at commit
|
|
Kubernetes integration test starting |
|
Kubernetes integration test status failure |
| private def resultMayBeNull(exp: Expression): Boolean = exp match { | ||
| case e if !e.nullable => false | ||
| case Cast(child: Attribute, dataType, _, _) => !Cast.canUpCast(child.dataType, dataType) | ||
| case c: Coalesce if c.children.forall(_.isInstanceOf[Attribute]) => true |
There was a problem hiding this comment.
Can't we rely on the NullIntolerant interface?
There was a problem hiding this comment.
We can infer NullIntolerant already. For example:
spark.sql("create table t1 (id string, value int) using parquet")
spark.sql("create table t2 (id int, value int) using parquet")
spark.sql("select * from t1 join t2 on t1.id = t2.id").explain("extended")
== Optimized Logical Plan ==
Join Inner, (cast(id#0 as int) = id#2)
:- Filter isnotnull(id#0)
: +- Relation default.t1[id#0,value#1] parquet
+- Filter isnotnull(id#2)
+- Relation default.t2[id#2,value#3] parquet
Cast is NullIntolerant. We can infer IsNotNull(t1.id) already. But I also want to Infer isnotnull(cast(t1.id as int)) because t1.id may contains many strings that can not be casted to int.
| } | ||
|
|
||
| // Whether the result of this expression may be null. For example: CAST(strCol AS double) | ||
| // We will infer an IsNotNull expression for this expression to avoid skew join. |
There was a problem hiding this comment.
is it better to infer IsNotNull(col) instead of IsNotNull(CAST(col AS other_type))?
There was a problem hiding this comment.
We can infer IsNotNull(col) already. For example:
spark.sql("create table t1 (id string, value int) using parquet")
spark.sql("create table t2 (id int, value int) using parquet")
spark.sql("select * from t1 join t2 on t1.id = t2.id").explain("extended")Before this pr:
== Optimized Logical Plan ==
Join Inner, (cast(id#0 as int) = id#2)
:- Filter isnotnull(id#0)
: +- Relation default.t1[id#0,value#1] parquet
+- Filter isnotnull(id#2)
+- Relation default.t2[id#2,value#3] parquet
After this pr:
== Optimized Logical Plan ==
Join Inner, (cast(id#0 as int) = id#2)
:- Filter (isnotnull(id#0) AND isnotnull(cast(id#0 as int)))
: +- Relation default.t1[id#0,value#1] parquet
+- Filter isnotnull(id#2)
+- Relation default.t2[id#2,value#3] parquet
Infer isnotnull(cast(t1.id as int)) may filter out many strings that can not be casted to int.
|
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. |
What changes were proposed in this pull request?
We can infer
IsNotNullfrom some special equality join keys. For example:The
coalesce(t1.a, t1.b)orCAST(t1.a AS DOUBLE)may generate a lot of null values, which will lead to skew join.After this pr:
Why are the changes needed?
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unit test and benchmark test:
Case1:
Case2: