Commit 9b64981

simplify QueryStage
1 parent 2b04686

File tree: 15 files changed (+381, -435 lines)


core/src/main/scala/org/apache/spark/MapOutputStatistics.scala

Lines changed: 0 additions & 1 deletion
@@ -25,4 +25,3 @@ package org.apache.spark
  * (may be inexact due to use of compressed map statuses)
  */
 private[spark] class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long])
-  extends Serializable
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 6 additions & 6 deletions
@@ -94,11 +94,12 @@ class QueryExecution(
    * row format conversions as needed.
    */
   protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
-    if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
-      adaptivePreparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
+    val rules = if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
+      adaptivePreparations
     } else {
-      preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
+      preparations
     }
+    rules.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
   }

   /** A sequence of rules that will be applied in order to the physical plan before execution. */
@@ -109,8 +110,7 @@ class QueryExecution(
     ReuseExchange(sparkSession.sessionState.conf),
     ReuseSubquery(sparkSession.sessionState.conf))

-  // With adaptive execution, whole stage codegen will be planned inside `QueryStage`, so we exclude
-  // `CollapseCodegenStages` here.
+  // With adaptive execution, whole stage codegen will be done inside `QueryStageExecutor`.
   protected def adaptivePreparations: Seq[Rule[SparkPlan]] = Seq(
     PlanSubqueries(sparkSession),
     EnsureRequirements(sparkSession.sessionState.conf),
@@ -119,7 +119,7 @@ class QueryExecution(
     // PlanQueryStage needs to be the last rule because it divides the plan into multiple sub-trees
     // by inserting leaf node QueryStageInput. Transforming the plan after applying this rule will
     // only transform nodes in a sub-tree.
-    PlanQueryStage)
+    PlanQueryStage(sparkSession.sessionState.conf))

   def simpleString: String = withRedaction {
     val concat = new StringConcat()
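
The prepareForExecution change factors the adaptiveExecutionEnabled branch out of the fold, so both paths share a single foldLeft over the selected rule list. A minimal sketch of that rule-folding pattern, using simplified stand-in Plan and Rule types rather than Spark's actual classes:

// Hypothetical stand-ins for illustration; not Spark's classes.
trait Plan
trait Rule[T] { def apply(plan: T): T }

def applyRules(plan: Plan, adaptive: Boolean,
    preparations: Seq[Rule[Plan]],
    adaptivePreparations: Seq[Rule[Plan]]): Plan = {
  // Choose the rule list once, then thread the plan through it:
  // each rule sees the plan produced by the previous rule, so order matters.
  val rules = if (adaptive) adaptivePreparations else preparations
  rules.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
}

For example, with rules = Seq(r1, r2), the result is r2.apply(r1.apply(plan)).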

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala

Lines changed: 2 additions & 2 deletions
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution

 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.execution.adaptive.QueryStageInput
+import org.apache.spark.sql.execution.adaptive.QueryStage
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.metric.SQLMetricInfo
 import org.apache.spark.sql.internal.SQLConf
@@ -53,7 +53,7 @@ private[execution] object SparkPlanInfo {
   def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
     val children = plan match {
       case ReusedExchangeExec(_, child) => child :: Nil
-      case i: QueryStageInput => i.childStage :: Nil
+      case stage: QueryStage => stage.finalPlan :: Nil
       case _ => plan.children ++ plan.subqueries
     }
     val metrics = plan.metrics.toSeq.map { case (key, metric) =>
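
For the UI tree, fromSparkPlan now follows a stage's finalPlan rather than the old childStage field. A hedged sketch of that child-extraction match, with hypothetical node types standing in for Spark's plan classes:

// Stand-in node types for illustration; not Spark's definitions.
sealed trait Node { def children: Seq[Node] }
case class Op(children: Seq[Node]) extends Node
case class Stage(finalPlan: Node) extends Node {
  // A stage is a leaf in the physical tree; its real subtree is
  // reachable only through `finalPlan`.
  val children: Seq[Node] = Nil
}

def uiChildren(node: Node): Seq[Node] = node match {
  case s: Stage => s.finalPlan :: Nil // follow the hidden subtree
  case other    => other.children     // ordinary operator
}

Without the Stage case, the UI walk would stop at each stage boundary and render an empty subtree.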

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanQueryStage.scala

Lines changed: 6 additions & 5 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.adaptive
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec, ShuffleExchangeExec}
+import org.apache.spark.sql.internal.SQLConf

 /**
  * Divide the Spark plan into multiple QueryStages. For each Exchange in the plan, it adds a
@@ -29,27 +30,27 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange,
  * is a leaf node. Transforming the plan after applying this rule will only transform nodes in a
  * sub-tree.
  */
-object PlanQueryStage extends Rule[SparkPlan] {
+case class PlanQueryStage(conf: SQLConf) extends Rule[SparkPlan] {

   def apply(plan: SparkPlan): SparkPlan = {
     val exchangeToQueryStage = new java.util.IdentityHashMap[Exchange, QueryStage]
     val newPlan = plan.transformUp {
       case e: ShuffleExchangeExec =>
         val queryStage = ShuffleQueryStage(e)
         exchangeToQueryStage.put(e, queryStage)
-        ShuffleQueryStageInput(queryStage, e.output)
+        ShuffleQueryStageReaderExec(queryStage, queryStage.output)
       case e: BroadcastExchangeExec =>
         val queryStage = BroadcastQueryStage(e)
         exchangeToQueryStage.put(e, queryStage)
-        BroadcastQueryStageInput(queryStage, e.output)
+        BroadcastQueryStageReaderExec(queryStage, queryStage.output)
       // The `ReusedExchangeExec` was added in the rule `ReuseExchange`, via transforming up the
       // query plan. This rule also transforms up the query plan, so when we hit `ReusedExchangeExec`
       // here, the exchange being reused must already have been hit, and there should be an entry
       // for it in `exchangeToQueryStage`.
       case e: ReusedExchangeExec =>
         exchangeToQueryStage.get(e.child) match {
-          case q: ShuffleQueryStage => ShuffleQueryStageInput(q, e.output)
-          case q: BroadcastQueryStage => BroadcastQueryStageInput(q, e.output)
+          case q: ShuffleQueryStage => ShuffleQueryStageReaderExec(q, e.output)
+          case q: BroadcastQueryStage => BroadcastQueryStageReaderExec(q, e.output)
         }
     }
     ResultQueryStage(newPlan)
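
The reuse branch above works because transformUp visits each original exchange before any ReusedExchangeExec pointing at it, and because the IdentityHashMap keys exchanges by reference rather than by structural equality. A simplified sketch of that replace-and-remember pattern, under those assumptions and with stand-in types (not Spark's):

import java.util.IdentityHashMap

// Hypothetical stand-in types for illustration only.
sealed trait Input
case class Exchange(id: Int) extends Input
case class Reused(original: Exchange) extends Input
case class Stage(exchange: Exchange)
case class Reader(stage: Stage)

// Assumes bottom-up order: every Exchange appears before any Reused
// node that references it, mirroring transformUp.
def planStages(nodes: Seq[Input]): Seq[Reader] = {
  // Identity semantics: two structurally equal exchanges still get
  // distinct stages, so keys are compared by reference.
  val exchangeToStage = new IdentityHashMap[Exchange, Stage]
  nodes.map {
    case e: Exchange =>
      val stage = Stage(e)
      exchangeToStage.put(e, stage)
      Reader(stage)
    case r: Reused =>
      // The original exchange was already visited, so its stage exists.
      Reader(exchangeToStage.get(r.original))
  }
}

For planStages(Seq(e, Reused(e))), both readers share the same Stage instance, mirroring how a reused exchange shares the query stage of the exchange it reuses.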
