Commit 789ec13
[SPARK-38570][SQL][3.1] Incorrect DynamicPartitionPruning caused by Literal
This is a backport of #35878 to branch 3.1.
The return value of `Literal.references` is an empty `AttributeSet`, so a `Literal` is mistaken for a partition column: an empty reference set passes the "references only partition columns" check vacuously.
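To see the mechanism, here is a minimal sketch against Catalyst's expression API (the call site here is illustrative, not the actual pruning rule):

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Literal}
import org.apache.spark.sql.types.IntegerType

// A Literal carries no attribute references:
val lit = Literal(4)
assert(lit.references.isEmpty)

// So any "references only partition columns?" test is vacuously true for
// it: the empty set is a subset of every AttributeSet.
val storeId = AttributeReference("store_id", IntegerType)()
assert(lit.references.subsetOf(AttributeSet(storeId :: Nil)))
```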
For example, the SQL in the test case generates the following physical plan when adaptive query execution (AQE) is disabled:
```text
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
:- Union
: :- *(1) Project [4 AS store_id#5281, date_id#5283]
: : +- *(1) Filter ((isnotnull(date_id#5283) AND (date_id#5283 >= 1300)) AND dynamicpruningexpression(4 IN dynamicpruning#5300))
: : : +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
: : +- *(1) ColumnarToRow
: : +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(4 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
: : +- SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
: : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335]
: : +- *(1) Project [store_id#5291, state_province#5292]
: : +- *(1) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
: : +- *(1) ColumnarToRow
: : +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema: struct<store_id:int,state_province:string,country:string>
: +- *(2) Project [5 AS store_id#5282, date_id#5287]
: +- *(2) Filter ((isnotnull(date_id#5287) AND (date_id#5287 <= 1000)) AND dynamicpruningexpression(5 IN dynamicpruning#5300))
: : +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
: +- *(2) ColumnarToRow
: +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(5 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
: +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
+- ReusedExchange [store_id#5291, state_province#5292], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335]
```
After this PR, the spurious `dynamicpruningexpression(... IN dynamicpruning#5300)` partition filters on the literal keys, and the `SubqueryBroadcast`/`ReusedSubquery` nodes they dragged in, are gone:
```text
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
:- Union
: :- *(1) Project [4 AS store_id#5281, date_id#5283]
: : +- *(1) Filter (isnotnull(date_id#5283) AND (date_id#5283 >= 1300))
: : +- *(1) ColumnarToRow
: : +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
: +- *(2) Project [5 AS store_id#5282, date_id#5287]
: +- *(2) Filter (isnotnull(date_id#5287) AND (date_id#5287 <= 1000))
: +- *(2) ColumnarToRow
: +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#326]
+- *(3) Project [store_id#5291, state_province#5292]
+- *(3) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
+- *(3) ColumnarToRow
+- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema: struct<store_id:int,state_province:string,country:string>
```
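For reference, a query of roughly this shape reproduces the plans above. It is reconstructed from the plan output: the table names and filters match the scans shown, but the exact test-suite SQL and schemas are assumptions:

```scala
// Reconstructed from the plans above; not copied from the actual test.
// fact_sk and fact_stats are assumed to be partitioned by store_id.
val df = spark.sql("""
  SELECT f.store_id, f.date_id, s.state_province
  FROM (SELECT 4 AS store_id, date_id FROM fact_sk WHERE date_id >= 1300
        UNION ALL
        SELECT 5 AS store_id, date_id FROM fact_stats WHERE date_id <= 1000) f
  JOIN dim_store s ON f.store_id = s.store_id
  WHERE s.country = 'US'
""")
df.explain()
```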
Why are the changes needed?
Execution performance improvement.

Does this PR introduce any user-facing change?
No.

How was this patch tested?
Added unit test.
Closes #35967 from mcdull-zhang/spark_38570_3.2.
Authored-by: mcdull-zhang <work4dong@163.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
(cherry picked from commit 8621914)
Signed-off-by: Yuming Wang <yumwang@ebay.com>
File tree: 2 files changed, +29 −0
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions (1 addition, 0 deletions)
- sql/core/src/test/scala/org/apache/spark/sql (28 additions, 0 deletions)
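The one-line change under catalyst's expressions package is not visible in this diff rendering. As a purely hypothetical illustration (not the actual patch), a guard of this shape is what stops a `Literal` from being treated as a partition column:

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression}

// Hypothetical guard, not the actual SPARK-38570 fix: a candidate pruning
// key must reference at least one attribute before the "references only
// partition columns" subset check is meaningful.
def isCandidatePruningKey(e: Expression, partitionCols: AttributeSet): Boolean =
  e.references.nonEmpty && e.references.subsetOf(partitionCols)
```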