Commit 3e28f33

ulysses-you authored and cloud-fan committed
[SPARK-39447][SQL] Avoid AssertionError in AdaptiveSparkPlanExec.doExecuteBroadcast
### What changes were proposed in this pull request?

Change `currentPhysicalPlan` to `inputPlan` when we restore the broadcast exchange for DPP.

### Why are the changes needed?

The `currentPhysicalPlan` can be wrapped with a broadcast query stage, so it is not safe to match on it. For example, the broadcast exchange added by DPP can run before the normal broadcast exchange (e.g. one introduced by a join).

### Does this PR introduce _any_ user-facing change?

Yes, this is a bug fix.

### How was this patch tested?

Added a test.

Closes #36974 from ulysses-you/inputplan.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit c320a5d)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
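The reasoning in the commit message can be illustrated with a small toy model. This is only a sketch under stated assumptions: `Plan`, `Scan`, `BroadcastExchange`, `BroadcastQueryStage`, and `restore` are hypothetical stand-ins invented for illustration, not Spark's actual classes or methods. It shows why pattern matching on a plan that may already be wrapped in a query stage can silently drop the broadcast exchange, while matching on the original input plan keeps it.

```scala
// Toy stand-ins for plan nodes; these are NOT the real Spark classes.
sealed trait Plan
case class Scan(name: String) extends Plan
case class BroadcastExchange(child: Plan) extends Plan
// Models a query stage that wraps an exchange which has already started running.
case class BroadcastQueryStage(exchange: BroadcastExchange) extends Plan

object RestoreBroadcastSketch {
  // Hypothetical helper mirroring the shape of the patched match expression:
  // if the inspected plan is a broadcast exchange and the re-optimized plan
  // is not, put the exchange back on top of the re-optimized plan.
  def restore(matchOn: Plan, newPlan: Plan): Plan = matchOn match {
    case b: BroadcastExchange if !newPlan.isInstanceOf[BroadcastExchange] =>
      b.copy(child = newPlan)
    case _ => newPlan
  }

  def main(args: Array[String]): Unit = {
    val inputPlan: Plan = BroadcastExchange(Scan("t"))
    // The "current" plan may already be wrapped in a query stage.
    val currentPhysicalPlan: Plan = BroadcastQueryStage(BroadcastExchange(Scan("t")))
    val newPlan: Plan = Scan("t_reoptimized")

    // Matching on the wrapped plan falls through: the exchange is lost.
    assert(restore(currentPhysicalPlan, newPlan) == newPlan)
    // Matching on the original input plan restores the exchange.
    assert(restore(inputPlan, newPlan) == BroadcastExchange(newPlan))
  }
}
```

In this model the wrapper type is not itself an exchange, so the first case never fires for the wrapped plan. The input plan is not rewritten in the sketch, which is why matching on it is the safer choice.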
1 parent 463a24d commit 3e28f33

2 files changed: +20 -1 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

Lines changed: 1 addition & 1 deletion
@@ -659,7 +659,7 @@ case class AdaptiveSparkPlanExec(
         // node to prevent the loss of the `BroadcastExchangeExec` node in DPP subquery.
         // Here, we also need to avoid to insert the `BroadcastExchangeExec` node when the newPlan is
         // already the `BroadcastExchangeExec` plan after apply the `LogicalQueryStageStrategy` rule.
-        val finalPlan = currentPhysicalPlan match {
+        val finalPlan = inputPlan match {
           case b: BroadcastExchangeLike
             if (!newPlan.isInstanceOf[BroadcastExchangeLike]) => b.withNewChildren(Seq(newPlan))
           case _ => newPlan

sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala

Lines changed: 19 additions & 0 deletions
@@ -1694,6 +1694,25 @@ class DynamicPartitionPruningV1SuiteAEOff extends DynamicPartitionPruningV1Suite
 class DynamicPartitionPruningV1SuiteAEOn extends DynamicPartitionPruningV1Suite
   with EnableAdaptiveExecutionSuite {
 
+  test("SPARK-39447: Avoid AssertionError in AdaptiveSparkPlanExec.doExecuteBroadcast") {
+    val df = sql(
+      """
+        |WITH empty_result AS (
+        | SELECT * FROM fact_stats WHERE product_id < 0
+        |)
+        |SELECT *
+        |FROM (SELECT /*+ SHUFFLE_MERGE(fact_sk) */ empty_result.store_id
+        | FROM fact_sk
+        | JOIN empty_result
+        | ON fact_sk.product_id = empty_result.product_id) t2
+        | JOIN empty_result
+        | ON t2.store_id = empty_result.store_id
+      """.stripMargin)
+
+    checkPartitionPruningPredicate(df, false, false)
+    checkAnswer(df, Nil)
+  }
+
   test("SPARK-37995: PlanAdaptiveDynamicPruningFilters should use prepareExecutedPlan " +
     "rather than createSparkPlan to re-plan subquery") {
     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
