@@ -32,7 +32,31 @@ import org.apache.spark.util.{EventLoop, ThreadUtils}
32
32
/**
33
33
* This class dynamically creates [[QueryStage]] bottom-up, optimizes the query plan of query stages
34
34
* and materializes them. It creates as many query stages as possible at the same time, and
35
- * creates/optimizes a query stage when all its child stages are materialized.
35
+ * materializes a query stage when all its child stages are materialized.
36
+ *
37
+ * To create query stages, we traverse the query tree bottom up. When we hit an exchange node, and
38
+ * all the child query stages of this exchange node are materialized, we try to create a new query
39
+ * stage for this exchange node.
40
+ *
41
+ * To create a new query stage, we first optimize the sub-tree of the exchange. After optimization,
42
+ * we check the output partitioning of the optimized sub-tree, and see if the exchange node is still
43
+ * necessary.
44
+ *
45
+ * If the exchange node becomes unnecessary, remove it and give up this query stage creation, and
46
+ * continue to traverse the query plan tree until we hit the next exchange node.
47
+ *
48
+ * If the exchange node is still needed, create the query stage and optimize its sub-tree again.
49
+ * It's necessary to have both the pre-creation optimization and post-creation optimization, because
50
+ * these 2 optimizations have different assumptions. For pre-creation optimization, the shuffle node
51
+ * may be removed later on and the current sub-tree may be only a part of a query stage, so we don't
52
+ * have the big picture of the query stage yet. For post-creation optimization, the query stage is
53
+ * created and we have the big picture of the query stage.
54
+ *
55
+ * After the query stage is optimized, we materialize it asynchronously, and continue to traverse
56
+ * the query plan tree to create more query stages.
57
+ *
58
+ * When a query stage completes materialization, we trigger the process of query stage creation and
59
+ * traverse the query plan tree again.
36
60
*/
37
61
class QueryStageCreator (
38
62
initialPlan : SparkPlan ,
@@ -48,16 +72,24 @@ class QueryStageCreator(
48
72
49
73
private val stageCache = mutable.HashMap .empty[StructType , mutable.Buffer [(Exchange , QueryStage )]]
50
74
51
- private val phaseOneOptimizerRules : Seq [Rule [SparkPlan ]] = Seq (
75
+ // The optimizer rules that will be applied to a sub-tree of the query plan before the stage is
76
+ // created. Note that we may end up not creating the query stage, so the rules here should not
77
+ // assume the given sub-plan-tree is the entire query plan of the query stage. For example, if a
78
+ // rule wants to collect all the child query stages, it should not be put here.
79
+ private val preStageCreationOptimizerRules : Seq [Rule [SparkPlan ]] = Seq (
52
80
AssertChildStagesMaterialized
53
81
)
54
82
55
- private val phaseTwoOptimizerRules : Seq [Rule [SparkPlan ]] = Seq (
83
+ // The optimizer rules that will be applied to a sub-tree of the query plan after the stage is
84
+ // created. Note that once the stage is created, we will not remove it anymore. If a rule changes
85
+ // the output partitioning of the sub-plan-tree, which may help to remove the exchange node, it's
86
+ // better to put it in `preStageCreationOptimizerRules`, so that we may create fewer query stages.
87
+ private val postStageCreationOptimizerRules : Seq [Rule [SparkPlan ]] = Seq (
56
88
ReduceNumShufflePartitions (conf),
57
89
CollapseCodegenStages (conf),
58
90
ReuseSubquery (conf))
59
91
60
- private var currentPlan = createBottomQueryStages (initialPlan)
92
+ private var currentPlan = createQueryStages (initialPlan)
61
93
62
94
private implicit def executionContext : ExecutionContextExecutorService = {
63
95
QueryStageCreator .executionContext
@@ -80,26 +112,29 @@ class QueryStageCreator(
80
112
stop()
81
113
} else {
82
114
readyStages += stage.id
83
- currentPlan = createBottomQueryStages (currentPlan)
115
+ currentPlan = createQueryStages (currentPlan)
84
116
}
85
117
}
86
118
87
- private def phaseOneOptimize (plan : SparkPlan ): SparkPlan = {
88
- phaseOneOptimizerRules .foldLeft(plan) {
119
+ private def preStageCreationOptimize (plan : SparkPlan ): SparkPlan = {
120
+ preStageCreationOptimizerRules .foldLeft(plan) {
89
121
case (current, rule) => rule(current)
90
122
}
91
123
}
92
124
93
- private def phaseTwoOptimize (plan : SparkPlan ): SparkPlan = {
94
- phaseTwoOptimizerRules .foldLeft(plan) {
125
+ private def postStageCreationOptimize (plan : SparkPlan ): SparkPlan = {
126
+ postStageCreationOptimizerRules .foldLeft(plan) {
95
127
case (current, rule) => rule(current)
96
128
}
97
129
}
98
130
99
- private def createBottomQueryStages (plan : SparkPlan ): SparkPlan = {
100
- val result = tryCreateQueryStage(plan)
101
- if (result.stageReady) {
102
- val finalPlan = phaseTwoOptimize(phaseOneOptimize(result.newPlan))
131
+ /**
132
+ * Traverses the query plan bottom-up and creates as many query stages as possible.
133
+ */
134
+ private def createQueryStages (plan : SparkPlan ): SparkPlan = {
135
+ val result = createQueryStages0(plan)
136
+ if (result.allChildStagesReady) {
137
+ val finalPlan = postStageCreationOptimize(preStageCreationOptimize(result.newPlan))
103
138
post(StageReady (ResultQueryStage (currentStageId, finalPlan)))
104
139
finalPlan
105
140
} else {
@@ -108,57 +143,70 @@ class QueryStageCreator(
108
143
}
109
144
}
110
145
111
- private def tryCreateQueryStage (plan : SparkPlan ): CreateStageResult = plan match {
146
+ /**
147
+ * This method is called recursively to traverse the plan tree bottom-up. It returns two pieces of
148
+ * information: 1) the new plan after we insert query stages; 2) whether or not the child query
149
+ * stages of the new plan are all ready.
150
+ *
151
+ * If the current plan is an exchange node, and all its child query stages are ready, we try to
152
+ * create a new query stage.
153
+ */
154
+ private def createQueryStages0 (plan : SparkPlan ): CreateStageResult = plan match {
112
155
case e : Exchange =>
113
156
val similarStages = stageCache.getOrElseUpdate(e.schema, mutable.Buffer .empty)
114
157
similarStages.find(_._1.sameResult(e)) match {
115
158
case Some ((_, existingStage)) if conf.exchangeReuseEnabled =>
116
159
CreateStageResult (
117
160
newPlan = ReusedQueryStage (existingStage, e.output),
118
- stageReady = readyStages.contains(existingStage.id))
161
+ allChildStagesReady = readyStages.contains(existingStage.id))
119
162
120
163
case _ =>
121
- val result = tryCreateQueryStage(e.child)
122
- if (result.stageReady) {
123
- val optimizedPlan = phaseOneOptimize(result.newPlan)
164
+ val result = createQueryStages0(e.child)
165
+ // Try to create a query stage only when all the child query stages are ready.
166
+ if (result.allChildStagesReady) {
167
+ val optimizedPlan = preStageCreationOptimize(result.newPlan)
124
168
e match {
125
169
case s : ShuffleExchangeExec =>
126
170
(s.desiredPartitioning, optimizedPlan.outputPartitioning) match {
127
171
case (desired : HashPartitioning , actual : HashPartitioning )
128
172
if desired.semanticEquals(actual) =>
129
173
// This shuffle exchange is unnecessary now, remove it. The reason may be:
130
- // 1. the child plan has changed its output partitioning, and makes this
131
- // exchange node unnecessary.
174
+ // 1. the child plan has changed its output partitioning after optimization,
175
+ // and makes this exchange node unnecessary.
132
176
// 2. this exchange node is user specified, which turns out to be unnecessary.
133
- CreateStageResult (newPlan = optimizedPlan, stageReady = true )
177
+ CreateStageResult (newPlan = optimizedPlan, allChildStagesReady = true )
134
178
case _ =>
135
179
val queryStage = createQueryStage(s.copy(child = optimizedPlan))
136
180
similarStages.append(e -> queryStage)
137
- CreateStageResult (newPlan = queryStage, stageReady = false )
181
+ // We've created a new stage, which is obviously not ready yet.
182
+ CreateStageResult (newPlan = queryStage, allChildStagesReady = false )
138
183
}
139
184
140
185
case b : BroadcastExchangeExec =>
141
186
val queryStage = createQueryStage(b.copy(child = optimizedPlan))
142
187
similarStages.append(e -> queryStage)
143
- CreateStageResult (newPlan = queryStage, stageReady = false )
188
+ // We've created a new stage, which is obviously not ready yet.
189
+ CreateStageResult (newPlan = queryStage, allChildStagesReady = false )
144
190
}
145
191
} else {
146
- CreateStageResult (newPlan = e.withNewChildren(Seq (result.newPlan)), stageReady = false )
192
+ CreateStageResult (
193
+ newPlan = e.withNewChildren(Seq (result.newPlan)),
194
+ allChildStagesReady = false )
147
195
}
148
196
}
149
197
150
198
case q : QueryStage =>
151
- CreateStageResult (newPlan = q, stageReady = readyStages.contains(q.id))
199
+ CreateStageResult (newPlan = q, allChildStagesReady = readyStages.contains(q.id))
152
200
153
201
case _ =>
154
- val results = plan.children.map(tryCreateQueryStage )
202
+ val results = plan.children.map(createQueryStages0 )
155
203
CreateStageResult (
156
204
newPlan = plan.withNewChildren(results.map(_.newPlan)),
157
- stageReady = results.forall(_.stageReady ))
205
+ allChildStagesReady = results.forall(_.allChildStagesReady ))
158
206
}
159
207
160
208
private def createQueryStage (e : Exchange ): QueryStage = {
161
- val optimizedPlan = phaseTwoOptimize (e.child)
209
+ val optimizedPlan = postStageCreationOptimize (e.child)
162
210
val queryStage = e match {
163
211
case s : ShuffleExchangeExec =>
164
212
ShuffleQueryStage (currentStageId, s.copy(child = optimizedPlan))
@@ -173,7 +221,7 @@ class QueryStageCreator(
173
221
override protected def onError (e : Throwable ): Unit = callback.onError(e)
174
222
}
175
223
176
- case class CreateStageResult (newPlan : SparkPlan , stageReady : Boolean )
224
+ case class CreateStageResult (newPlan : SparkPlan , allChildStagesReady : Boolean )
177
225
178
226
object QueryStageCreator {
179
227
private val executionContext = ExecutionContext .fromExecutorService(
0 commit comments