Commit 9e02646

Eliminate all redundant aggregations
Before this change, an aggregation without a GROUP BY and without any aggregate functions was disallowed. This prevented the optimizer from removing every redundant aggregation when all of them were redundant: the first one was always retained. This commit removes that limitation, allowing such queries to be optimized further.
1 parent 53a201d commit 9e02646
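
For illustration only (a hedged sketch, not part of the commit: the exact SQL is not shown in this diff, and t1/t2 stand for the tables the updated tests create), the EXISTS tests below exercise a query of roughly this shape:

SELECT t1.a
FROM t1
WHERE EXISTS (SELECT count(*) FROM t2);

EXISTS only checks whether the subquery produces at least one row, and a global aggregate always produces exactly one, so the count value itself is never needed. After this change the subquery plans as Aggregate: groupBy=[[]], aggr=[[]] over TableScan: t2 projection=[], and the empty ProjectionExec above PlaceholderRowExec disappears from the physical plan, as the updated snapshots show.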

9 files changed: +51 −85 lines changed

datafusion/core/tests/dataframe/mod.rs

Lines changed: 30 additions & 34 deletions
@@ -2744,23 +2744,21 @@ async fn test_count_wildcard_on_where_exist() -> Result<()> {
 
 assert_snapshot!(
 pretty_format_batches(&sql_results).unwrap(),
-@r###"
-+---------------+---------------------------------------------------------+
-| plan_type | plan |
-+---------------+---------------------------------------------------------+
-| logical_plan | LeftSemi Join: |
-| | TableScan: t1 projection=[a, b] |
-| | SubqueryAlias: __correlated_sq_1 |
-| | Projection: |
-| | Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] |
-| | TableScan: t2 projection=[] |
-| physical_plan | NestedLoopJoinExec: join_type=RightSemi |
-| | ProjectionExec: expr=[] |
-| | PlaceholderRowExec |
-| | DataSourceExec: partitions=1, partition_sizes=[1] |
-| | |
-+---------------+---------------------------------------------------------+
-"###
+@r"
++---------------+-----------------------------------------------------+
+| plan_type | plan |
++---------------+-----------------------------------------------------+
+| logical_plan | LeftSemi Join: |
+| | TableScan: t1 projection=[a, b] |
+| | SubqueryAlias: __correlated_sq_1 |
+| | Aggregate: groupBy=[[]], aggr=[[]] |
+| | TableScan: t2 projection=[] |
+| physical_plan | NestedLoopJoinExec: join_type=RightSemi |
+| | PlaceholderRowExec |
+| | DataSourceExec: partitions=1, partition_sizes=[1] |
+| | |
++---------------+-----------------------------------------------------+
+"
 );
 
 let df_results = ctx
@@ -2783,23 +2781,21 @@ async fn test_count_wildcard_on_where_exist() -> Result<()> {
 
 assert_snapshot!(
 pretty_format_batches(&df_results).unwrap(),
-@r###"
-+---------------+---------------------------------------------------------------------+
-| plan_type | plan |
-+---------------+---------------------------------------------------------------------+
-| logical_plan | LeftSemi Join: |
-| | TableScan: t1 projection=[a, b] |
-| | SubqueryAlias: __correlated_sq_1 |
-| | Projection: |
-| | Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]] |
-| | TableScan: t2 projection=[] |
-| physical_plan | NestedLoopJoinExec: join_type=RightSemi |
-| | ProjectionExec: expr=[] |
-| | PlaceholderRowExec |
-| | DataSourceExec: partitions=1, partition_sizes=[1] |
-| | |
-+---------------+---------------------------------------------------------------------+
-"###
+@r"
++---------------+-----------------------------------------------------+
+| plan_type | plan |
++---------------+-----------------------------------------------------+
+| logical_plan | LeftSemi Join: |
+| | TableScan: t1 projection=[a, b] |
+| | SubqueryAlias: __correlated_sq_1 |
+| | Aggregate: groupBy=[[]], aggr=[[]] |
+| | TableScan: t2 projection=[] |
+| physical_plan | NestedLoopJoinExec: join_type=RightSemi |
+| | PlaceholderRowExec |
+| | DataSourceExec: partitions=1, partition_sizes=[1] |
+| | |
++---------------+-----------------------------------------------------+
+"
 );
 
 Ok(())

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 0 additions & 5 deletions
@@ -3520,11 +3520,6 @@ impl Aggregate {
 aggr_expr: Vec<Expr>,
 schema: DFSchemaRef,
 ) -> Result<Self> {
-if group_expr.is_empty() && aggr_expr.is_empty() {
-return plan_err!(
-"Aggregate requires at least one grouping or aggregate expression"
-);
-}
 let group_expr_count = grouping_set_expr_count(&group_expr)?;
 if schema.fields().len() != group_expr_count + aggr_expr.len() {
 return plan_err!(

datafusion/optimizer/src/optimize_projections/mod.rs

Lines changed: 3 additions & 21 deletions
@@ -153,24 +153,7 @@ fn optimize_projections(
 
 // Only use the absolutely necessary aggregate expressions required
 // by the parent:
-let mut new_aggr_expr = aggregate_reqs.get_at_indices(&aggregate.aggr_expr);
-
-// Aggregations always need at least one aggregate expression.
-// With a nested count, we don't require any column as input, but
-// still need to create a correct aggregate, which may be optimized
-// out later. As an example, consider the following query:
-//
-// SELECT count(*) FROM (SELECT count(*) FROM [...])
-//
-// which always returns 1.
-if new_aggr_expr.is_empty()
-&& new_group_bys.is_empty()
-&& !aggregate.aggr_expr.is_empty()
-{
-// take the old, first aggregate expression
-new_aggr_expr = aggregate.aggr_expr;
-new_aggr_expr.resize_with(1, || unreachable!());
-}
+let new_aggr_expr = aggregate_reqs.get_at_indices(&aggregate.aggr_expr);
 
 let all_exprs_iter = new_group_bys.iter().chain(new_aggr_expr.iter());
 let schema = aggregate.input.schema();
@@ -1146,9 +1129,8 @@ mod tests {
 plan,
 @r"
 Aggregate: groupBy=[[]], aggr=[[count(Int32(1))]]
-Projection:
-Aggregate: groupBy=[[]], aggr=[[count(Int32(1))]]
-TableScan: ?table? projection=[]
+Aggregate: groupBy=[[]], aggr=[[]]
+TableScan: ?table? projection=[]
 "
 )
 }
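
The comment removed above motivates this hunk. As a hedged sketch of the case it describes (the table name t is a placeholder, not from the commit):

SELECT count(*) FROM (SELECT count(*) FROM t);

The inner subquery always yields exactly one row, so the outer count is always 1 and no expression of the inner aggregate is actually needed. Previously optimize_projections kept the inner aggregate's first expression to satisfy the "at least one grouping or aggregate expression" rule; with that rule removed from Aggregate, the inner node shrinks to Aggregate: groupBy=[[]], aggr=[[]], as the updated test snapshot above shows.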

datafusion/sqllogictest/test_files/explain.slt

Lines changed: 3 additions & 5 deletions
@@ -429,14 +429,12 @@ logical_plan
 01)LeftSemi Join:
 02)--TableScan: t1 projection=[a]
 03)--SubqueryAlias: __correlated_sq_1
-04)----Projection:
-05)------Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
-06)--------TableScan: t2 projection=[]
+04)----Aggregate: groupBy=[[]], aggr=[[]]
+05)------TableScan: t2 projection=[]
 physical_plan
 01)NestedLoopJoinExec: join_type=LeftSemi
 02)--DataSourceExec: partitions=1, partition_sizes=[0]
-03)--ProjectionExec: expr=[]
-04)----PlaceholderRowExec
+03)--PlaceholderRowExec
 
 statement ok
 drop table t1;

datafusion/sqllogictest/test_files/explain_tree.slt

Lines changed: 2 additions & 5 deletions
@@ -1263,14 +1263,11 @@ physical_plan
 04)│ join_type: LeftSemi │ │
 05)└─────────────┬─────────────┘ │
 06)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
-07)│ DataSourceExec ││ ProjectionExec
+07)│ DataSourceExec ││ PlaceholderRowExec
 08)│ -------------------- ││ │
 09)│ files: 1 ││ │
 10)│ format: csv ││ │
-11)└───────────────────────────┘└─────────────┬─────────────┘
-12)-----------------------------┌─────────────┴─────────────┐
-13)-----------------------------│ PlaceholderRowExec │
-14)-----------------------------└───────────────────────────┘
+11)└───────────────────────────┘└───────────────────────────┘
 
 # Query with cross join.
 query TT

datafusion/sqllogictest/test_files/expr/date_part.slt

Lines changed: 1 addition & 1 deletion
@@ -1089,4 +1089,4 @@ SELECT EXTRACT("isodow" FROM to_timestamp('2020-09-08T12:00:00+00:00'))
 query I
 SELECT EXTRACT('isodow' FROM to_timestamp('2020-09-08T12:00:00+00:00'))
 ----
-1
+1

datafusion/sqllogictest/test_files/issue_17138.slt

Lines changed: 10 additions & 10 deletions
@@ -4,35 +4,35 @@ CREATE TABLE tab1(col0 INTEGER, col1 INTEGER, col2 INTEGER)
 statement ok
 INSERT INTO tab1 VALUES(51,14,96)
 
-# TODO this query used to (accidentally) pass before https://github.com/apache/datafusion/pull/17088
-query error DataFusion error: Execution error: avg\(DISTINCT\) aggregations are not available
+query R
 SELECT NULL * AVG(DISTINCT 4) + SUM(col1) AS col0 FROM tab1
+----
+NULL
 
 query TT
 EXPLAIN SELECT NULL * AVG(DISTINCT 4) + SUM(col1) AS col0 FROM tab1
 ----
 logical_plan
 01)Projection: Float64(NULL) AS col0
-02)--Aggregate: groupBy=[[]], aggr=[[avg(DISTINCT Float64(4)) AS avg(DISTINCT Int64(4))]]
+02)--Aggregate: groupBy=[[]], aggr=[[]]
 03)----TableScan: tab1 projection=[]
 physical_plan
 01)ProjectionExec: expr=[NULL as col0]
-02)--AggregateExec: mode=Single, gby=[], aggr=[avg(DISTINCT Int64(4))]
-03)----DataSourceExec: partitions=1, partition_sizes=[1]
+02)--PlaceholderRowExec
 
-# TODO this query used to (accidentally) pass before https://github.com/apache/datafusion/pull/17088
 # Similar, with a few more arithmetic operations
-query error DataFusion error: Execution error: avg\(DISTINCT\) aggregations are not available
+query R
 SELECT + CAST ( NULL AS INTEGER ) * + + AVG ( DISTINCT 4 ) + - SUM ( ALL + col1 ) AS col0 FROM tab1
+----
+NULL
 
 query TT
 EXPLAIN SELECT + CAST ( NULL AS INTEGER ) * + + AVG ( DISTINCT 4 ) + - SUM ( ALL + col1 ) AS col0 FROM tab1
 ----
 logical_plan
 01)Projection: Float64(NULL) AS col0
-02)--Aggregate: groupBy=[[]], aggr=[[avg(DISTINCT Float64(4)) AS avg(DISTINCT Int64(4))]]
+02)--Aggregate: groupBy=[[]], aggr=[[]]
 03)----TableScan: tab1 projection=[]
 physical_plan
 01)ProjectionExec: expr=[NULL as col0]
-02)--AggregateExec: mode=Single, gby=[], aggr=[avg(DISTINCT Int64(4))]
-03)----DataSourceExec: partitions=1, partition_sizes=[1]
+02)--PlaceholderRowExec

datafusion/sqllogictest/test_files/spark/bitwise/getbit.slt

Lines changed: 0 additions & 1 deletion
@@ -73,4 +73,3 @@ query I
 SELECT getbit(11, NULL);
 ----
 NULL
-

datafusion/sqllogictest/test_files/subquery.slt

Lines changed: 2 additions & 3 deletions
@@ -1453,9 +1453,8 @@ logical_plan
 01)LeftSemi Join:
 02)--TableScan: t1 projection=[a]
 03)--SubqueryAlias: __correlated_sq_1
-04)----Projection:
-05)------Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
-06)--------TableScan: t2 projection=[]
+04)----Aggregate: groupBy=[[]], aggr=[[]]
+05)------TableScan: t2 projection=[]
 
 statement count 0
 drop table t1;
