
Commit c0ac578

sarutak authored and dongjoon-hyun committed
[SPARK-33850][SQL] EXPLAIN FORMATTED doesn't show the plan for subqueries if AQE is enabled
### What changes were proposed in this pull request?

This PR fixes an issue where, with AQE enabled, EXPLAIN FORMATTED does not show the plan for subqueries.

```scala
val df = spark.range(1, 100)
df.createTempView("df")
spark.sql("SELECT (SELECT min(id) AS v FROM df)").explain("FORMATTED")

== Physical Plan ==
AdaptiveSparkPlan (3)
+- Project (2)
   +- Scan OneRowRelation (1)

(1) Scan OneRowRelation
Output: []
Arguments: ParallelCollectionRDD[0] at explain at <console>:24, OneRowRelation, UnknownPartitioning(0)

(2) Project
Output [1]: [Subquery subquery#3, [id=#20] AS scalarsubquery()#5L]
Input: []

(3) AdaptiveSparkPlan
Output [1]: [scalarsubquery()#5L]
Arguments: isFinalPlan=false
```

After this change, the plan for the subquery is shown.

```scala
== Physical Plan ==
* Project (2)
+- * Scan OneRowRelation (1)

(1) Scan OneRowRelation [codegen id : 1]
Output: []
Arguments: ParallelCollectionRDD[0] at explain at <console>:24, OneRowRelation, UnknownPartitioning(0)

(2) Project [codegen id : 1]
Output [1]: [Subquery scalar-subquery#3, [id=#24] AS scalarsubquery()#5L]
Input: []

===== Subqueries =====

Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery scalar-subquery#3, [id=#24]
* HashAggregate (6)
+- Exchange (5)
   +- * HashAggregate (4)
      +- * Range (3)

(3) Range [codegen id : 1]
Output [1]: [id#0L]
Arguments: Range (1, 100, step=1, splits=Some(12))

(4) HashAggregate [codegen id : 1]
Input [1]: [id#0L]
Keys: []
Functions [1]: [partial_min(id#0L)]
Aggregate Attributes [1]: [min#7L]
Results [1]: [min#8L]

(5) Exchange
Input [1]: [min#8L]
Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#20]

(6) HashAggregate [codegen id : 2]
Input [1]: [min#8L]
Keys: []
Functions [1]: [min(id#0L)]
Aggregate Attributes [1]: [min(id#0L)#4L]
Results [1]: [min(id#0L)#4L AS v#2L]
```

### Why are the changes needed?

For better debuggability.

### Does this PR introduce _any_ user-facing change?

Yes. Users can see the formatted plan for subqueries.

### How was this patch tested?

New test.

Closes #30855 from sarutak/fix-aqe-explain.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 70da86a)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent: 1ce8000 · commit: c0ac578

File tree

3 files changed (+287, -0 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -218,6 +218,8 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
       plan: => QueryPlan[_],
       subqueries: ArrayBuffer[(SparkPlan, Expression, BaseSubqueryExec)]): Unit = {
     plan.foreach {
+      case a: AdaptiveSparkPlanExec =>
+        getSubqueries(a.executedPlan, subqueries)
       case p: SparkPlan =>
         p.expressions.foreach (_.collect {
           case e: PlanExpression[_] =>
```
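The functional change is just these two `case` lines. An `AdaptiveSparkPlanExec` node keeps the plan it actually runs in its `executedPlan` field instead of exposing it as an ordinary child, so a traversal that only matches plain `SparkPlan` operators never reaches the subquery expressions underneath an AQE root. The sketch below illustrates that traversal pattern with hypothetical stand-in types (`Plan`, `Node`, `Adaptive`, and `SubqueryCollector` are simplifications invented for this example, not Spark's classes):

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, stripped-down stand-ins for Spark's plan classes,
// used only to illustrate the traversal pattern behind the fix.
sealed trait Plan
case class Leaf(name: String) extends Plan
case class Node(name: String, subqueries: Seq[Plan], children: Seq[Plan]) extends Plan
// Like AdaptiveSparkPlanExec, this node keeps its real plan in a field
// that an ordinary pattern match over operators never descends into.
case class Adaptive(executedPlan: Plan) extends Plan

object SubqueryCollector {
  // Mirrors the shape of ExplainUtils.getSubqueries after the patch:
  // an adaptive node is unwrapped explicitly before ordinary operators
  // are searched for subqueries.
  def collect(plan: Plan, out: ArrayBuffer[Plan]): Unit = plan match {
    case a: Adaptive =>
      collect(a.executedPlan, out)
    case Node(_, subqueries, children) =>
      out ++= subqueries
      subqueries.foreach(collect(_, out))
      children.foreach(collect(_, out))
    case _: Leaf => ()
  }
}

object Demo extends App {
  val subquery = Leaf("HashAggregate(min(id))")
  // Before the patch, the AQE wrapper made this subquery invisible
  // to the explain-time collector.
  val plan = Adaptive(Node("Project", Seq(subquery), Seq(Leaf("OneRowRelation"))))
  val found = ArrayBuffer.empty[Plan]
  SubqueryCollector.collect(plan, found)
  println(found) // ArrayBuffer(Leaf(HashAggregate(min(id))))
}
```

Without the `Adaptive` case, `collect` returns an empty buffer for any plan rooted at an adaptive node, which is exactly why the `===== Subqueries =====` section was missing from EXPLAIN FORMATTED.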

sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out

Lines changed: 263 additions & 0 deletions
```diff
@@ -407,6 +407,101 @@ Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery subq
 Output [2]: [key#x, val#x]
 Arguments: isFinalPlan=false
 
+===== Subqueries =====
+
+Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery subquery#x, [id=#x]
+AdaptiveSparkPlan (10)
++- HashAggregate (9)
+   +- Exchange (8)
+      +- HashAggregate (7)
+         +- Project (6)
+            +- Filter (5)
+               +- Scan parquet default.explain_temp2 (4)
+
+
+(4) Scan parquet default.explain_temp2
+Output [2]: [key#x, val#x]
+Batched: true
+Location [not included in comparison]/{warehouse_dir}/explain_temp2]
+PushedFilters: [IsNotNull(key), IsNotNull(val), EqualTo(val,2)]
+ReadSchema: struct<key:int,val:int>
+
+(5) Filter
+Input [2]: [key#x, val#x]
+Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery subquery#x, [id=#x])) AND (val#x = 2))
+
+(6) Project
+Output [1]: [key#x]
+Input [2]: [key#x, val#x]
+
+(7) HashAggregate
+Input [1]: [key#x]
+Keys: []
+Functions [1]: [partial_max(key#x)]
+Aggregate Attributes [1]: [max#x]
+Results [1]: [max#x]
+
+(8) Exchange
+Input [1]: [max#x]
+Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x]
+
+(9) HashAggregate
+Input [1]: [max#x]
+Keys: []
+Functions [1]: [max(key#x)]
+Aggregate Attributes [1]: [max(key#x)#x]
+Results [1]: [max(key#x)#x AS max(key)#x]
+
+(10) AdaptiveSparkPlan
+Output [1]: [max(key)#x]
+Arguments: isFinalPlan=false
+
+Subquery:2 Hosting operator id = 5 Hosting Expression = Subquery subquery#x, [id=#x]
+AdaptiveSparkPlan (17)
++- HashAggregate (16)
+   +- Exchange (15)
+      +- HashAggregate (14)
+         +- Project (13)
+            +- Filter (12)
+               +- Scan parquet default.explain_temp3 (11)
+
+
+(11) Scan parquet default.explain_temp3
+Output [2]: [key#x, val#x]
+Batched: true
+Location [not included in comparison]/{warehouse_dir}/explain_temp3]
+PushedFilters: [IsNotNull(val), GreaterThan(val,0)]
+ReadSchema: struct<key:int,val:int>
+
+(12) Filter
+Input [2]: [key#x, val#x]
+Condition : (isnotnull(val#x) AND (val#x > 0))
+
+(13) Project
+Output [1]: [key#x]
+Input [2]: [key#x, val#x]
+
+(14) HashAggregate
+Input [1]: [key#x]
+Keys: []
+Functions [1]: [partial_max(key#x)]
+Aggregate Attributes [1]: [max#x]
+Results [1]: [max#x]
+
+(15) Exchange
+Input [1]: [max#x]
+Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x]
+
+(16) HashAggregate
+Input [1]: [max#x]
+Keys: []
+Functions [1]: [max(key#x)]
+Aggregate Attributes [1]: [max(key#x)#x]
+Results [1]: [max(key#x)#x AS max(key)#x]
+
+(17) AdaptiveSparkPlan
+Output [1]: [max(key)#x]
+Arguments: isFinalPlan=false
 
 -- !query
 EXPLAIN FORMATTED
@@ -442,6 +537,101 @@ Condition : ((key#x = Subquery subquery#x, [id=#x]) OR (cast(key#x as double) =
 Output [2]: [key#x, val#x]
 Arguments: isFinalPlan=false
 
+===== Subqueries =====
+
+Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery subquery#x, [id=#x]
+AdaptiveSparkPlan (10)
++- HashAggregate (9)
+   +- Exchange (8)
+      +- HashAggregate (7)
+         +- Project (6)
+            +- Filter (5)
+               +- Scan parquet default.explain_temp2 (4)
+
+
+(4) Scan parquet default.explain_temp2
+Output [2]: [key#x, val#x]
+Batched: true
+Location [not included in comparison]/{warehouse_dir}/explain_temp2]
+PushedFilters: [IsNotNull(val), GreaterThan(val,0)]
+ReadSchema: struct<key:int,val:int>
+
+(5) Filter
+Input [2]: [key#x, val#x]
+Condition : (isnotnull(val#x) AND (val#x > 0))
+
+(6) Project
+Output [1]: [key#x]
+Input [2]: [key#x, val#x]
+
+(7) HashAggregate
+Input [1]: [key#x]
+Keys: []
+Functions [1]: [partial_max(key#x)]
+Aggregate Attributes [1]: [max#x]
+Results [1]: [max#x]
+
+(8) Exchange
+Input [1]: [max#x]
+Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x]
+
+(9) HashAggregate
+Input [1]: [max#x]
+Keys: []
+Functions [1]: [max(key#x)]
+Aggregate Attributes [1]: [max(key#x)#x]
+Results [1]: [max(key#x)#x AS max(key)#x]
+
+(10) AdaptiveSparkPlan
+Output [1]: [max(key)#x]
+Arguments: isFinalPlan=false
+
+Subquery:2 Hosting operator id = 2 Hosting Expression = Subquery subquery#x, [id=#x]
+AdaptiveSparkPlan (17)
++- HashAggregate (16)
+   +- Exchange (15)
+      +- HashAggregate (14)
+         +- Project (13)
+            +- Filter (12)
+               +- Scan parquet default.explain_temp3 (11)
+
+
+(11) Scan parquet default.explain_temp3
+Output [2]: [key#x, val#x]
+Batched: true
+Location [not included in comparison]/{warehouse_dir}/explain_temp3]
+PushedFilters: [IsNotNull(val), GreaterThan(val,0)]
+ReadSchema: struct<key:int,val:int>
+
+(12) Filter
+Input [2]: [key#x, val#x]
+Condition : (isnotnull(val#x) AND (val#x > 0))
+
+(13) Project
+Output [1]: [key#x]
+Input [2]: [key#x, val#x]
+
+(14) HashAggregate
+Input [1]: [key#x]
+Keys: []
+Functions [1]: [partial_avg(cast(key#x as bigint))]
+Aggregate Attributes [2]: [sum#x, count#xL]
+Results [2]: [sum#x, count#xL]
+
+(15) Exchange
+Input [2]: [sum#x, count#xL]
+Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x]
+
+(16) HashAggregate
+Input [2]: [sum#x, count#xL]
+Keys: []
+Functions [1]: [avg(cast(key#x as bigint))]
+Aggregate Attributes [1]: [avg(cast(key#x as bigint))#x]
+Results [1]: [avg(cast(key#x as bigint))#x AS avg(key)#x]
+
+(17) AdaptiveSparkPlan
+Output [1]: [avg(key)#x]
+Arguments: isFinalPlan=false
 
 -- !query
 EXPLAIN FORMATTED
@@ -470,6 +660,79 @@ Input: []
 Output [1]: [(scalarsubquery() + scalarsubquery())#x]
 Arguments: isFinalPlan=false
 
+===== Subqueries =====
+
+Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery subquery#x, [id=#x]
+AdaptiveSparkPlan (8)
++- HashAggregate (7)
+   +- Exchange (6)
+      +- HashAggregate (5)
+         +- Scan parquet default.explain_temp1 (4)
+
+
+(4) Scan parquet default.explain_temp1
+Output [1]: [key#x]
+Batched: true
+Location [not included in comparison]/{warehouse_dir}/explain_temp1]
+ReadSchema: struct<key:int>
+
+(5) HashAggregate
+Input [1]: [key#x]
+Keys: []
+Functions [1]: [partial_avg(cast(key#x as bigint))]
+Aggregate Attributes [2]: [sum#x, count#xL]
+Results [2]: [sum#x, count#xL]
+
+(6) Exchange
+Input [2]: [sum#x, count#xL]
+Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x]
+
+(7) HashAggregate
+Input [2]: [sum#x, count#xL]
+Keys: []
+Functions [1]: [avg(cast(key#x as bigint))]
+Aggregate Attributes [1]: [avg(cast(key#x as bigint))#x]
+Results [1]: [avg(cast(key#x as bigint))#x AS avg(key)#x]
+
+(8) AdaptiveSparkPlan
+Output [1]: [avg(key)#x]
+Arguments: isFinalPlan=false
+
+Subquery:2 Hosting operator id = 2 Hosting Expression = Subquery subquery#x, [id=#x]
+AdaptiveSparkPlan (13)
++- HashAggregate (12)
+   +- Exchange (11)
+      +- HashAggregate (10)
+         +- Scan parquet default.explain_temp1 (9)
+
+
+(9) Scan parquet default.explain_temp1
+Output [1]: [key#x]
+Batched: true
+Location [not included in comparison]/{warehouse_dir}/explain_temp1]
+ReadSchema: struct<key:int>
+
+(10) HashAggregate
+Input [1]: [key#x]
+Keys: []
+Functions [1]: [partial_avg(cast(key#x as bigint))]
+Aggregate Attributes [2]: [sum#x, count#xL]
+Results [2]: [sum#x, count#xL]
+
+(11) Exchange
+Input [2]: [sum#x, count#xL]
+Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x]
+
+(12) HashAggregate
+Input [2]: [sum#x, count#xL]
+Keys: []
+Functions [1]: [avg(cast(key#x as bigint))]
+Aggregate Attributes [1]: [avg(cast(key#x as bigint))#x]
+Results [1]: [avg(cast(key#x as bigint))#x AS avg(key)#x]
+
+(13) AdaptiveSparkPlan
+Output [1]: [avg(key)#x]
+Arguments: isFinalPlan=false
 
 -- !query
 EXPLAIN FORMATTED
```
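The three `.sql.out` hunks above are regenerated golden files rather than hand-written expectations: SQLQueryTestSuite replays `explain-aqe.sql` and compares the normalized output (note the `#x` placeholders standing in for run-specific expression and exchange IDs) against these files. Assuming the usual Spark workflow, such files are refreshed with `SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly *SQLQueryTestSuite"` rather than edited by hand.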

sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala

Lines changed: 22 additions & 0 deletions
```diff
@@ -277,6 +277,28 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
     }
   }
 
+  test("SPARK-33850: explain formatted - check presence of subquery in case of AQE") {
+    withTable("df1") {
+      withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+        withTable("df1") {
+          spark.range(1, 100)
+            .write
+            .format("parquet")
+            .mode("overwrite")
+            .saveAsTable("df1")
+
+          val sqlText = "EXPLAIN FORMATTED SELECT (SELECT min(id) FROM df1) as v"
+          val expected_pattern1 =
+            "Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery subquery#x"
+
+          withNormalizedExplain(sqlText) { normalizedOutput =>
+            assert(expected_pattern1.r.findAllMatchIn(normalizedOutput).length == 1)
+          }
+        }
+      }
+    }
+  }
+
   test("Support ExplainMode in Dataset.explain") {
     val df1 = Seq((1, 2), (2, 3)).toDF("k", "v1")
     val df2 = Seq((2, 3), (1, 1)).toDF("k", "v2")
```
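The assertion can match the literal `#x` because `withNormalizedExplain` (from ExplainSuiteHelper) captures the explain output and rewrites volatile expression and exchange IDs before comparing. A minimal sketch of that normalization step, assuming the IDs are the only run-to-run noise (the object name and regex here are illustrative, not Spark's exact implementation):

```scala
// Sketch: rewrite run-specific ids like "subquery#3" or "[id=#20]" to "#x"
// so a regex assertion can match explain output deterministically.
object NormalizeExplainDemo extends App {
  def normalizeIds(explainOutput: String): String =
    explainOutput.replaceAll("#\\d+", "#x")

  val raw = "Hosting Expression = Subquery subquery#3, [id=#20]"
  println(normalizeIds(raw)) // Hosting Expression = Subquery subquery#x, [id=#x]
}
```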
