Remove order_bys from AggregateExec state (#8537)
* Initial commit

* Remove order by from aggregate exec state
mustafasrepo authored Dec 14, 2023
1 parent efa7b34 commit d67c0bb
Showing 15 changed files with 9 additions and 119 deletions.
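
At the call-site level the change is mechanical: `AggregateExec::try_new` no longer takes an `order_by_expr` argument, so every construction site drops one parameter. The sketch below is not part of the commit; it only shows the new constructor shape, modeled on the test call sites in this diff, and it assumes the public re-exports `datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}` plus a caller that already has some input `ExecutionPlan`:

    use std::sync::Arc;

    use datafusion::error::Result;
    use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
    use datafusion::physical_plan::ExecutionPlan;

    // Hypothetical helper, not from the commit: wraps any physical plan in a
    // partial aggregation with no aggregates and no grouping, just to show the
    // argument list after this change.
    fn build_partial_agg(source: Arc<dyn ExecutionPlan>) -> Result<AggregateExec> {
        let input_schema = source.schema();
        AggregateExec::try_new(
            AggregateMode::Partial,
            PhysicalGroupBy::default(),
            vec![],       // aggr_expr: one AggregateExpr per aggregate
            vec![],       // filter_expr: one optional filter per aggregate
            // vec![],    // order_by_expr: this argument is removed by this commit
            source,       // input plan
            input_schema, // input schema
        )
    }

Per-aggregate ordering requirements are no longer threaded through `AggregateExec` itself; the diff below simply deletes the extra argument (and the `order_by_expr()` accessor calls) at every call site.
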
12 changes: 0 additions & 12 deletions datafusion/core/src/physical_optimizer/aggregate_statistics.rs
@@ -397,7 +397,6 @@ pub(crate) mod tests {
 PhysicalGroupBy::default(),
 vec![agg.count_expr()],
 vec![None],
-vec![None],
 source,
 Arc::clone(&schema),
 )?;
@@ -407,7 +406,6 @@ pub(crate) mod tests {
 PhysicalGroupBy::default(),
 vec![agg.count_expr()],
 vec![None],
-vec![None],
 Arc::new(partial_agg),
 Arc::clone(&schema),
 )?;
@@ -429,7 +427,6 @@ pub(crate) mod tests {
 PhysicalGroupBy::default(),
 vec![agg.count_expr()],
 vec![None],
-vec![None],
 source,
 Arc::clone(&schema),
 )?;
@@ -439,7 +436,6 @@ pub(crate) mod tests {
 PhysicalGroupBy::default(),
 vec![agg.count_expr()],
 vec![None],
-vec![None],
 Arc::new(partial_agg),
 Arc::clone(&schema),
 )?;
@@ -460,7 +456,6 @@ pub(crate) mod tests {
 PhysicalGroupBy::default(),
 vec![agg.count_expr()],
 vec![None],
-vec![None],
 source,
 Arc::clone(&schema),
 )?;
@@ -473,7 +468,6 @@ pub(crate) mod tests {
 PhysicalGroupBy::default(),
 vec![agg.count_expr()],
 vec![None],
-vec![None],
 Arc::new(coalesce),
 Arc::clone(&schema),
 )?;
@@ -494,7 +488,6 @@ pub(crate) mod tests {
 PhysicalGroupBy::default(),
 vec![agg.count_expr()],
 vec![None],
-vec![None],
 source,
 Arc::clone(&schema),
 )?;
@@ -507,7 +500,6 @@ pub(crate) mod tests {
 PhysicalGroupBy::default(),
 vec![agg.count_expr()],
 vec![None],
-vec![None],
 Arc::new(coalesce),
 Arc::clone(&schema),
 )?;
@@ -539,7 +531,6 @@ pub(crate) mod tests {
 PhysicalGroupBy::default(),
 vec![agg.count_expr()],
 vec![None],
-vec![None],
 filter,
 Arc::clone(&schema),
 )?;
@@ -549,7 +540,6 @@ pub(crate) mod tests {
 PhysicalGroupBy::default(),
 vec![agg.count_expr()],
 vec![None],
-vec![None],
 Arc::new(partial_agg),
 Arc::clone(&schema),
 )?;
@@ -586,7 +576,6 @@ pub(crate) mod tests {
 PhysicalGroupBy::default(),
 vec![agg.count_expr()],
 vec![None],
-vec![None],
 filter,
 Arc::clone(&schema),
 )?;
@@ -596,7 +585,6 @@ pub(crate) mod tests {
 PhysicalGroupBy::default(),
 vec![agg.count_expr()],
 vec![None],
-vec![None],
 Arc::new(partial_agg),
 Arc::clone(&schema),
 )?;

@@ -91,7 +91,6 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate
 input_agg_exec.group_by().clone(),
 input_agg_exec.aggr_expr().to_vec(),
 input_agg_exec.filter_expr().to_vec(),
-input_agg_exec.order_by_expr().to_vec(),
 input_agg_exec.input().clone(),
 input_agg_exec.input_schema(),
 )
@@ -277,7 +276,6 @@ mod tests {
 group_by,
 aggr_expr,
 vec![],
-vec![],
 input,
 schema,
 )
@@ -297,7 +295,6 @@ mod tests {
 group_by,
 aggr_expr,
 vec![],
-vec![],
 input,
 schema,
 )
@@ -458,7 +455,6 @@ mod tests {
 final_group_by,
 aggr_expr,
 vec![],
-vec![],
 partial_agg,
 schema,
 )

@@ -521,7 +521,6 @@ fn reorder_aggregate_keys(
 new_partial_group_by,
 agg_exec.aggr_expr().to_vec(),
 agg_exec.filter_expr().to_vec(),
-agg_exec.order_by_expr().to_vec(),
 agg_exec.input().clone(),
 agg_exec.input_schema.clone(),
 )?))
@@ -548,7 +547,6 @@ fn reorder_aggregate_keys(
 new_group_by,
 agg_exec.aggr_expr().to_vec(),
 agg_exec.filter_expr().to_vec(),
-agg_exec.order_by_expr().to_vec(),
 partial_agg,
 agg_exec.input_schema(),
 )?);
@@ -1909,14 +1907,12 @@ pub(crate) mod tests {
 final_grouping,
 vec![],
 vec![],
-vec![],
 Arc::new(
 AggregateExec::try_new(
 AggregateMode::Partial,
 group_by,
 vec![],
 vec![],
-vec![],
 input,
 schema.clone(),
 )

@@ -55,7 +55,6 @@ impl LimitedDistinctAggregation {
 aggr.group_by().clone(),
 aggr.aggr_expr().to_vec(),
 aggr.filter_expr().to_vec(),
-aggr.order_by_expr().to_vec(),
 aggr.input().clone(),
 aggr.input_schema(),
 )
@@ -307,7 +306,6 @@ mod tests {
 build_group_by(&schema.clone(), vec!["a".to_string()]),
 vec![], /* aggr_expr */
 vec![None], /* filter_expr */
-vec![None], /* order_by_expr */
 source, /* input */
 schema.clone(), /* input_schema */
 )?;
@@ -316,7 +314,6 @@ mod tests {
 build_group_by(&schema.clone(), vec!["a".to_string()]),
 vec![], /* aggr_expr */
 vec![None], /* filter_expr */
-vec![None], /* order_by_expr */
 Arc::new(partial_agg), /* input */
 schema.clone(), /* input_schema */
 )?;
@@ -359,7 +356,6 @@ mod tests {
 build_group_by(&schema.clone(), vec!["a".to_string()]),
 vec![], /* aggr_expr */
 vec![None], /* filter_expr */
-vec![None], /* order_by_expr */
 source, /* input */
 schema.clone(), /* input_schema */
 )?;
@@ -401,7 +397,6 @@ mod tests {
 build_group_by(&schema.clone(), vec!["a".to_string()]),
 vec![], /* aggr_expr */
 vec![None], /* filter_expr */
-vec![None], /* order_by_expr */
 source, /* input */
 schema.clone(), /* input_schema */
 )?;
@@ -443,7 +438,6 @@ mod tests {
 build_group_by(&schema.clone(), vec!["a".to_string(), "b".to_string()]),
 vec![], /* aggr_expr */
 vec![None], /* filter_expr */
-vec![None], /* order_by_expr */
 source, /* input */
 schema.clone(), /* input_schema */
 )?;
@@ -452,7 +446,6 @@ mod tests {
 build_group_by(&schema.clone(), vec!["a".to_string()]),
 vec![], /* aggr_expr */
 vec![None], /* filter_expr */
-vec![None], /* order_by_expr */
 Arc::new(group_by_agg), /* input */
 schema.clone(), /* input_schema */
 )?;
@@ -495,7 +488,6 @@ mod tests {
 build_group_by(&schema.clone(), vec![]),
 vec![], /* aggr_expr */
 vec![None], /* filter_expr */
-vec![None], /* order_by_expr */
 source, /* input */
 schema.clone(), /* input_schema */
 )?;
@@ -526,7 +518,6 @@ mod tests {
 build_group_by(&schema.clone(), vec!["a".to_string()]),
 vec![agg.count_expr()], /* aggr_expr */
 vec![None], /* filter_expr */
-vec![None], /* order_by_expr */
 source, /* input */
 schema.clone(), /* input_schema */
 )?;
@@ -563,7 +554,6 @@ mod tests {
 build_group_by(&schema.clone(), vec!["a".to_string()]),
 vec![], /* aggr_expr */
 vec![filter_expr], /* filter_expr */
-vec![None], /* order_by_expr */
 source, /* input */
 schema.clone(), /* input_schema */
 )?;
@@ -592,22 +582,15 @@
 let source = parquet_exec_with_sort(vec![sort_key]);
 let schema = source.schema();

-// `SELECT a FROM MemoryExec GROUP BY a ORDER BY a LIMIT 10;`, Single AggregateExec
-let order_by_expr = Some(vec![PhysicalSortExpr {
-expr: expressions::col("a", &schema.clone()).unwrap(),
-options: SortOptions::default(),
-}]);
-
 // `SELECT a FROM MemoryExec WHERE a > 1 GROUP BY a LIMIT 10;`, Single AggregateExec
 // the `a > 1` filter is applied in the AggregateExec
 let single_agg = AggregateExec::try_new(
 AggregateMode::Single,
 build_group_by(&schema.clone(), vec!["a".to_string()]),
-vec![], /* aggr_expr */
-vec![None], /* filter_expr */
-vec![order_by_expr], /* order_by_expr */
-source, /* input */
-schema.clone(), /* input_schema */
+vec![], /* aggr_expr */
+vec![None], /* filter_expr */
+source, /* input */
+schema.clone(), /* input_schema */
 )?;
 let limit_exec = LocalLimitExec::new(
 Arc::new(single_agg),

1 change: 0 additions & 1 deletion datafusion/core/src/physical_optimizer/test_utils.rs
@@ -339,7 +339,6 @@ pub fn aggregate_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
 PhysicalGroupBy::default(),
 vec![],
 vec![],
-vec![],
 input,
 schema,
 )

1 change: 0 additions & 1 deletion datafusion/core/src/physical_optimizer/topk_aggregation.rs
@@ -73,7 +73,6 @@ impl TopKAggregation {
 aggr.group_by().clone(),
 aggr.aggr_expr().to_vec(),
 aggr.filter_expr().to_vec(),
-aggr.order_by_expr().to_vec(),
 aggr.input().clone(),
 aggr.input_schema(),
 )

5 changes: 1 addition & 4 deletions datafusion/core/src/physical_planner.rs
@@ -795,14 +795,13 @@ impl DefaultPhysicalPlanner {
 })
 .collect::<Result<Vec<_>>>()?;

-let (aggregates, filters, order_bys) : (Vec<_>, Vec<_>, Vec<_>) = multiunzip(agg_filter);
+let (aggregates, filters, _order_bys) : (Vec<_>, Vec<_>, Vec<_>) = multiunzip(agg_filter);

 let initial_aggr = Arc::new(AggregateExec::try_new(
 AggregateMode::Partial,
 groups.clone(),
 aggregates.clone(),
 filters.clone(),
-order_bys,
 input_exec,
 physical_input_schema.clone(),
 )?);
@@ -820,7 +819,6 @@ impl DefaultPhysicalPlanner {
 // To reflect such changes to subsequent stages, use the updated
 // `AggregateExpr`/`PhysicalSortExpr` objects.
 let updated_aggregates = initial_aggr.aggr_expr().to_vec();
-let updated_order_bys = initial_aggr.order_by_expr().to_vec();

 let next_partition_mode = if can_repartition {
 // construct a second aggregation with 'AggregateMode::FinalPartitioned'
@@ -844,7 +842,6 @@ impl DefaultPhysicalPlanner {
 final_grouping_set,
 updated_aggregates,
 filters,
-updated_order_bys,
 initial_aggr,
 physical_input_schema.clone(),
 )?))

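One consequence visible in the planner hunks above: the per-aggregate ORDER BY component is still produced alongside each aggregate/filter pair, but it is now discarded (bound to `_order_bys`) instead of being handed to `AggregateExec`. A minimal, self-contained sketch of that unzip-and-ignore pattern, using placeholder string data rather than the planner's real types and assuming the `itertools` crate (whose `multiunzip` the planner already uses):

    use itertools::multiunzip;

    fn main() {
        // Placeholder tuples standing in for the planner's `agg_filter` entries:
        // (aggregate expression, optional filter, optional per-aggregate ORDER BY).
        let agg_filter = vec![
            ("COUNT(a)", None::<&str>, Some("a ASC")),
            ("SUM(b)", None, None),
        ];

        // Same shape as the planner after this commit: unzip all three components,
        // but bind the order-by part to `_order_bys`, since AggregateExec no longer
        // accepts it.
        let (aggregates, filters, _order_bys): (Vec<_>, Vec<_>, Vec<_>) =
            multiunzip(agg_filter);

        assert_eq!(aggregates.len(), 2);
        assert_eq!(filters.len(), 2);
    }
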
2 changes: 0 additions & 2 deletions datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
@@ -109,7 +109,6 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str
 group_by.clone(),
 aggregate_expr.clone(),
 vec![None],
-vec![None],
 running_source,
 schema.clone(),
 )
@@ -122,7 +121,6 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str
 group_by.clone(),
 aggregate_expr.clone(),
 vec![None],
-vec![None],
 usual_source,
 schema.clone(),
 )

(The remaining changed files in this commit are not shown here.)
