[SPARK-29231][SQL] Constraints should be inferred from cast equality constraint #27252
wangyum wants to merge 10 commits into apache:master from wangyum:SPARK-29231
Conversation
PostgreSQL and Hive support this feature:

```sql
postgres=# EXPLAIN select t1.* from spark_29231_1 t1 join spark_29231_2 t2 on (t1.c1 = t2.c1 and t1.c1 = 1);
                                  QUERY PLAN
------------------------------------------------------------------------------
 Nested Loop  (cost=0.00..69.77 rows=90 width=16)
   ->  Seq Scan on spark_29231_2 t2  (cost=0.00..35.50 rows=10 width=4)
         Filter: (c1 = 1)
   ->  Materialize  (cost=0.00..33.17 rows=9 width=16)
         ->  Seq Scan on spark_29231_1 t1  (cost=0.00..33.12 rows=9 width=16)
               Filter: (c1 = 1)
(6 rows)
```

```sql
hive> explain select t1.* from spark_29231_1 t1 join spark_29231_2 t2 on (t1.c1 = t2.c1 and t1.c1 = 1);
Warning: Map Join MAPJOIN[11][bigTable=?] in task 'Stage-3:MAPRED' is a cross product
OK
STAGE DEPENDENCIES:
  Stage-4 is a root stage
  Stage-3 depends on stages: Stage-4
  Stage-0 depends on stages: Stage-3

STAGE PLANS:
  Stage: Stage-4
    Map Reduce Local Work
      Alias -> Map Local Tables:
        $hdt$_0:t1
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        $hdt$_0:t1
          TableScan
            alias: t1
            Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
            Filter Operator
              predicate: (c1 = 1L) (type: boolean)
              Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
              Select Operator
                expressions: c2 (type: bigint)
                outputColumnNames: _col1
                Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
                HashTable Sink Operator
                  keys:
                    0
                    1

  Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: t2
            Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
            Filter Operator
              predicate: (UDFToLong(c1) = 1) (type: boolean)
              Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
              Select Operator
                Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
                Map Join Operator
                  condition map:
                       Inner Join 0 to 1
                  keys:
                    0
                    1
                  outputColumnNames: _col1
                  Statistics: Num rows: 1 Data size: 1 Basic stats: PARTIAL Column stats: NONE
                  Select Operator
                    expressions: 1L (type: bigint), _col1 (type: bigint)
                    outputColumnNames: _col0, _col1
                    Statistics: Num rows: 1 Data size: 1 Basic stats: PARTIAL Column stats: NONE
                    File Output Operator
                      compressed: false
                      Statistics: Num rows: 1 Data size: 1 Basic stats: PARTIAL Column stats: NONE
                      table:
                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      Execution mode: vectorized
      Local Work:
        Map Reduce Local Work

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

Time taken: 0.2 seconds, Fetched: 69 row(s)
```
Test build #116909 has finished for PR 27252 at commit
retest this please
Test build #116923 has finished for PR 27252 at commit
Test build #116993 has finished for PR 27252 at commit
retest this please
Test build #116998 has finished for PR 27252 at commit
Could you check if this fix can affect optimization time? IIRC we hit a time-consuming issue in constraint propagation before, e.g. e011004.

@maropu Before this PR: After this PR:
Test build #117104 has finished for PR 27252 at commit
...talyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
cc @cloud-fan
```scala
val candidateConstraints = binaryComparisons - eq
val bridge = Cast(r, lc.dataType, tz)
inferredConstraints ++= replaceConstraints(candidateConstraints, r, l)
inferredConstraints ++= replaceConstraints(candidateConstraints, lc, bridge)
```
Is this safe? Cast is not reversible: `cast(int_col as long) = long_col` can have a different result than `int_col = cast(long_col as int)`.
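To make the hazard concrete, here is a minimal illustration in plain Scala (not from the PR) mirroring non-ANSI cast semantics; a narrowing cast can manufacture a spurious match:

```scala
val intCol: Int = 1
val longCol: Long = 4294967297L // 2^32 + 1; its low 32 bits are 1

// Widening, as in `cast(int_col as long) = long_col`: correctly no match.
assert(intCol.toLong != longCol)

// Narrowing, as in `int_col = cast(long_col as int)`: overflow creates a match.
assert(intCol == longCol.toInt)
```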
Fixed in 2060190: the inference now only supports filters at the wider data type.
Test build #117624 has finished for PR 27252 at commit
retest this please
Test build #117644 has finished for PR 27252 at commit
```scala
 */
def inferAdditionalConstraints(constraints: Set[Expression]): Set[Expression] = {
  var inferredConstraints = Set.empty[Expression]
  val binaryComparisons = constraints.filter(_.isInstanceOf[BinaryComparison])
```
Do you know what constraints are not BinaryComparison? I think it's possible, but I can't find an example.
Sorry, handling only BinaryComparison is incorrect. For example: `int_column = long_column` where `long_column in (1L, 2L)`.
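For illustration, a small sketch against Catalyst's expression API (the attribute name is made up) showing that an `In` predicate is not a `BinaryComparison`, so `filter(_.isInstanceOf[BinaryComparison])` would drop it:

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BinaryComparison, In, Literal}
import org.apache.spark.sql.types.LongType

val longColumn = AttributeReference("long_column", LongType)()
val inPred = In(longColumn, Seq(Literal(1L), Literal(2L))) // long_column IN (1L, 2L)

// Not a BinaryComparison, yet it is a constraint worth propagating
// through int_column = long_column.
assert(!inPred.isInstanceOf[BinaryComparison])
```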
Test build #117800 has started for PR 27252 at commit
retest this please
retest this please
retest this please
Test build #117816 has finished for PR 27252 at commit
retest this please
Test build #117821 has finished for PR 27252 at commit
...rc/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
```scala
var inferredConstraints = Set.empty[Expression]
// IsNotNull should be constructed by `constructIsNotNullConstraints`.
val predicates = constraints.filterNot(_.isInstanceOf[IsNotNull])
constraints.foreach {
```
Can't we use `predicates.foreach {` here instead?
```scala
Seq(Some("left.a".attr.cast(LongType) === "right.b".attr),
  Some("right.b".attr === "left.a".attr.cast(LongType))).foreach { condition =>
  testConstraintsAfterJoin(originalLeft, originalRight, left, right, Inner, condition)
}
```
In terms of test coverage, isn't it better to test both cases (left- and right-side casts)? I have the same comment on the test below.
Test build #118270 has finished for PR 27252 at commit
Test build #118273 has finished for PR 27252 at commit
Test build #118276 has finished for PR 27252 at commit
retest this please
retest this please
```scala
val left = testRelation1.where(IsNotNull('a) && 'a === 1).subquery('left)
val right = testRelation2.where(IsNotNull('b)).subquery('right)

Seq(Some("left.a".attr.cast(LongType) === "right.b".attr),
```
I might be wrong, but I find these test cases a bit confusing because left and right have equality filters. E.g. here `'a === 1` is in left, so it would actually be correct to infer `1.cast(LongType) === 'b` for right. This PR obviously doesn't do that (#27518 will address it), but using inequalities (`'a < 1`) would probably be easier to follow.
```scala
inferredConstraints ++= replaceConstraints(candidateConstraints, l, r)
inferredConstraints ++= replaceConstraints(candidateConstraints, r, l)
case eq @ EqualTo(l @ Cast(_: Attribute, _, _), r: Attribute) =>
  inferredConstraints ++= replaceConstraints(predicates - eq, r, l)
```
According to https://github.com/apache/spark/pull/27252/files#r378111623:
If we have `cast(a, dt) = b` and `b = 1`, we can definitely infer `cast(a, dt) = 1`.
If we have `cast(a, dt) = b` and `a = 1`, it seems we can also infer `cast(1, dt) = b`.
But I'm a bit unsure how to do it. We may need a variant of `replaceConstraints` that holds an expression builder `e => cast(e, dt) = b`: it looks for attribute `a` and replaces it, producing `cast(1, dt) = b`.
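For context, a hedged sketch of what the `replaceConstraints` helper invoked in the diff plausibly looks like, inferred from its call sites above (with `destination` generalized to any `Expression` so a `Cast` can be substituted in). With `cast(a, dt) = b` and `b = 1`, replacing `b` by `cast(a, dt)` yields `cast(a, dt) = 1`:

```scala
import org.apache.spark.sql.catalyst.expressions.Expression

// Substitute every sub-expression semantically equal to `source` with
// `destination` across the given set of constraints.
def replaceConstraints(
    constraints: Set[Expression],
    source: Expression,
    destination: Expression): Set[Expression] =
  constraints.map(_.transform {
    case e: Expression if e.semanticEquals(source) => destination
  })
```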
I don't think we need to touch `replaceConstraints` at all. Please check #27518, which will do the trick: `a = 1` will be "substituted" into `cast(a, dt) = b` as a new `cast(1, dt) = b` constraint.
I think both this PR and #27518 are beneficial. But I would use `val originalLeft = testRelation1.where('a < 1).subquery('left)` in the test cases of this one instead of `val originalLeft = testRelation1.where('a === 1).subquery('left)` to avoid confusion.
@cloud-fan This PR supported `cast(1, dt) = b` before:
https://github.com/apache/spark/compare/048a0ecc65763c6feaa939938e2dec6f4040d939..7dcfe915087dbe274b470928600197745a645f5e
I removed it because:
- It may break the plan. This is how I handled it before.
- For `cast(a, dt) = b`, we would support inferring many predicates, for example `a > 1`, `a < 1`, `a in (2, 3)`. I'm not sure that is safe.

How about only supporting `cast(a, dt) = 1` for now?
@peter-toth I'd like to support these cases in #27518:
- `a < b && b < c` infers `a < c`
- `a < b && b <= c` infers `a < c`
- `a < b && b = c` infers `a < c`
- ...
@wangyum I see, but I think you are currently doing something very different in #27518; see details here: #27518 (comment).
I would suggest keeping your #27518 in its current form (but amending its title) and opening a new one to address inequalities.
> How about only supporting `cast(a, dt) = 1` now?

+1 for supporting only the limited case in this PR. Since this part of the optimization can affect many queries, I think we need exhaustive discussion and tests before supporting wider cases.
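Putting the thread together, a rough reconstruction of the limited rule that was settled on, assembled from the diff fragments quoted in this review (the merged code may differ in detail; this relies on the `replaceConstraints` sketched earlier):

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, EqualTo, Expression, IsNotNull}

def inferAdditionalConstraints(constraints: Set[Expression]): Set[Expression] = {
  var inferredConstraints = Set.empty[Expression]
  // IsNotNull constraints are constructed separately by `constructIsNotNullConstraints`.
  val predicates = constraints.filterNot(_.isInstanceOf[IsNotNull])
  predicates.foreach {
    case eq @ EqualTo(l: Attribute, r: Attribute) =>
      val candidateConstraints = predicates - eq
      inferredConstraints ++= replaceConstraints(candidateConstraints, l, r)
      inferredConstraints ++= replaceConstraints(candidateConstraints, r, l)
    case eq @ EqualTo(l @ Cast(_: Attribute, _, _), r: Attribute) =>
      // cast(a, dt) = b: rewrite constraints on b (e.g. b = 1) onto the wider side.
      inferredConstraints ++= replaceConstraints(predicates - eq, r, l)
    case eq @ EqualTo(l: Attribute, r @ Cast(_: Attribute, _, _)) =>
      // a = cast(b, dt): the symmetric case.
      inferredConstraints ++= replaceConstraints(predicates - eq, l, r)
    case _ => // no inference
  }
  inferredConstraints -- constraints
}
```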
Test build #118283 has finished for PR 27252 at commit
| Seq(Some("left.a".attr.cast(LongType) === "right.b".attr), | ||
| Some("right.b".attr === "left.a".attr.cast(LongType))).foreach { condition => | ||
| testConstraintsAfterJoin(originalLeft, originalRight, left, right, Inner, condition) | ||
| } |
Let's also test `cast(int)` here. The key is that we should test both the left-side cast and the right-side cast, as @maropu said.
`"left.a".attr.cast(LongType) === "right.b".attr` and `"right.b".attr === "left.a".attr.cast(LongType)` only test the left-side cast (the join's left side, not EqualTo's left side).
Test build #118311 has finished for PR 27252 at commit
retest this please
Test build #118323 has finished for PR 27252 at commit
retest this please
Test build #118329 has finished for PR 27252 at commit
thanks, merging to master!
@wangyum Could you update the PR description with the perf measurement?
@gatorsmile Done
[SPARK-29231][SQL] Constraints should be inferred from cast equality constraint
### What changes were proposed in this pull request?
This PR adds support for inferring constraints from a cast equality constraint. For example:
```scala
scala> spark.sql("create table spark_29231_1(c1 bigint, c2 bigint)")
res0: org.apache.spark.sql.DataFrame = []
scala> spark.sql("create table spark_29231_2(c1 int, c2 bigint)")
res1: org.apache.spark.sql.DataFrame = []
scala> spark.sql("select t1.* from spark_29231_1 t1 join spark_29231_2 t2 on (t1.c1 = t2.c1 and t1.c1 = 1)").explain
== Physical Plan ==
*(2) Project [c1#5L, c2#6L]
+- *(2) BroadcastHashJoin [c1#5L], [cast(c1#7 as bigint)], Inner, BuildRight
:- *(2) Project [c1#5L, c2#6L]
: +- *(2) Filter (isnotnull(c1#5L) AND (c1#5L = 1))
: +- *(2) ColumnarToRow
: +- FileScan parquet default.spark_29231_1[c1#5L,c2#6L] Batched: true, DataFilters: [isnotnull(c1#5L), (c1#5L = 1)], Format: Parquet, Location: InMemoryFileIndex[file:/root/spark-3.0.0-preview2-bin-hadoop2.7/spark-warehouse/spark_29231_1], PartitionFilters: [], PushedFilters: [IsNotNull(c1), EqualTo(c1,1)], ReadSchema: struct<c1:bigint,c2:bigint>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=apache#209]
+- *(1) Project [c1#7]
+- *(1) Filter isnotnull(c1#7)
+- *(1) ColumnarToRow
+- FileScan parquet default.spark_29231_2[c1#7] Batched: true, DataFilters: [isnotnull(c1#7)], Format: Parquet, Location: InMemoryFileIndex[file:/root/spark-3.0.0-preview2-bin-hadoop2.7/spark-warehouse/spark_29231_2], PartitionFilters: [], PushedFilters: [IsNotNull(c1)], ReadSchema: struct<c1:int>
```
After this PR (note the inferred filter `cast(c1#2 as bigint) = 1` pushed down to the scan of `spark_29231_2`):
```scala
scala> spark.sql("select t1.* from spark_29231_1 t1 join spark_29231_2 t2 on (t1.c1 = t2.c1 and t1.c1 = 1)").explain
== Physical Plan ==
*(2) Project [c1#0L, c2#1L]
+- *(2) BroadcastHashJoin [c1#0L], [cast(c1#2 as bigint)], Inner, BuildRight
:- *(2) Project [c1#0L, c2#1L]
: +- *(2) Filter (isnotnull(c1#0L) AND (c1#0L = 1))
: +- *(2) ColumnarToRow
: +- FileScan parquet default.spark_29231_1[c1#0L,c2#1L] Batched: true, DataFilters: [isnotnull(c1#0L), (c1#0L = 1)], Format: Parquet, Location: InMemoryFileIndex[file:/root/opensource/spark/spark-warehouse/spark_29231_1], PartitionFilters: [], PushedFilters: [IsNotNull(c1), EqualTo(c1,1)], ReadSchema: struct<c1:bigint,c2:bigint>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=apache#99]
+- *(1) Project [c1#2]
+- *(1) Filter ((cast(c1#2 as bigint) = 1) AND isnotnull(c1#2))
+- *(1) ColumnarToRow
+- FileScan parquet default.spark_29231_2[c1#2] Batched: true, DataFilters: [(cast(c1#2 as bigint) = 1), isnotnull(c1#2)], Format: Parquet, Location: InMemoryFileIndex[file:/root/opensource/spark/spark-warehouse/spark_29231_2], PartitionFilters: [], PushedFilters: [IsNotNull(c1)], ReadSchema: struct<c1:int>
```
### Why are the changes needed?
Improve query performance.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Unit test.
Closes apache#27252 from wangyum/SPARK-29231.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
How was this patch tested?
Unit test and benchmark test.
Benchmark code and benchmark result:
Before this PR:
After this PR: