Skip to content

Commit 258bfcf

Browse files
Eric5553cloud-fan
authored andcommitted
[SPARK-30651][SQL] Add detailed information for Aggregate operators in EXPLAIN FORMATTED
### What changes were proposed in this pull request? Currently `EXPLAIN FORMATTED` only report input attributes of HashAggregate/ObjectHashAggregate/SortAggregate, while `EXPLAIN EXTENDED` provides more information of Keys, Functions, etc. This PR enhanced `EXPLAIN FORMATTED` to sync with original explain behavior. ### Why are the changes needed? The newly added `EXPLAIN FORMATTED` got less information comparing to the original `EXPLAIN EXTENDED` ### Does this PR introduce any user-facing change? Yes, taking HashAggregate explain result as example. **SQL** ``` EXPLAIN FORMATTED SELECT COUNT(val) + SUM(key) as TOTAL, COUNT(key) FILTER (WHERE val > 1) FROM explain_temp1; ``` **EXPLAIN EXTENDED** ``` == Physical Plan == *(2) HashAggregate(keys=[], functions=[count(val#6), sum(cast(key#5 as bigint)), count(key#5)], output=[TOTAL#62L, count(key) FILTER (WHERE (val > 1))#71L]) +- Exchange SinglePartition, true, [id=#89] +- HashAggregate(keys=[], functions=[partial_count(val#6), partial_sum(cast(key#5 as bigint)), partial_count(key#5) FILTER (WHERE (val#6 > 1))], output=[count#75L, sum#76L, count#77L]) +- *(1) ColumnarToRow +- FileScan parquet default.explain_temp1[key#5,val#6] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/XXX/spark-dev/spark/spark-warehouse/explain_temp1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:int,val:int> ``` **EXPLAIN FORMATTED - BEFORE** ``` == Physical Plan == * HashAggregate (5) +- Exchange (4) +- HashAggregate (3) +- * ColumnarToRow (2) +- Scan parquet default.explain_temp1 (1) ... ... (5) HashAggregate [codegen id : 2] Input: [count#91L, sum#92L, count#93L] ... ... ``` **EXPLAIN FORMATTED - AFTER** ``` == Physical Plan == * HashAggregate (5) +- Exchange (4) +- HashAggregate (3) +- * ColumnarToRow (2) +- Scan parquet default.explain_temp1 (1) ... ... (5) HashAggregate [codegen id : 2] Input: [count#91L, sum#92L, count#93L] Keys: [] Functions: [count(val#6), sum(cast(key#5 as bigint)), count(key#5)] Results: [(count(val#6)#84L + sum(cast(key#5 as bigint))#85L) AS TOTAL#78L, count(key#5)#86L AS count(key) FILTER (WHERE (val > 1))#87L] Output: [TOTAL#78L, count(key) FILTER (WHERE (val > 1))#87L] ... ... ``` ### How was this patch tested? Three tests added in explain.sql for HashAggregate/ObjectHashAggregate/SortAggregate. Closes #27368 from Eric5553/ExplainFormattedAgg. Authored-by: Eric Wu <492960551@qq.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 5919bd3) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 2a059e6 commit 258bfcf

File tree

6 files changed

+300
-10
lines changed

6 files changed

+300
-10
lines changed
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.aggregate
19+
20+
import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression}
21+
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
22+
import org.apache.spark.sql.execution.{ExplainUtils, UnaryExecNode}
23+
24+
/**
25+
* Holds common logic for aggregate operators
26+
*/
27+
trait BaseAggregateExec extends UnaryExecNode {
28+
def groupingExpressions: Seq[NamedExpression]
29+
def aggregateExpressions: Seq[AggregateExpression]
30+
def aggregateAttributes: Seq[Attribute]
31+
def resultExpressions: Seq[NamedExpression]
32+
33+
override def verboseStringWithOperatorId(): String = {
34+
val inputString = child.output.mkString("[", ", ", "]")
35+
val keyString = groupingExpressions.mkString("[", ", ", "]")
36+
val functionString = aggregateExpressions.mkString("[", ", ", "]")
37+
val aggregateAttributeString = aggregateAttributes.mkString("[", ", ", "]")
38+
val resultString = resultExpressions.mkString("[", ", ", "]")
39+
s"""
40+
|(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
41+
|Input: $inputString
42+
|Keys: $keyString
43+
|Functions: $functionString
44+
|Aggregate Attributes: $aggregateAttributeString
45+
|Results: $resultString
46+
""".stripMargin
47+
}
48+
}

sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ case class HashAggregateExec(
5353
initialInputBufferOffset: Int,
5454
resultExpressions: Seq[NamedExpression],
5555
child: SparkPlan)
56-
extends UnaryExecNode with BlockingOperatorWithCodegen with AliasAwareOutputPartitioning {
56+
extends BaseAggregateExec with BlockingOperatorWithCodegen with AliasAwareOutputPartitioning {
5757

5858
private[this] val aggregateBufferAttributes = {
5959
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)

sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ case class ObjectHashAggregateExec(
6767
initialInputBufferOffset: Int,
6868
resultExpressions: Seq[NamedExpression],
6969
child: SparkPlan)
70-
extends UnaryExecNode with AliasAwareOutputPartitioning {
70+
extends BaseAggregateExec with AliasAwareOutputPartitioning {
7171

7272
private[this] val aggregateBufferAttributes = {
7373
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)

sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
2424
import org.apache.spark.sql.catalyst.expressions.aggregate._
2525
import org.apache.spark.sql.catalyst.plans.physical._
2626
import org.apache.spark.sql.catalyst.util.truncatedString
27-
import org.apache.spark.sql.execution.{AliasAwareOutputPartitioning, SparkPlan, UnaryExecNode}
27+
import org.apache.spark.sql.execution.{AliasAwareOutputPartitioning, SparkPlan}
2828
import org.apache.spark.sql.execution.metric.SQLMetrics
2929

3030
/**
@@ -38,7 +38,7 @@ case class SortAggregateExec(
3838
initialInputBufferOffset: Int,
3939
resultExpressions: Seq[NamedExpression],
4040
child: SparkPlan)
41-
extends UnaryExecNode with AliasAwareOutputPartitioning {
41+
extends BaseAggregateExec with AliasAwareOutputPartitioning {
4242

4343
private[this] val aggregateBufferAttributes = {
4444
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)

sql/core/src/test/resources/sql-tests/inputs/explain.sql

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
CREATE table explain_temp1 (key int, val int) USING PARQUET;
66
CREATE table explain_temp2 (key int, val int) USING PARQUET;
77
CREATE table explain_temp3 (key int, val int) USING PARQUET;
8+
CREATE table explain_temp4 (key int, val string) USING PARQUET;
89

910
SET spark.sql.codegen.wholeStage = true;
1011

@@ -61,7 +62,7 @@ EXPLAIN FORMATTED
6162
FROM explain_temp2
6263
WHERE val > 0)
6364
OR
64-
key = (SELECT max(key)
65+
key = (SELECT avg(key)
6566
FROM explain_temp3
6667
WHERE val > 0);
6768

@@ -93,6 +94,25 @@ EXPLAIN FORMATTED
9394
CREATE VIEW explain_view AS
9495
SELECT key, val FROM explain_temp1;
9596

97+
-- HashAggregate
98+
EXPLAIN FORMATTED
99+
SELECT
100+
COUNT(val) + SUM(key) as TOTAL,
101+
COUNT(key) FILTER (WHERE val > 1)
102+
FROM explain_temp1;
103+
104+
-- ObjectHashAggregate
105+
EXPLAIN FORMATTED
106+
SELECT key, sort_array(collect_set(val))[0]
107+
FROM explain_temp4
108+
GROUP BY key;
109+
110+
-- SortAggregate
111+
EXPLAIN FORMATTED
112+
SELECT key, MIN(val)
113+
FROM explain_temp4
114+
GROUP BY key;
115+
96116
-- cleanup
97117
DROP TABLE explain_temp1;
98118
DROP TABLE explain_temp2;

0 commit comments

Comments
 (0)