
Commit 8fbfdb3

Ngone51 authored and cloud-fan committed
[SPARK-31495][SQL] Support formatted explain for AQE
### What changes were proposed in this pull request?

Support formatted explain for AQE.

### Why are the changes needed?

AQE does not support formatted explain yet. Supporting it improves the user experience, debugging, etc.

Before:

```
== Physical Plan ==
AdaptiveSparkPlan (1)
+- * HashAggregate (unknown)
   +- CustomShuffleReader (unknown)
      +- ShuffleQueryStage (unknown)
         +- Exchange (unknown)
            +- * HashAggregate (unknown)
               +- * Project (unknown)
                  +- * BroadcastHashJoin Inner BuildRight (unknown)
                     :- * LocalTableScan (unknown)
                     +- BroadcastQueryStage (unknown)
                        +- BroadcastExchange (unknown)
                           +- LocalTableScan (unknown)

(1) AdaptiveSparkPlan
Output [4]: [k#7, count(v1)#32L, sum(v1)#33L, avg(v2)#34]
Arguments: HashAggregate(keys=[k#7], functions=[count(1), sum(cast(v1#8 as bigint)), avg(cast(v2#19 as bigint))]), AdaptiveExecutionContext(org.apache.spark.sql.SparkSession@104ab57b), [PlanAdaptiveSubqueries(Map())], false
```

After:

```
== Physical Plan ==
AdaptiveSparkPlan (14)
+- * HashAggregate (13)
   +- CustomShuffleReader (12)
      +- ShuffleQueryStage (11)
         +- Exchange (10)
            +- * HashAggregate (9)
               +- * Project (8)
                  +- * BroadcastHashJoin Inner BuildRight (7)
                     :- * Project (2)
                     :  +- * LocalTableScan (1)
                     +- BroadcastQueryStage (6)
                        +- BroadcastExchange (5)
                           +- * Project (4)
                              +- * LocalTableScan (3)

(1) LocalTableScan [codegen id : 2]
Output [2]: [_1#x, _2#x]
Arguments: [_1#x, _2#x]

(2) Project [codegen id : 2]
Output [2]: [_1#x AS k#x, _2#x AS v1#x]
Input [2]: [_1#x, _2#x]

(3) LocalTableScan [codegen id : 1]
Output [2]: [_1#x, _2#x]
Arguments: [_1#x, _2#x]

(4) Project [codegen id : 1]
Output [2]: [_1#x AS k#x, _2#x AS v2#x]
Input [2]: [_1#x, _2#x]

(5) BroadcastExchange
Input [2]: [k#x, v2#x]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#x]

(6) BroadcastQueryStage
Output [2]: [k#x, v2#x]
Arguments: 0

(7) BroadcastHashJoin [codegen id : 2]
Left keys [1]: [k#x]
Right keys [1]: [k#x]
Join condition: None

(8) Project [codegen id : 2]
Output [3]: [k#x, v1#x, v2#x]
Input [4]: [k#x, v1#x, k#x, v2#x]

(9) HashAggregate [codegen id : 2]
Input [3]: [k#x, v1#x, v2#x]
Keys [1]: [k#x]
Functions [3]: [partial_count(1), partial_sum(cast(v1#x as bigint)), partial_avg(cast(v2#x as bigint))]
Aggregate Attributes [4]: [count#xL, sum#xL, sum#x, count#xL]
Results [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]

(10) Exchange
Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
Arguments: hashpartitioning(k#x, 5), true, [id=#x]

(11) ShuffleQueryStage
Output [5]: [sum#xL, k#x, sum#x, count#xL, count#xL]
Arguments: 1

(12) CustomShuffleReader
Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
Arguments: coalesced

(13) HashAggregate [codegen id : 3]
Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
Keys [1]: [k#x]
Functions [3]: [count(1), sum(cast(v1#x as bigint)), avg(cast(v2#x as bigint))]
Aggregate Attributes [3]: [count(1)#xL, sum(cast(v1#x as bigint))#xL, avg(cast(v2#x as bigint))#x]
Results [4]: [k#x, count(1)#xL AS count(v1)#xL, sum(cast(v1#x as bigint))#xL AS sum(v1)#xL, avg(cast(v2#x as bigint))#x AS avg(v2)#x]

(14) AdaptiveSparkPlan
Output [4]: [k#x, count(v1)#xL, sum(v1)#xL, avg(v2)#x]
Arguments: isFinalPlan=true
```

### Does this PR introduce any user-facing change?

No; this is a new feature shipped along with AQE in Spark 3.0.

### How was this patch tested?

Added a query file (`explain-aqe.sql`) and a unit test.

Closes #28271 from Ngone51/support_formatted_explain_for_aqe.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
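For context, a minimal way to reproduce formatted output like the "After" block above. This is a sketch assuming Spark 3.0+ with this patch applied; the session setup, data, and column names are illustrative, while `spark.sql.adaptive.enabled` and `Dataset.explain("formatted")` are Spark's public API.

```scala
// Minimal reproduction sketch; assumed setup, not part of this patch.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count, sum}

object FormattedExplainDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.adaptive.enabled", "true") // turn AQE on
      .getOrCreate()
    import spark.implicits._

    val left = Seq((1, 10), (2, 20)).toDF("k", "v1")
    val right = Seq((1, 100), (2, 200)).toDF("k", "v2")

    left.join(right, "k")
      .groupBy("k")
      .agg(count("v1"), sum("v1"), avg("v2"))
      .explain("formatted") // prints a numbered plan, as in "After"
  }
}
```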
1 parent 1d30884 · commit 8fbfdb3

File tree

9 files changed (+981, -37 lines)


sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala

Lines changed: 39 additions & 20 deletions
```diff
@@ -23,8 +23,9 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, QueryStageExec}
 
-object ExplainUtils {
+object ExplainUtils extends AdaptiveSparkPlanHelper {
   /**
    * Given a input physical plan, performs the following tasks.
    *   1. Computes the operator id for current operator and records it in the operaror
@@ -144,15 +145,26 @@
       case p: WholeStageCodegenExec =>
       case p: InputAdapter =>
       case other: QueryPlan[_] =>
-        if (!other.getTagValue(QueryPlan.OP_ID_TAG).isDefined) {
+
+        def setOpId(): Unit = if (other.getTagValue(QueryPlan.OP_ID_TAG).isEmpty) {
           currentOperationID += 1
           other.setTagValue(QueryPlan.OP_ID_TAG, currentOperationID)
           operatorIDs += ((currentOperationID, other))
         }
-        other.innerChildren.foreach { plan =>
-          currentOperationID = generateOperatorIDs(plan,
-            currentOperationID,
-            operatorIDs)
+
+        other match {
+          case p: AdaptiveSparkPlanExec =>
+            currentOperationID =
+              generateOperatorIDs(p.executedPlan, currentOperationID, operatorIDs)
+            setOpId()
+          case p: QueryStageExec =>
+            currentOperationID = generateOperatorIDs(p.plan, currentOperationID, operatorIDs)
+            setOpId()
+          case _ =>
+            setOpId()
+            other.innerChildren.foldLeft(currentOperationID) {
+              (curId, plan) => generateOperatorIDs(plan, curId, operatorIDs)
+            }
         }
     }
     currentOperationID
@@ -163,21 +175,25 @@
    * whole stage code gen id in the plan via setting a tag.
    */
   private def generateWholeStageCodegenIds(plan: QueryPlan[_]): Unit = {
+    var currentCodegenId = -1
+
+    def setCodegenId(p: QueryPlan[_], children: Seq[QueryPlan[_]]): Unit = {
+      if (currentCodegenId != -1) {
+        p.setTagValue(QueryPlan.CODEGEN_ID_TAG, currentCodegenId)
+      }
+      children.foreach(generateWholeStageCodegenIds)
+    }
+
     // Skip the subqueries as they are not printed as part of main query block.
     if (plan.isInstanceOf[BaseSubqueryExec]) {
       return
     }
-    var currentCodegenId = -1
     plan.foreach {
       case p: WholeStageCodegenExec => currentCodegenId = p.codegenStageId
       case _: InputAdapter => currentCodegenId = -1
-      case other: QueryPlan[_] =>
-        if (currentCodegenId != -1) {
-          other.setTagValue(QueryPlan.CODEGEN_ID_TAG, currentCodegenId)
-        }
-        other.innerChildren.foreach { plan =>
-          generateWholeStageCodegenIds(plan)
-        }
+      case p: AdaptiveSparkPlanExec => setCodegenId(p, Seq(p.executedPlan))
+      case p: QueryStageExec => setCodegenId(p, Seq(p.plan))
+      case other: QueryPlan[_] => setCodegenId(other, other.innerChildren)
     }
   }
 
@@ -232,13 +248,16 @@
   }
 
   def removeTags(plan: QueryPlan[_]): Unit = {
+    def remove(p: QueryPlan[_], children: Seq[QueryPlan[_]]): Unit = {
+      p.unsetTagValue(QueryPlan.OP_ID_TAG)
+      p.unsetTagValue(QueryPlan.CODEGEN_ID_TAG)
+      children.foreach(removeTags)
+    }
+
     plan foreach {
-      case plan: QueryPlan[_] =>
-        plan.unsetTagValue(QueryPlan.OP_ID_TAG)
-        plan.unsetTagValue(QueryPlan.CODEGEN_ID_TAG)
-        plan.innerChildren.foreach { p =>
-          removeTags(p)
-        }
+      case p: AdaptiveSparkPlanExec => remove(p, Seq(p.executedPlan))
+      case p: QueryStageExec => remove(p, Seq(p.plan))
+      case plan: QueryPlan[_] => remove(plan, plan.innerChildren)
     }
   }
 }
```
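The common thread in these three changes: `AdaptiveSparkPlanExec` and `QueryStageExec` keep their real subtrees in dedicated fields (`executedPlan`, `plan`) rather than among the regular children, so a traversal that only follows children or `innerChildren` silently skips them, which is why every operator under the adaptive plan previously printed as `(unknown)`. A simplified, self-contained sketch of the pattern follows; the classes are toys standing in for the Spark ones, not the actual API.

```scala
import scala.collection.mutable.ArrayBuffer

sealed trait Node { def children: Seq[Node] }
case class Leaf(name: String) extends Node { val children: Seq[Node] = Nil }
case class Inner(name: String, children: Seq[Node]) extends Node
// Stands in for AdaptiveSparkPlanExec / QueryStageExec: the subtree hangs
// off a field, so a generic walk over `children` never reaches it.
case class Wrapper(name: String, hidden: Node) extends Node { val children: Seq[Node] = Nil }

// Number operators bottom-up, hopping into the hidden subtree explicitly,
// mirroring how the patch threads currentOperationID through the plan.
def assignIds(node: Node, startId: Int, ids: ArrayBuffer[(Int, Node)]): Int = {
  var current = startId
  def setId(n: Node): Unit = { current += 1; ids += ((current, n)) }
  node match {
    case w: Wrapper =>
      current = assignIds(w.hidden, current, ids) // the extra hop the patch adds
      setId(w)
    case other =>
      current = other.children.foldLeft(current)((id, c) => assignIds(c, id, ids))
      setId(other)
  }
  current
}

// assignIds(Wrapper("AdaptiveSparkPlan",
//   Inner("HashAggregate", Seq(Leaf("LocalTableScan")))), 0, ArrayBuffer.empty)
// numbers LocalTableScan (1), HashAggregate (2), AdaptiveSparkPlan (3),
// matching the bottom-up ids in the "After" output.
```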

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

Lines changed: 1 addition & 4 deletions
```diff
@@ -251,10 +251,7 @@ case class AdaptiveSparkPlanExec(
     getFinalPhysicalPlan().execute()
   }
 
-  override def verboseString(maxFields: Int): String = simpleString(maxFields)
-
-  override def simpleString(maxFields: Int): String =
-    s"AdaptiveSparkPlan(isFinalPlan=$isFinalPlan)"
+  protected override def stringArgs: Iterator[Any] = Iterator(s"isFinalPlan=$isFinalPlan")
 
   override def generateTreeString(
       depth: Int,
```
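Replacing the two string overrides with a single `stringArgs` override leans on the `TreeNode` convention that a node's one-line description is assembled from its name plus `stringArgs`, which is what feeds the `Arguments: isFinalPlan=true` line in the formatted output above. A toy model of that convention, with assumed simplified names rather than Spark's real `TreeNode`:

```scala
// Toy model: one stringArgs override feeds every rendering path.
abstract class TreeNodeLike {
  def nodeName: String = getClass.getSimpleName
  protected def stringArgs: Iterator[Any] = Iterator.empty
  final def simpleString: String = (nodeName +: stringArgs.toSeq).mkString(" ")
  def verboseString: String = simpleString // default: verbose mirrors simple
}

class AdaptivePlanLike(isFinalPlan: Boolean) extends TreeNodeLike {
  override def nodeName: String = "AdaptiveSparkPlan"
  override protected def stringArgs: Iterator[Any] = Iterator(s"isFinalPlan=$isFinalPlan")
}

// new AdaptivePlanLike(isFinalPlan = true).simpleString
//   == "AdaptiveSparkPlan isFinalPlan=true"
```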

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -122,7 +122,7 @@ case class InsertAdaptiveSparkPlan(
           if !subqueryMap.contains(exprId.id) =>
         val executedPlan = compileSubquery(p)
         verifyAdaptivePlan(executedPlan, p)
-        val subquery = SubqueryExec(s"subquery${exprId.id}", executedPlan)
+        val subquery = SubqueryExec(s"subquery#${exprId.id}", executedPlan)
         subqueryMap.put(exprId.id, subquery)
       case expressions.InSubquery(_, ListQuery(query, _, exprId, _))
           if !subqueryMap.contains(exprId.id) =>
```
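A small side fix: AQE-compiled subqueries were named `subquery<id>`, while explain output elsewhere uses the `subquery#<id>` form, so the names looked inconsistent. The effect of the one-character change, with an illustrative expression id:

```scala
val exprId = 42          // illustrative value, not from the patch
s"subquery${exprId}"     // before: "subquery42"
s"subquery#${exprId}"    // after:  "subquery#42"
```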
sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql

Lines changed: 3 additions & 0 deletions

```diff
@@ -0,0 +1,3 @@
+--IMPORT explain.sql
+
+--SET spark.sql.adaptive.enabled=true
```

sql/core/src/test/resources/sql-tests/inputs/explain.sql

Lines changed: 1 addition & 0 deletions
```diff
@@ -117,3 +117,4 @@ EXPLAIN FORMATTED
 DROP TABLE explain_temp1;
 DROP TABLE explain_temp2;
 DROP TABLE explain_temp3;
+DROP TABLE explain_temp4;
```
