
Commit 7e8e4c0

JkSelf authored and cloud-fan committed
[SPARK-29552][SQL] Execute the "OptimizeLocalShuffleReader" rule when creating new query stage and then can optimize the shuffle reader to local shuffle reader as much as possible
### What changes were proposed in this pull request?

The `OptimizeLocalShuffleReader` rule is very conservative and gives up optimization as soon as any extra shuffles are introduced. It is very likely that most of the added local shuffle readers are fine and only one introduces an extra shuffle. However, it is very hard to make `OptimizeLocalShuffleReader` optimal; a simple workaround is to run this rule again right before executing a query stage.

### Why are the changes needed?

To optimize more shuffle readers into local shuffle readers.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing unit tests.

Closes #26207 from JkSelf/resolve-multi-joins-issue.

Authored-by: jiake <ke.a.jia@intel.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
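The trade-off described above can be illustrated with a toy model (plain Python, not Spark code; the stage structure, function names, and flag values are purely illustrative): the old all-or-nothing behaviour reverts every local reader if any candidate anywhere in the plan would introduce an extra shuffle, while re-running the rule per query stage only gives up the readers inside the offending stage.

```python
# Toy model of the optimization, NOT Spark's implementation.
# Each query stage is a list of candidate local shuffle readers;
# True means converting that reader would introduce an extra shuffle.

def optimize_global(stages):
    """All-or-nothing: revert every local reader if any candidate
    anywhere in the plan introduces an extra shuffle."""
    if any(bad for stage in stages for bad in stage):
        return 0
    return sum(len(stage) for stage in stages)

def optimize_per_stage(stages):
    """Re-run the rule on each stage's sub-plan-tree: only the
    offending stage reverts its readers; the others keep theirs."""
    total = 0
    for stage in stages:
        if not any(stage):
            total += len(stage)
    return total

# Three stages; only the middle one has a reader that adds a shuffle.
plan = [[False, False], [True], [False]]
print(optimize_global(plan))     # old behaviour keeps 0 local readers
print(optimize_per_stage(plan))  # per-stage re-run keeps 3
```

This is the intuition behind adding `OptimizeLocalShuffleReader(conf)` to `queryStageOptimizerRules` below: the rule gets a second chance on each stage's sub-plan-tree, so one bad candidate no longer disables the optimization for the whole plan.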
1 parent bfbf282 commit 7e8e4c0

2 files changed: +16 −4 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

Lines changed: 9 additions & 0 deletions
@@ -92,6 +92,15 @@ case class AdaptiveSparkPlanExec(
     // optimizations should be stage-independent.
     @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
       ReuseAdaptiveSubquery(conf, subqueryCache),
+      // When adding local shuffle readers in 'OptimizeLocalShuffleReader`, we revert all the local
+      // readers if additional shuffles are introduced. This may be too conservative: maybe there is
+      // only one local reader that introduces shuffle, and we can still keep other local readers.
+      // Here we re-execute this rule with the sub-plan-tree of a query stage, to make sure necessary
+      // local readers are added before executing the query stage.
+      // This rule must be executed before `ReduceNumShufflePartitions`, as local shuffle readers
+      // can't change number of partitions.
+      OptimizeLocalShuffleReader(conf),
       ReduceNumShufflePartitions(conf),
       ApplyColumnarRulesAndInsertTransitions(session.sessionState.conf,
         session.sessionState.columnarRules),

sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala

Lines changed: 7 additions & 4 deletions
@@ -163,8 +163,9 @@ class AdaptiveQueryExecSuite
       assert(smj.size == 3)
       val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
       assert(bhj.size == 3)
-      // additional shuffle exchange introduced, only one shuffle reader to local shuffle reader.
-      checkNumLocalShuffleReaders(adaptivePlan, 1)
+      // The child of remaining one BroadcastHashJoin is not ShuffleQueryStage.
+      // So only two LocalShuffleReader.
+      checkNumLocalShuffleReaders(adaptivePlan, 2)
     }
   }

@@ -188,7 +189,8 @@
       assert(smj.size == 3)
       val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
       assert(bhj.size == 3)
-      // additional shuffle exchange introduced, only one shuffle reader to local shuffle reader.
+      // The child of remaining two BroadcastHashJoin is not ShuffleQueryStage.
+      // So only two LocalShuffleReader.
       checkNumLocalShuffleReaders(adaptivePlan, 1)
     }
   }

@@ -213,7 +215,8 @@
       assert(smj.size == 3)
       val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
       assert(bhj.size == 3)
-      // additional shuffle exchange introduced, only one shuffle reader to local shuffle reader.
+      // The child of remaining two BroadcastHashJoin is not ShuffleQueryStage.
+      // So only two LocalShuffleReader.
       checkNumLocalShuffleReaders(adaptivePlan, 1)
     }
   }
