Commit 666bf76

rename to QueryFragment

1 parent 41f3a90 · commit 666bf76

10 files changed (+358 -352 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala

Lines changed: 3 additions & 3 deletions

@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution

 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlan, QueryStage}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryFragmentExec}
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.metric.SQLMetricInfo
 import org.apache.spark.sql.internal.SQLConf
@@ -53,8 +53,8 @@ private[execution] object SparkPlanInfo {
   def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
     val children = plan match {
       case ReusedExchangeExec(_, child) => child :: Nil
-      case a: AdaptiveSparkPlan => a.finalPlan.plan :: Nil
-      case stage: QueryStage => stage.plan :: Nil
+      case a: AdaptiveSparkPlanExec => a.finalPlan.plan :: Nil
+      case stage: QueryFragmentExec => stage.plan :: Nil
       case _ => plan.children ++ plan.subqueries
     }
     val metrics = plan.metrics.toSeq.map { case (key, metric) =>
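Worth noting why this hunk exists: both AdaptiveSparkPlanExec and QueryFragmentExec present themselves as leaf nodes to the planner, so their real sub-plans are invisible to a plain `plan.children` walk, and `fromSparkPlan` has to unwrap them by hand for consumers such as the SQL UI. Below is a minimal, self-contained sketch of that pattern; the `Node`, `Wrapper`, and `describe` names are invented for illustration and are not Spark classes.

// Hypothetical stand-ins, not Spark classes.
sealed trait Node { def children: Seq[Node] = Nil }
case class Leaf(name: String) extends Node
case class Branch(name: String, kids: Seq[Node]) extends Node {
  override def children: Seq[Node] = kids
}
// Like AdaptiveSparkPlanExec or QueryFragmentExec: a leaf to the planner,
// but it hides a sub-plan that reporting tools still want to see.
case class Wrapper(name: String, hidden: Node) extends Node

// A naive walk over `children` would stop at Wrapper; unwrap it explicitly,
// just as fromSparkPlan special-cases the adaptive nodes above.
def describe(n: Node): Seq[String] = {
  val kids = n match {
    case w: Wrapper => w.hidden :: Nil
    case other      => other.children
  }
  n.getClass.getSimpleName +: kids.flatMap(describe)
}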

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlan.scala renamed to sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

Lines changed: 16 additions & 14 deletions

@@ -28,36 +28,38 @@ import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, SparkPlanInfo, S
 import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate

 /**
- * A root node to execute the query plan adaptively. It creates query stages, and incrementally
- * updates the query plan when a query stage is materialized and provides accurate runtime
+ * A root node to execute the query plan adaptively. It creates query fragments, and incrementally
+ * updates the query plan when a query fragment is materialized and provides accurate runtime
  * data statistics.
  */
-case class AdaptiveSparkPlan(initialPlan: SparkPlan, session: SparkSession)
+case class AdaptiveSparkPlanExec(initialPlan: SparkPlan, session: SparkSession)
   extends LeafExecNode{

   override def output: Seq[Attribute] = initialPlan.output

   @volatile private var currentPlan: SparkPlan = initialPlan
   @volatile private var error: Throwable = null

-  // We will release the lock when we finish planning query stages, or we fail to do the planning.
-  // Getting `resultStage` will be blocked until the lock is release.
+  // We will release the lock when we finish planning query fragments, or we fail to do the
+  // planning. Getting `finalPlan` will be blocked until the lock is released.
   // This is better than wait()/notify(), as we can easily check if the computation has completed,
   // by calling `readyLock.getCount()`.
   private val readyLock = new CountDownLatch(1)

-  private def createCallback(executionId: Option[Long]): QueryStageCreatorCallback = {
-    new QueryStageCreatorCallback {
+  private def createCallback(executionId: Option[Long]): QueryFragmentCreatorCallback = {
+    new QueryFragmentCreatorCallback {
       override def onPlanUpdate(updatedPlan: SparkPlan): Unit = {
         updateCurrentPlan(updatedPlan, executionId)
-        if (updatedPlan.isInstanceOf[ResultQueryStage]) readyLock.countDown()
+        if (updatedPlan.isInstanceOf[ResultQueryFragmentExec]) readyLock.countDown()
       }

-      override def onStageMaterializingFailed(stage: QueryStage, e: Throwable): Unit = {
+      override def onFragmentMaterializingFailed(
+          fragment: QueryFragmentExec,
+          e: Throwable): Unit = {
         error = new SparkException(
           s"""
-             |Fail to materialize stage ${stage.id}:
-             |${stage.plan.treeString}
+             |Fail to materialize fragment ${fragment.id}:
+             |${fragment.plan.treeString}
           """.stripMargin, e)
         readyLock.countDown()
       }
@@ -79,18 +81,18 @@ case class AdaptiveSparkPlan(initialPlan: SparkPlan, session: SparkSession)
     }
   }

-  def finalPlan: ResultQueryStage = {
+  def finalPlan: ResultQueryFragmentExec = {
     if (readyLock.getCount > 0) {
       val sc = session.sparkContext
       val executionId = Option(sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)).map(_.toLong)
-      val creator = new QueryStageCreator(initialPlan, session, createCallback(executionId))
+      val creator = new QueryFragmentCreator(initialPlan, session, createCallback(executionId))
       creator.start()
       readyLock.await()
       creator.stop()
     }

     if (error != null) throw error
-    currentPlan.asInstanceOf[ResultQueryStage]
+    currentPlan.asInstanceOf[ResultQueryFragmentExec]
   }

   override def executeCollect(): Array[InternalRow] = finalPlan.executeCollect()
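The `readyLock` comment above is the key design point in this file: a one-count CountDownLatch doubles as a non-blocking completion flag (`getCount`) and a blocking gate (`await`), which plain wait()/notify() cannot offer as directly. Here is a stripped-down sketch of that pattern; `OneShotResult` is an invented name, not Spark code.

import java.util.concurrent.CountDownLatch

// Hypothetical helper illustrating the readyLock pattern above.
class OneShotResult[T] {
  private val ready = new CountDownLatch(1)
  @volatile private var value: T = _

  // Called once by the producer; countDown() releases all waiters.
  def complete(v: T): Unit = { value = v; ready.countDown() }

  // Cheap, non-blocking completion check -- the stated advantage over wait()/notify().
  def isDone: Boolean = ready.getCount == 0

  // Blocks until complete() has run, then returns the published value.
  def get(): T = { ready.await(); value }
}

// Usage: a background thread completes the result while the caller blocks on get().
// val r = new OneShotResult[Int]; new Thread(() => r.complete(42)).start(); r.get()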

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala

Lines changed: 2 additions & 2 deletions

@@ -23,7 +23,7 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.ExecutedCommandExec

 /**
- * This rule wraps the query plan with an [[AdaptiveSparkPlan]], which executes the query plan
+ * This rule wraps the query plan with an [[AdaptiveSparkPlanExec]], which executes the query plan
  * adaptively with runtime data statistics. Note that this rule must be run after
  * [[org.apache.spark.sql.execution.exchange.EnsureRequirements]], so that the exchange nodes are
  * already inserted.
@@ -33,7 +33,7 @@ case class InsertAdaptiveSparkPlan(session: SparkSession) extends Rule[SparkPlan
   override def apply(plan: SparkPlan): SparkPlan = plan match {
     case _: ExecutedCommandExec => plan
     case _ if session.sessionState.conf.adaptiveExecutionEnabled =>
-      AdaptiveSparkPlan(plan, session.cloneSession())
+      AdaptiveSparkPlanExec(plan, session.cloneSession())
     case _ => plan
   }
 }
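Since this rule fires only when `adaptiveExecutionEnabled` is set, a quick way to exercise it is to flip the flag and explain a shuffling query. The sketch below assumes the usual `spark.sql.adaptive.enabled` conf key backing that setting.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("adaptive-plan-demo")
  .config("spark.sql.adaptive.enabled", "true") // lets InsertAdaptiveSparkPlan wrap the plan
  .getOrCreate()

// A query with a shuffle; with the flag on, the physical plan's root
// should be the adaptive wrapper rather than the raw exchange tree.
spark.range(1000)
  .selectExpr("id % 10 AS k")
  .groupBy("k")
  .count()
  .explain()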

Lines changed: 260 additions & 0 deletions

@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{CollapseCodegenStages, SparkPlan}
+import org.apache.spark.sql.execution.adaptive.rule.{AssertChildFragmentsMaterialized, ReduceNumShufflePartitions}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ShuffleExchangeExec}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{EventLoop, ThreadUtils}
+
+/**
+ * This class dynamically creates [[QueryFragmentExec]] bottom-up, optimizes the query plan of
+ * query fragments and materializes them. It creates as many query fragments as possible at the
+ * same time, and materializes a query fragment when all its child fragments are materialized.
+ *
+ * To create query fragments, we traverse the query tree bottom-up. When we hit an exchange node,
+ * and all the child query fragments of this exchange node are materialized, we try to create a
+ * new query fragment for this exchange node.
+ *
+ * To create a new query fragment, we first optimize the sub-tree of the exchange. After
+ * optimization, we check the output partitioning of the optimized sub-tree, and see if the
+ * exchange node is still necessary.
+ *
+ * If the exchange node becomes unnecessary, we remove it, give up creating this query fragment,
+ * and continue to traverse the query plan tree until we hit the next exchange node.
+ *
+ * If the exchange node is still needed, we create the query fragment and optimize its sub-tree
+ * again. It's necessary to have both the pre-creation optimization and post-creation
+ * optimization, because these two optimizations have different assumptions. For pre-creation
+ * optimization, the shuffle node may be removed later on and the current sub-tree may be only a
+ * part of a query fragment, so we don't have the big picture of the query fragment yet. For
+ * post-creation optimization, the query fragment is created and we have the big picture of the
+ * query fragment.
+ *
+ * After the query fragment is optimized, we materialize it asynchronously, and continue to
+ * traverse the query plan tree to create more query fragments.
+ *
+ * When a query fragment completes materialization, we trigger the process of query fragment
+ * creation and traverse the query plan tree again.
+ */
+class QueryFragmentCreator(
+    initialPlan: SparkPlan,
+    session: SparkSession,
+    callback: QueryFragmentCreatorCallback)
+  extends EventLoop[QueryFragmentCreatorEvent]("QueryFragmentCreator") {
+
+  private def conf = session.sessionState.conf
+
+  private val readyFragments = mutable.HashSet.empty[Int]
+
+  private var currentFragmentId = 0
+
+  private val fragmentCache =
+    mutable.HashMap.empty[StructType, mutable.Buffer[(Exchange, QueryFragmentExec)]]
+
+  // The optimizer rules that will be applied to a sub-tree of the query plan before the fragment
+  // is created. Note that we may end up not creating the query fragment, so the rules here should
+  // not assume the given sub-plan-tree is the entire query plan of the query fragment. For
+  // example, if a rule wants to collect all the child query fragments, it should not be put here.
+  private val preFragmentCreationOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
+    AssertChildFragmentsMaterialized
+  )
+
+  // The optimizer rules that will be applied to a sub-tree of the query plan after the fragment
+  // is created. Note that once the fragment is created, we will not remove it anymore. If a rule
+  // changes the output partitioning of the sub-plan-tree, which may help to remove the exchange
+  // node, it's better to put it in `preFragmentCreationOptimizerRules`, so that we may create
+  // fewer query fragments.
+  private val postFragmentCreationOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
+    ReduceNumShufflePartitions(conf),
+    CollapseCodegenStages(conf))
+
+  private var currentPlan = initialPlan
+
+  private val localProperties = session.sparkContext.getLocalProperties
+
+  private implicit def executionContext: ExecutionContextExecutorService = {
+    QueryFragmentCreator.executionContext
+  }
+
+  override protected def onReceive(event: QueryFragmentCreatorEvent): Unit = event match {
+    case StartCreation =>
+      // Set active session and local properties for the event loop thread.
+      SparkSession.setActiveSession(session)
+      session.sparkContext.setLocalProperties(localProperties)
+      currentPlan = createQueryFragments(initialPlan)
+
+    case MaterializeFragment(fragment) =>
+      fragment.materialize().onComplete { res =>
+        if (res.isSuccess) {
+          post(FragmentReady(fragment))
+        } else {
+          callback.onFragmentMaterializingFailed(fragment, res.failed.get)
+          stop()
+        }
+      }
+
+    case FragmentReady(fragment) =>
+      if (fragment.isInstanceOf[ResultQueryFragmentExec]) {
+        callback.onPlanUpdate(fragment)
+        stop()
+      } else {
+        readyFragments += fragment.id
+        currentPlan = createQueryFragments(currentPlan)
+      }
+  }
+
+  override protected def onStart(): Unit = {
+    post(StartCreation)
+  }
+
+  private def preFragmentCreationOptimize(plan: SparkPlan): SparkPlan = {
+    preFragmentCreationOptimizerRules.foldLeft(plan) {
+      case (current, rule) => rule(current)
+    }
+  }
+
+  private def postFragmentCreationOptimize(plan: SparkPlan): SparkPlan = {
+    postFragmentCreationOptimizerRules.foldLeft(plan) {
+      case (current, rule) => rule(current)
+    }
+  }
+
+  /**
+   * Traverses the query plan bottom-up, and creates as many query fragments as possible.
+   */
+  private def createQueryFragments(plan: SparkPlan): SparkPlan = {
+    val result = createQueryFragments0(plan)
+    if (result.allChildFragmentsReady) {
+      val finalPlan = postFragmentCreationOptimize(preFragmentCreationOptimize(result.newPlan))
+      post(FragmentReady(ResultQueryFragmentExec(currentFragmentId, finalPlan)))
+      finalPlan
+    } else {
+      callback.onPlanUpdate(result.newPlan)
+      result.newPlan
+    }
+  }
+
+  /**
+   * This method is called recursively to traverse the plan tree bottom-up, and returns two pieces
+   * of information: 1) the new plan after query fragments are inserted; 2) whether or not the
+   * child query fragments of the new plan are all ready.
+   *
+   * If the current plan is an exchange node, and all its child query fragments are ready, we try
+   * to create a new query fragment.
+   */
+  private def createQueryFragments0(plan: SparkPlan): CreateFragmentResult = plan match {
+    case e: Exchange =>
+      val similarFragments = fragmentCache.getOrElseUpdate(e.schema, mutable.Buffer.empty)
+      similarFragments.find(_._1.sameResult(e)) match {
+        case Some((_, existingFragment)) if conf.exchangeReuseEnabled =>
+          CreateFragmentResult(
+            newPlan = ReusedQueryFragmentExec(existingFragment, e.output),
+            allChildFragmentsReady = readyFragments.contains(existingFragment.id))
+
+        case _ =>
+          val result = createQueryFragments0(e.child)
+          // Try to create a query fragment only when all the child query fragments are ready.
+          if (result.allChildFragmentsReady) {
+            val optimizedPlan = preFragmentCreationOptimize(result.newPlan)
+            e match {
+              case s: ShuffleExchangeExec =>
+                (s.desiredPartitioning, optimizedPlan.outputPartitioning) match {
+                  case (desired: HashPartitioning, actual: HashPartitioning)
+                      if desired.semanticEquals(actual) =>
+                    // This shuffle exchange is unnecessary now, remove it. The reason may be:
+                    // 1. the child plan has changed its output partitioning after optimization,
+                    //    which makes this exchange node unnecessary.
+                    // 2. this exchange node is user-specified, which turns out to be unnecessary.
+                    CreateFragmentResult(newPlan = optimizedPlan, allChildFragmentsReady = true)
+                  case _ =>
+                    val queryFragment = createQueryFragment(s.copy(child = optimizedPlan))
+                    similarFragments.append(e -> queryFragment)
+                    // We've created a new fragment, which is obviously not ready yet.
+                    CreateFragmentResult(newPlan = queryFragment, allChildFragmentsReady = false)
+                }
+
+              case b: BroadcastExchangeExec =>
+                val queryFragment = createQueryFragment(b.copy(child = optimizedPlan))
+                similarFragments.append(e -> queryFragment)
+                // We've created a new fragment, which is obviously not ready yet.
+                CreateFragmentResult(newPlan = queryFragment, allChildFragmentsReady = false)
+            }
+          } else {
+            CreateFragmentResult(
+              newPlan = e.withNewChildren(Seq(result.newPlan)),
+              allChildFragmentsReady = false)
+          }
+      }
+
+    case q: QueryFragmentExec =>
+      CreateFragmentResult(newPlan = q, allChildFragmentsReady = readyFragments.contains(q.id))
+
+    case _ =>
+      if (plan.children.isEmpty) {
+        CreateFragmentResult(newPlan = plan, allChildFragmentsReady = true)
+      } else {
+        val results = plan.children.map(createQueryFragments0)
+        CreateFragmentResult(
+          newPlan = plan.withNewChildren(results.map(_.newPlan)),
+          allChildFragmentsReady = results.forall(_.allChildFragmentsReady))
+      }
+  }
+
+  private def createQueryFragment(e: Exchange): QueryFragmentExec = {
+    val optimizedPlan = postFragmentCreationOptimize(e.child)
+    val queryFragment = e match {
+      case s: ShuffleExchangeExec =>
+        ShuffleQueryFragmentExec(currentFragmentId, s.copy(child = optimizedPlan))
+      case b: BroadcastExchangeExec =>
+        BroadcastQueryFragmentExec(currentFragmentId, b.copy(child = optimizedPlan))
+    }
+    currentFragmentId += 1
+    post(MaterializeFragment(queryFragment))
+    queryFragment
+  }
+
+  override protected def onError(e: Throwable): Unit = callback.onError(e)
+}
+
+case class CreateFragmentResult(newPlan: SparkPlan, allChildFragmentsReady: Boolean)
+
+object QueryFragmentCreator {
+  private val executionContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonCachedThreadPool("QueryFragmentCreator", 16))
+}
+
+trait QueryFragmentCreatorCallback {
+  def onPlanUpdate(updatedPlan: SparkPlan): Unit
+  def onFragmentMaterializingFailed(fragment: QueryFragmentExec, e: Throwable): Unit
+  def onError(e: Throwable): Unit
+}
+
+sealed trait QueryFragmentCreatorEvent
+
+object StartCreation extends QueryFragmentCreatorEvent
+
+case class MaterializeFragment(fragment: QueryFragmentExec) extends QueryFragmentCreatorEvent
+
+case class FragmentReady(fragment: QueryFragmentExec) extends QueryFragmentCreatorEvent
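The whole file is driven by the three events declared at the bottom: StartCreation kicks off a bottom-up pass, each new fragment posts MaterializeFragment for itself, and every FragmentReady re-runs the pass until a ResultQueryFragmentExec emerges. Because events are handled serially on a single event-loop thread, `currentPlan` and `readyFragments` need no locking even though materialization futures post from other threads. Below is a hypothetical, stripped-down analogue of that loop; the names are invented for illustration and this is not Spark's EventLoop API.

import java.util.concurrent.LinkedBlockingQueue

sealed trait Event
case object Start extends Event
case class Ready(id: Int) extends Event
case object Shutdown extends Event

// Events may be posted from any thread, but `handle` runs them one at a
// time on the worker thread, so its state needs no synchronization.
class TinyEventLoop(handle: Event => Unit) {
  private val queue = new LinkedBlockingQueue[Event]()
  private val worker = new Thread(() => {
    var running = true
    while (running) queue.take() match {
      case Shutdown => running = false
      case e        => handle(e)
    }
  })
  def post(e: Event): Unit = queue.put(e)
  def start(): Unit = { worker.setDaemon(true); worker.start() }
  def stop(): Unit = post(Shutdown) // drains in order, then exits
}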
