
Commit 3c41f4b

Eric5553 authored and Nick Nicolini committed
[SPARK-30651][SQL] Add detailed information for Aggregate operators in EXPLAIN FORMATTED
Currently `EXPLAIN FORMATTED` reports only the input attributes of HashAggregate/ObjectHashAggregate/SortAggregate, while `EXPLAIN EXTENDED` provides more information such as Keys, Functions, etc. This PR enhances `EXPLAIN FORMATTED` to match the original explain behavior, since the newly added `EXPLAIN FORMATTED` mode carried less information than the original `EXPLAIN EXTENDED`.

Yes, this introduces a user-facing change. Taking the HashAggregate explain result as an example:

**SQL**

```
EXPLAIN FORMATTED
  SELECT COUNT(val) + SUM(key) as TOTAL,
         COUNT(key) FILTER (WHERE val > 1)
  FROM explain_temp1;
```

**EXPLAIN EXTENDED**

```
== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(val#6), sum(cast(key#5 as bigint)), count(key#5)], output=[TOTAL#62L, count(key) FILTER (WHERE (val > 1))#71L])
+- Exchange SinglePartition, true, [id=#89]
   +- HashAggregate(keys=[], functions=[partial_count(val#6), partial_sum(cast(key#5 as bigint)), partial_count(key#5) FILTER (WHERE (val#6 > 1))], output=[count#75L, sum#76L, count#77L])
      +- *(1) ColumnarToRow
         +- FileScan parquet default.explain_temp1[key#5,val#6] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/XXX/spark-dev/spark/spark-warehouse/explain_temp1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:int,val:int>
```

**EXPLAIN FORMATTED - BEFORE**

```
== Physical Plan ==
* HashAggregate (5)
+- Exchange (4)
   +- HashAggregate (3)
      +- * ColumnarToRow (2)
         +- Scan parquet default.explain_temp1 (1)
...
...
(5) HashAggregate [codegen id : 2]
Input: [count#91L, sum#92L, count#93L]
...
...
```

**EXPLAIN FORMATTED - AFTER**

```
== Physical Plan ==
* HashAggregate (5)
+- Exchange (4)
   +- HashAggregate (3)
      +- * ColumnarToRow (2)
         +- Scan parquet default.explain_temp1 (1)
...
...
(5) HashAggregate [codegen id : 2]
Input: [count#91L, sum#92L, count#93L]
Keys: []
Functions: [count(val#6), sum(cast(key#5 as bigint)), count(key#5)]
Results: [(count(val#6)#84L + sum(cast(key#5 as bigint))#85L) AS TOTAL#78L, count(key#5)#86L AS count(key) FILTER (WHERE (val > 1))#87L]
Output: [TOTAL#78L, count(key) FILTER (WHERE (val > 1))#87L]
...
...
```

Three tests were added in explain.sql for HashAggregate/ObjectHashAggregate/SortAggregate.

Closes apache#27368 from Eric5553/ExplainFormattedAgg.

Authored-by: Eric Wu <492960551@qq.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
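For context, here is a minimal, runnable Scala sketch of how one might reproduce the example above end to end. The table definition and local session setup are assumptions inferred from the `ReadSchema` (`struct<key:int,val:int>`) and Parquet format shown in the plan; they are not part of this commit:

```scala
import org.apache.spark.sql.SparkSession

// A minimal sketch, assuming a Spark build that includes this commit.
object ExplainFormattedDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("explain-formatted-demo")
      .master("local[*]")
      .getOrCreate()

    // Table definition is an assumption inferred from the commit's ReadSchema
    // (struct<key:int,val:int>) and the Parquet format shown in the plan.
    spark.sql("CREATE TABLE IF NOT EXISTS explain_temp1 (key INT, val INT) USING PARQUET")

    // EXPLAIN returns a single-row, single-column DataFrame holding the plan text.
    spark.sql(
      """EXPLAIN FORMATTED
        |SELECT COUNT(val) + SUM(key) AS TOTAL,
        |       COUNT(key) FILTER (WHERE val > 1)
        |FROM explain_temp1
        |""".stripMargin
    ).show(truncate = false)

    spark.stop()
  }
}
```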
1 parent 1270813 · commit 3c41f4b

File tree

4 files changed: +52 −4 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala

Lines changed: 48 additions & 0 deletions

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.aggregate

import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.execution.{ExplainUtils, UnaryExecNode}

/**
 * Holds common logic for aggregate operators
 */
trait BaseAggregateExec extends UnaryExecNode {
  def groupingExpressions: Seq[NamedExpression]
  def aggregateExpressions: Seq[AggregateExpression]
  def aggregateAttributes: Seq[Attribute]
  def resultExpressions: Seq[NamedExpression]

  override def verboseStringWithOperatorId(): String = {
    val inputString = child.output.mkString("[", ", ", "]")
    val keyString = groupingExpressions.mkString("[", ", ", "]")
    val functionString = aggregateExpressions.mkString("[", ", ", "]")
    val aggregateAttributeString = aggregateAttributes.mkString("[", ", ", "]")
    val resultString = resultExpressions.mkString("[", ", ", "]")
    s"""
       |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
       |Input: $inputString
       |Keys: $keyString
       |Functions: $functionString
       |Aggregate Attributes: $aggregateAttributeString
       |Results: $resultString
     """.stripMargin
  }
}
```
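For intuition about the output format, here is a standalone Scala sketch (no Spark dependency) of the string assembly that `verboseStringWithOperatorId` performs. The attribute strings are hypothetical placeholders copied from the example output above, not real Spark expressions:

```scala
// Standalone sketch of the string assembly in verboseStringWithOperatorId.
object ExplainFormatSketch {
  def main(args: Array[String]): Unit = {
    val input = Seq("count#91L", "sum#92L", "count#93L")
    val keys = Seq.empty[String]
    val functions = Seq("count(val#6)", "sum(cast(key#5 as bigint))", "count(key#5)")

    // mkString("[", ", ", "]") renders a Seq as "[a, b, c]"; an empty Seq yields "[]".
    val inputString = input.mkString("[", ", ", "]")
    val keyString = keys.mkString("[", ", ", "]")
    val functionString = functions.mkString("[", ", ", "]")

    // stripMargin trims everything up to and including each leading '|',
    // producing the left-aligned block shown under each operator id.
    println(
      s"""
         |(5) HashAggregate [codegen id : 2]
         |Input: $inputString
         |Keys: $keyString
         |Functions: $functionString
       """.stripMargin)
  }
}
```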

sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -47,7 +47,7 @@ case class HashAggregateExec(
     initialInputBufferOffset: Int,
     resultExpressions: Seq[NamedExpression],
     child: SparkPlan)
-  extends UnaryExecNode with BlockingOperatorWithCodegen {
+  extends BaseAggregateExec with BlockingOperatorWithCodegen {
 
   private[this] val aggregateBufferAttributes = {
     aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
```
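As an aside, here is a minimal sketch (not Spark code; all names are hypothetical) of the Scala pattern that makes this one-line change sufficient: a case class's constructor parameters implement the trait's abstract defs, so the node inherits the shared `verboseStringWithOperatorId` without further changes. The same pattern applies to ObjectHashAggregateExec and SortAggregateExec below.

```scala
// Minimal sketch (not Spark code) of the mixin pattern used in this commit.
trait BaseOp {
  def keys: Seq[String]
  def describe: String = s"Keys: ${keys.mkString("[", ", ", "]")}"
}

// `keys` is a constructor parameter, so it satisfies BaseOp.keys without an
// explicit override, mirroring how HashAggregateExec's existing fields
// satisfy BaseAggregateExec's abstract defs.
case class HashOp(keys: Seq[String]) extends BaseOp

object MixinSketch {
  def main(args: Array[String]): Unit = {
    println(HashOp(Seq("key#5")).describe) // prints: Keys: [key#5]
  }
}
```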

sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -65,7 +65,7 @@ case class ObjectHashAggregateExec(
     initialInputBufferOffset: Int,
     resultExpressions: Seq[NamedExpression],
     child: SparkPlan)
-  extends UnaryExecNode {
+  extends BaseAggregateExec {
 
   private[this] val aggregateBufferAttributes = {
     aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
```

sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala

Lines changed: 2 additions & 2 deletions

```diff
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.metric.SQLMetrics
 
 /**
@@ -38,7 +38,7 @@ case class SortAggregateExec(
     initialInputBufferOffset: Int,
     resultExpressions: Seq[NamedExpression],
     child: SparkPlan)
-  extends UnaryExecNode {
+  extends BaseAggregateExec {
 
   private[this] val aggregateBufferAttributes = {
     aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
```
