Commit ea93dbf

cloud-fan authored and carsonwang committed
simplify QueryStage (#5)
* do not re-implement exchange reuse
* simplify QueryStage
* add comments
* new idea
* polish
* address comments
* improve QueryStageTrigger
1 parent 5819826 commit ea93dbf

File tree

16 files changed: +699 −481 lines

core/src/main/scala/org/apache/spark/MapOutputStatistics.scala

Lines changed: 0 additions & 1 deletion

@@ -25,4 +25,3 @@ package org.apache.spark
  * (may be inexact due to use of compressed map statuses)
  */
 private[spark] class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long])
-  extends Serializable

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 8 additions & 5 deletions

@@ -94,11 +94,12 @@ class QueryExecution(
    * row format conversions as needed.
    */
   protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
-    if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
-      adaptivePreparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp)}
+    val rules = if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
+      adaptivePreparations
     } else {
-      preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp)}
+      preparations
     }
+    rules.foldLeft(plan) { case (sp, rule) => rule.apply(sp)}
   }

   /** A sequence of rules that will be applied in order to the physical plan before execution. */
@@ -109,14 +110,16 @@ class QueryExecution(
     ReuseExchange(sparkSession.sessionState.conf),
     ReuseSubquery(sparkSession.sessionState.conf))

+  // With adaptive execution, whole stage codegen will be done inside `QueryStageExecutor`.
   protected def adaptivePreparations: Seq[Rule[SparkPlan]] = Seq(
     PlanSubqueries(sparkSession),
     EnsureRequirements(sparkSession.sessionState.conf),
+    ReuseExchange(sparkSession.sessionState.conf),
     ReuseSubquery(sparkSession.sessionState.conf),
     // PlanQueryStage needs to be the last rule because it divides the plan into multiple sub-trees
-    // by inserting leaf node QueryStageInput. Transforming the plan after applying this rule will
+    // by inserting leaf node QueryStage. Transforming the plan after applying this rule will
     // only transform node in a sub-tree.
-    PlanQueryStage(sparkSession.sessionState.conf))
+    PlanQueryStage(sparkSession))

   def simpleString: String = withRedaction {
     val concat = new StringConcat()
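
The refactored prepareForExecution selects a rule list and then applies it with a single foldLeft, so each rule sees the plan produced by the previous one. As a minimal standalone sketch of that fold pattern (Plan, Rule, upperCase, and annotate are simplified stand-ins invented here, not Spark's classes):

object RuleFoldingSketch {
  case class Plan(description: String)

  trait Rule {
    def apply(plan: Plan): Plan
  }

  // Two toy rules; SAM conversion (Scala 2.12+) gives us Rule instances from lambdas.
  val upperCase: Rule = p => Plan(p.description.toUpperCase)
  val annotate: Rule = p => Plan(p.description + " [prepared]")

  def main(args: Array[String]): Unit = {
    val rules: Seq[Rule] = Seq(upperCase, annotate)
    // Same shape as prepareForExecution: each rule transforms the previous rule's output.
    val finalPlan = rules.foldLeft(Plan("scan")) { case (sp, rule) => rule.apply(sp) }
    println(finalPlan) // Plan(SCAN [prepared])
  }
}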

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala

Lines changed: 3 additions & 2 deletions

@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution

 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.execution.adaptive.QueryStageInput
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlan, QueryStage}
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.metric.SQLMetricInfo
 import org.apache.spark.sql.internal.SQLConf
@@ -53,7 +53,8 @@ private[execution] object SparkPlanInfo {
   def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
     val children = plan match {
       case ReusedExchangeExec(_, child) => child :: Nil
-      case i: QueryStageInput => i.childStage :: Nil
+      case a: AdaptiveSparkPlan => a.resultStage.plan :: Nil
+      case stage: QueryStage => stage.plan :: Nil
       case _ => plan.children ++ plan.subqueries
     }
     val metrics = plan.metrics.toSeq.map { case (key, metric) =>
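
With this change, fromSparkPlan descends into the plan held by a stage node instead of stopping at the wrapper, so the UI tree shows the stage's contents. A simplified sketch of that unwrapping recursion, with PlanNode, Stage, and TreeInfo as invented stand-ins for SparkPlan, QueryStage, and SparkPlanInfo:

object PlanInfoSketch {
  sealed trait PlanNode { def name: String; def children: Seq[PlanNode] }
  case class Leaf(name: String) extends PlanNode { val children = Nil }
  case class Node(name: String, children: Seq[PlanNode]) extends PlanNode
  // A stage wraps a sub-plan but reports no ordinary children (it is a leaf node).
  case class Stage(name: String, plan: PlanNode) extends PlanNode { val children = Nil }

  case class TreeInfo(name: String, children: Seq[TreeInfo])

  def fromPlan(plan: PlanNode): TreeInfo = {
    val kids = plan match {
      case s: Stage => s.plan :: Nil // descend into the wrapped plan
      case other => other.children
    }
    TreeInfo(plan.name, kids.map(fromPlan))
  }

  def main(args: Array[String]): Unit = {
    val tree = Node("Join", Seq(Stage("stage-0", Leaf("Scan A")), Leaf("Scan B")))
    println(fromPlan(tree))
    // TreeInfo(Join,List(TreeInfo(stage-0,List(TreeInfo(Scan A,List()))), TreeInfo(Scan B,List())))
  }
}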
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlan.scala (new file)

Lines changed: 126 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.adaptive

import java.util.concurrent.CountDownLatch

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, SparkPlanInfo, SQLExecution}
import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate

/**
 * A root node to trigger query stages and execute the query plan adaptively. It incrementally
 * updates the query plan when a query stage is materialized and provides accurate runtime
 * statistics.
 */
case class AdaptiveSparkPlan(initialPlan: ResultQueryStage, session: SparkSession)
  extends LeafExecNode {

  override def output: Seq[Attribute] = initialPlan.output

  @volatile private var currentQueryStage: QueryStage = initialPlan
  @volatile private var error: Throwable = null
  private val readyLock = new CountDownLatch(1)

  private def replaceStage(oldStage: QueryStage, newStage: QueryStage): QueryStage = {
    if (oldStage.id == newStage.id) {
      newStage
    } else {
      val newPlanForOldStage = oldStage.plan.transform {
        case q: QueryStage => replaceStage(q, newStage)
      }
      oldStage.withNewPlan(newPlanForOldStage)
    }
  }

  private def createCallback(executionId: Option[Long]): QueryStageTriggerCallback = {
    new QueryStageTriggerCallback {
      override def onStageUpdated(stage: QueryStage): Unit = {
        updateCurrentQueryStage(stage, executionId)
        if (stage.isInstanceOf[ResultQueryStage]) readyLock.countDown()
      }

      override def onStagePlanningFailed(stage: QueryStage, e: Throwable): Unit = {
        error = new RuntimeException(
          s"""
             |Fail to plan stage ${stage.id}:
             |${stage.plan.treeString}
           """.stripMargin, e)
        readyLock.countDown()
      }

      override def onStageMaterializingFailed(stage: QueryStage, e: Throwable): Unit = {
        error = new RuntimeException(
          s"""
             |Fail to materialize stage ${stage.id}:
             |${stage.plan.treeString}
           """.stripMargin, e)
        readyLock.countDown()
      }

      override def onError(e: Throwable): Unit = {
        error = e
        readyLock.countDown()
      }
    }
  }

  private def updateCurrentQueryStage(newStage: QueryStage, executionId: Option[Long]): Unit = {
    currentQueryStage = replaceStage(currentQueryStage, newStage)
    executionId.foreach { id =>
      session.sparkContext.listenerBus.post(SparkListenerSQLAdaptiveExecutionUpdate(
        id,
        SQLExecution.getQueryExecution(id).toString,
        SparkPlanInfo.fromSparkPlan(currentQueryStage)))
    }
  }

  def resultStage: ResultQueryStage = {
    if (readyLock.getCount > 0) {
      val sc = session.sparkContext
      val executionId = Option(sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)).map(_.toLong)
      val trigger = new QueryStageTrigger(session, createCallback(executionId))
      trigger.start()
      trigger.trigger(initialPlan)
      readyLock.await()
      trigger.stop()
    }

    if (error != null) throw error
    currentQueryStage.asInstanceOf[ResultQueryStage]
  }

  override def executeCollect(): Array[InternalRow] = resultStage.executeCollect()
  override def executeTake(n: Int): Array[InternalRow] = resultStage.executeTake(n)
  override def executeToIterator(): Iterator[InternalRow] = resultStage.executeToIterator()
  override def doExecute(): RDD[InternalRow] = resultStage.execute()
  override def generateTreeString(
      depth: Int,
      lastChildren: Seq[Boolean],
      append: String => Unit,
      verbose: Boolean,
      prefix: String = "",
      addSuffix: Boolean = false,
      maxFields: Int): Unit = {
    currentQueryStage.generateTreeString(
      depth, lastChildren, append, verbose, "", false, maxFields)
  }
}
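
resultStage coordinates with the asynchronous QueryStageTrigger through a callback that records either a result or an error and then releases a CountDownLatch the caller is blocked on. Below is a self-contained sketch of that pattern, assuming only that completion happens on a background thread; Callback, LatchSketch, and the result string are invented for illustration and are not part of this commit:

import java.util.concurrent.CountDownLatch

object LatchSketch {
  trait Callback {
    def onDone(result: String): Unit
    def onError(e: Throwable): Unit
  }

  def main(args: Array[String]): Unit = {
    val ready = new CountDownLatch(1)
    @volatile var result: String = null
    @volatile var error: Throwable = null

    // Like AdaptiveSparkPlan's callback: record the outcome, then release the latch.
    val callback = new Callback {
      override def onDone(r: String): Unit = { result = r; ready.countDown() }
      override def onError(e: Throwable): Unit = { error = e; ready.countDown() }
    }

    // Stand-in for QueryStageTrigger: completes asynchronously on another thread.
    new Thread(() => callback.onDone("all stages materialized")).start()

    ready.await() // block until the callback fires, exactly once
    if (error != null) throw error // surface asynchronous failures to the caller
    println(result)
  }
}

The countDown/await pair also gives the happens-before edge that makes the writes to result and error visible to the waiting thread.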

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanQueryStage.scala

Lines changed: 28 additions & 52 deletions

@@ -17,65 +17,41 @@

 package org.apache.spark.sql.execution.adaptive

-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ShuffleExchangeExec}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec, ShuffleExchangeExec}

 /**
- * Divide the spark plan into multiple QueryStages. For each Exchange in the plan, it adds a
- * QueryStage and a QueryStageInput. If reusing Exchange is enabled, it finds duplicated exchanges
- * and uses the same QueryStage for all the references. Note this rule must be run after
- * EnsureRequirements rule. The rule divides the plan into multiple sub-trees as QueryStageInput
- * is a leaf node. Transforming the plan after applying this rule will only transform node in a
- * sub-tree.
+ * Divide the spark plan into multiple QueryStages. For each Exchange in the plan, it wraps it with
+ * a [[QueryStage]]. At the end it adds an [[AdaptiveSparkPlan]] at the top, which will drive the
+ * execution of query stages.
  */
-case class PlanQueryStage(conf: SQLConf) extends Rule[SparkPlan] {
+case class PlanQueryStage(session: SparkSession) extends Rule[SparkPlan] {

   def apply(plan: SparkPlan): SparkPlan = {
-
-    val newPlan = if (!conf.exchangeReuseEnabled) {
-      plan.transformUp {
-        case e: ShuffleExchangeExec =>
-          ShuffleQueryStageInput(ShuffleQueryStage(e), e.output)
-        case e: BroadcastExchangeExec =>
-          BroadcastQueryStageInput(BroadcastQueryStage(e), e.output)
-      }
-    } else {
-      // Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls.
-      val stages = mutable.HashMap[StructType, ArrayBuffer[QueryStage]]()
-
-      plan.transformUp {
-        case exchange: Exchange =>
-          val sameSchema = stages.getOrElseUpdate(exchange.schema, ArrayBuffer[QueryStage]())
-          val samePlan = sameSchema.find { s =>
-            exchange.sameResult(s.child)
-          }
-          if (samePlan.isDefined) {
-            // Keep the output of this exchange, the following plans require that to resolve
-            // attributes.
-            exchange match {
-              case e: ShuffleExchangeExec => ShuffleQueryStageInput(
-                samePlan.get.asInstanceOf[ShuffleQueryStage], exchange.output)
-              case e: BroadcastExchangeExec => BroadcastQueryStageInput(
-                samePlan.get.asInstanceOf[BroadcastQueryStage], exchange.output)
-            }
-          } else {
-            val queryStageInput = exchange match {
-              case e: ShuffleExchangeExec =>
-                ShuffleQueryStageInput(ShuffleQueryStage(e), e.output)
-              case e: BroadcastExchangeExec =>
-                BroadcastQueryStageInput(BroadcastQueryStage(e), e.output)
-            }
-            sameSchema += queryStageInput.childStage
-            queryStageInput
-          }
-      }
+    var id = 0
+    val exchangeToQueryStage = new java.util.IdentityHashMap[Exchange, QueryStage]
+    val planWithStages = plan.transformUp {
+      case e: ShuffleExchangeExec =>
+        val queryStage = ShuffleQueryStage(id, e)
+        id += 1
+        exchangeToQueryStage.put(e, queryStage)
+        queryStage
+      case e: BroadcastExchangeExec =>
+        val queryStage = BroadcastQueryStage(id, e)
+        id += 1
+        exchangeToQueryStage.put(e, queryStage)
+        queryStage
+      // The `ReusedExchangeExec` was added by the rule `ReuseExchange`, which transforms the
+      // query plan bottom-up. This rule also transforms the plan bottom-up, so when we hit a
+      // `ReusedExchangeExec` here, the exchange being reused must have been visited already, and
+      // there should be an entry for it in `exchangeToQueryStage`.
+      case e: ReusedExchangeExec =>
+        val existingQueryStage = exchangeToQueryStage.get(e.child)
+        assert(existingQueryStage != null, "The exchange being reused should be hit before.")
+        ReusedQueryStage(existingQueryStage, e.output)
     }
-    ResultQueryStage(newPlan)
+    AdaptiveSparkPlan(ResultQueryStage(id, planWithStages), session)
   }
 }
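
Note the reuse map is a java.util.IdentityHashMap: a ReusedExchangeExec points at the exact Exchange instance that was planned earlier, so the lookup keys on reference identity and avoids relying on value equality between exchanges. A small self-contained illustration of the difference (Node is an invented stand-in for Exchange):

import java.util.IdentityHashMap

object IdentityMapSketch {
  case class Node(schema: String) // case classes compare by value

  def main(args: Array[String]): Unit = {
    val a = Node("int, string")
    val b = Node("int, string") // equal to `a` by value, but a different instance

    val byValue = new java.util.HashMap[Node, String]
    byValue.put(a, "stage-0")
    println(byValue.get(b)) // "stage-0": value equality conflates the two nodes

    val byIdentity = new IdentityHashMap[Node, String]
    byIdentity.put(a, "stage-0")
    println(byIdentity.get(b)) // null: only the exact instance `a` is a key
    println(byIdentity.get(a)) // "stage-0"
  }
}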
