Skip to content

Commit a83967c

Browse files
JkSelfcarsonwang
authored andcommitted
Avoid the prepareExecuteStage#QueryStage method is executed multi-times when call executeCollect, executeToIterator and executeTake action multi-times (apache#70)
* Avoid the prepareExecuteStage#QueryStage method is executed multi-times when call executeCollect, executeToIterator and executeTake action multi-times * only add the check in prepareExecuteStage method to avoid duplicate check in other methods * small fix
1 parent 7df45f8 commit a83967c

File tree

1 file changed

+8
-1
lines changed

1 file changed

+8
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStage.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,18 @@ abstract class QueryStage extends UnaryExecNode {
8585
Future.sequence(shuffleStageFutures)(implicitly, QueryStage.executionContext), Duration.Inf)
8686
}
8787

88+
private var prepared = false
89+
8890
/**
8991
* Before executing the plan in this query stage, we execute all child stages, optimize the plan
9092
* in this stage and determine the reducer number based on the child stages' statistics. Finally
9193
* we do a codegen for this query stage and update the UI with the new plan.
9294
*/
93-
def prepareExecuteStage(): Unit = {
95+
def prepareExecuteStage(): Unit = synchronized {
96+
// Ensure the prepareExecuteStage method only be executed once.
97+
if (prepared) {
98+
return
99+
}
94100
// 1. Execute childStages
95101
executeChildStages()
96102
// It is possible to optimize this stage's plan here based on the child stages' statistics.
@@ -126,6 +132,7 @@ abstract class QueryStage extends UnaryExecNode {
126132
queryExecution.toString,
127133
SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan)))
128134
}
135+
prepared = true
129136
}
130137

131138
// Caches the created ShuffleRowRDD so we can reuse that.

0 commit comments

Comments
 (0)