@@ -28,9 +28,11 @@ import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, SparkPlanInfo, S
 import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
 
 /**
- * A root node to execute the query plan adaptively. It creates query fragments, and incrementally
- * updates the query plan when a query fragment is materialized and provides accurate runtime
- * data statistics.
+ * A root node to execute the query plan adaptively. It splits the query plan into independent
+ * stages and executes them in order according to their dependencies. Each query stage
+ * materializes its output at the end. When one stage completes, the data statistics of its
+ * materialized output will be used to optimize the subsequent stages.
+ * This is called mid-query re-optimization in database literature.
  */
 case class AdaptiveSparkPlanExec(initialPlan: SparkPlan, session: SparkSession)
   extends LeafExecNode {
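
The new class comment promises that stage boundaries let the planner re-optimize the rest of the query with real statistics. As a sketch of what that buys the user, assuming the feature is gated by the pre-existing `spark.sql.adaptive.enabled` flag (the demo object and data below are illustrative, not taken from this patch):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative sketch only: assumes adaptive execution is toggled by the
// existing spark.sql.adaptive.enabled flag; object name and data are made up.
object AdaptiveJoinDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.adaptive.enabled", "true")
      .getOrCreate()

    // Each shuffle exchange becomes a stage boundary. Once the small side is
    // materialized and its true size is known, the remaining stages can be
    // re-planned, e.g. replacing a sort-merge join with a broadcast join.
    val big = spark.range(1000000L).withColumnRenamed("id", "k")
    val small = spark.range(100L).withColumnRenamed("id", "k")
    println(big.join(small, "k").count())

    spark.stop()
  }
}
```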
@@ -40,34 +42,34 @@ case class AdaptiveSparkPlanExec(initialPlan: SparkPlan, session: SparkSession)
   @volatile private var currentPlan: SparkPlan = initialPlan
   @volatile private var error: Throwable = null
 
-  // We will release the lock when we finish planning query fragments, or we fail to do the
-  // planning. Getting `finalPlan` will be blocked until the lock is released.
+  // We will release the lock when all the query stages are completed, or we fail to
+  // optimize/execute query stages. Getting `finalPlan` will be blocked until the lock is released.
   // This is better than wait()/notify(), as we can easily check if the computation has completed,
   // by calling `readyLock.getCount()`.
   private val readyLock = new CountDownLatch(1)
 
-  private def createCallback(executionId: Option[Long]): QueryFragmentCreatorCallback = {
-    new QueryFragmentCreatorCallback {
-      override def onPlanUpdate(updatedPlan: SparkPlan): Unit = {
-        updateCurrentPlan(updatedPlan, executionId)
-        if (updatedPlan.isInstanceOf[ResultQueryFragmentExec]) readyLock.countDown()
-      }
+  private def createCallback(executionId: Option[Long]) = new QueryStageManagerCallback {
+    override def onPlanUpdate(updatedPlan: SparkPlan): Unit = {
+      updateCurrentPlan(updatedPlan, executionId)
+    }
+
+    override def onFinalPlan(finalPlan: SparkPlan): Unit = {
+      updateCurrentPlan(finalPlan, executionId)
+      readyLock.countDown()
+    }
 
-      override def onFragmentMaterializingFailed(
-          fragment: QueryFragmentExec,
-          e: Throwable): Unit = {
-        error = new SparkException(
-          s"""
-             |Fail to materialize fragment ${fragment.id}:
-             | ${fragment.plan.treeString}
+    override def onStageMaterializationFailed(stage: QueryStageExec, e: Throwable): Unit = {
+      error = new SparkException(
+        s"""
+           |Fail to materialize query stage ${stage.id}:
+           | ${stage.plan.treeString}
          """.stripMargin, e)
-        readyLock.countDown()
-      }
+      readyLock.countDown()
+    }
 
-      override def onError(e: Throwable): Unit = {
-        error = e
-        readyLock.countDown()
-      }
+    override def onError(e: Throwable): Unit = {
+      error = e
+      readyLock.countDown()
     }
   }
 
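
The `QueryStageManagerCallback` contract is only visible here at its use site; its definition ships in another file of this patch. Inferred from the four overrides above, it presumably looks roughly like the following sketch (which assumes `QueryStageExec`, also introduced by the patch, is in scope):

```scala
import org.apache.spark.sql.execution.SparkPlan

// Sketch reconstructed from the call sites above; the real trait lives in
// another file of this patch and may differ.
trait QueryStageManagerCallback {
  // Fired whenever re-optimization yields a new current plan, so listeners
  // (e.g. the SQL UI) can track the evolving plan mid-query.
  def onPlanUpdate(updatedPlan: SparkPlan): Unit

  // Fired once after all stages complete; implementors unblock waiters here.
  def onFinalPlan(finalPlan: SparkPlan): Unit

  // Fired when materializing a single stage's output fails.
  def onStageMaterializationFailed(stage: QueryStageExec, e: Throwable): Unit

  // Fired on any other failure while optimizing or executing stages.
  def onError(e: Throwable): Unit
}
```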
@@ -81,18 +83,18 @@ case class AdaptiveSparkPlanExec(initialPlan: SparkPlan, session: SparkSession)
     }
   }
 
-  def finalPlan: ResultQueryFragmentExec = {
+  def finalPlan: SparkPlan = {
     if (readyLock.getCount > 0) {
       val sc = session.sparkContext
       val executionId = Option(sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)).map(_.toLong)
-      val creator = new QueryFragmentCreator(initialPlan, session, createCallback(executionId))
-      creator.start()
+      val stageManager = new QueryStageManager(initialPlan, session, createCallback(executionId))
+      stageManager.start()
       readyLock.await()
-      creator.stop()
+      stageManager.stop()
     }
 
     if (error != null) throw error
-    currentPlan.asInstanceOf[ResultQueryFragmentExec]
+    currentPlan
   }
 
   override def executeCollect(): Array[InternalRow] = finalPlan.executeCollect()
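
`finalPlan` blocks on `readyLock.await()` until one of the callbacks counts the latch down. Isolated from Spark, the synchronization pattern is just the following JDK-only sketch; the names mirror the fields above, and the payload is a stand-in:

```scala
import java.util.concurrent.CountDownLatch

// Standalone sketch of the pattern used by finalPlan: a background thread
// publishes a result and opens the latch; the caller blocks until then.
// Unlike wait()/notify(), getCount() also offers a non-blocking way to
// check whether the computation has finished.
object LatchDemo {
  @volatile private var result: String = _
  private val readyLock = new CountDownLatch(1)

  def main(args: Array[String]): Unit = {
    val worker = new Thread(() => {
      result = "final plan"  // stand-in for the fully optimized SparkPlan
      readyLock.countDown()  // releases every thread blocked in await()
    })
    worker.start()

    println(s"completed: ${readyLock.getCount == 0}") // poll without blocking
    readyLock.await()                                 // block until countDown()
    println(result)
  }
}
```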