Commit a338be2
[SPARK-38570][SQL] Incorrect DynamicPartitionPruning caused by Literal
### What changes were proposed in this pull request?
The return value of Literal.references is an empty AttributeSet, so Literal is mistaken for a partition column.
For example, the sql in the test case will generate such a physical plan when the adaptive is closed:
```text
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
:- Union
: :- *(1) Project [4 AS store_id#5281, date_id#5283]
: : +- *(1) Filter ((isnotnull(date_id#5283) AND (date_id#5283 >= 1300)) AND dynamicpruningexpression(4 IN dynamicpruning#5300))
: : : +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
: : +- *(1) ColumnarToRow
: : +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(4 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
: : +- SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
: : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335]
: : +- *(1) Project [store_id#5291, state_province#5292]
: : +- *(1) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
: : +- *(1) ColumnarToRow
: : +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema: struct<store_id:int,state_province:string,country:string>
: +- *(2) Project [5 AS store_id#5282, date_id#5287]
: +- *(2) Filter ((isnotnull(date_id#5287) AND (date_id#5287 <= 1000)) AND dynamicpruningexpression(5 IN dynamicpruning#5300))
: : +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
: +- *(2) ColumnarToRow
: +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(5 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
: +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
+- ReusedExchange [store_id#5291, state_province#5292], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335]
```
after this pr:
```text
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
:- Union
: :- *(1) Project [4 AS store_id#5281, date_id#5283]
: : +- *(1) Filter (isnotnull(date_id#5283) AND (date_id#5283 >= 1300))
: : +- *(1) ColumnarToRow
: : +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
: +- *(2) Project [5 AS store_id#5282, date_id#5287]
: +- *(2) Filter (isnotnull(date_id#5287) AND (date_id#5287 <= 1000))
: +- *(2) ColumnarToRow
: +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#326]
+- *(3) Project [store_id#5291, state_province#5292]
+- *(3) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
+- *(3) ColumnarToRow
+- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema: struct<store_id:int,state_province:string,country:string>
```
### Why are the changes needed?
Execution performance improvement
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added unit test
Closes #35878 from mcdull-zhang/literal_dynamic_partition.
Lead-authored-by: mcdull-zhang <work4dong@163.com>
Co-authored-by: mcdull_zhang <work4dong@163.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
(cherry picked from commit 4c51851)
Signed-off-by: Yuming Wang <yumwang@ebay.com>1 parent 5582f92 commit a338be2
File tree
2 files changed
+29
-0
lines changed- sql
- catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions
- core/src/test/scala/org/apache/spark/sql
2 files changed
+29
-0
lines changedLines changed: 1 addition & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
128 | 128 | | |
129 | 129 | | |
130 | 130 | | |
| 131 | + | |
131 | 132 | | |
132 | 133 | | |
133 | 134 | | |
| |||
Lines changed: 28 additions & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
1528 | 1528 | | |
1529 | 1529 | | |
1530 | 1530 | | |
| 1531 | + | |
| 1532 | + | |
| 1533 | + | |
| 1534 | + | |
| 1535 | + | |
| 1536 | + | |
| 1537 | + | |
| 1538 | + | |
| 1539 | + | |
| 1540 | + | |
| 1541 | + | |
| 1542 | + | |
| 1543 | + | |
| 1544 | + | |
| 1545 | + | |
| 1546 | + | |
| 1547 | + | |
| 1548 | + | |
| 1549 | + | |
| 1550 | + | |
| 1551 | + | |
| 1552 | + | |
| 1553 | + | |
| 1554 | + | |
| 1555 | + | |
| 1556 | + | |
| 1557 | + | |
| 1558 | + | |
1531 | 1559 | | |
1532 | 1560 | | |
1533 | 1561 | | |
| |||
0 commit comments