From a2eca291ad9d586222f042ab4c068feeb055526b Mon Sep 17 00:00:00 2001 From: ClSlaid Date: Tue, 14 May 2024 00:03:25 +0800 Subject: [PATCH] Stop copying LogicalPlan and Exprs in `ReplaceDistinctWithAggregate` (#10460) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * patch: implement rewrite for RDWA Signed-off-by: cailue * refactor: rewrite replace_distinct_aggregate Signed-off-by: 蔡略 * patch: recorrect aggr_expr Signed-off-by: 蔡略 * Update datafusion/optimizer/src/replace_distinct_aggregate.rs --------- Signed-off-by: cailue Signed-off-by: 蔡略 Co-authored-by: Andrew Lamb --- .../src/replace_distinct_aggregate.rs | 73 +++++++++++++------ 1 file changed, 49 insertions(+), 24 deletions(-) diff --git a/datafusion/optimizer/src/replace_distinct_aggregate.rs b/datafusion/optimizer/src/replace_distinct_aggregate.rs index 4f68e2623f40..404f054cb9fa 100644 --- a/datafusion/optimizer/src/replace_distinct_aggregate.rs +++ b/datafusion/optimizer/src/replace_distinct_aggregate.rs @@ -19,7 +19,9 @@ use crate::optimizer::{ApplyOrder, ApplyOrder::BottomUp}; use crate::{OptimizerConfig, OptimizerRule}; -use datafusion_common::{Column, Result}; +use datafusion_common::tree_node::Transformed; +use datafusion_common::{internal_err, Column, Result}; +use datafusion_expr::expr_rewriter::normalize_cols; use datafusion_expr::utils::expand_wildcard; use datafusion_expr::{ aggregate_function::AggregateFunction as AggregateFunctionFunc, col, @@ -66,20 +68,24 @@ impl ReplaceDistinctWithAggregate { } impl OptimizerRule for ReplaceDistinctWithAggregate { - fn try_optimize( + fn supports_rewrite(&self) -> bool { + true + } + + fn rewrite( &self, - plan: &LogicalPlan, + plan: LogicalPlan, _config: &dyn OptimizerConfig, - ) -> Result> { + ) -> Result> { match plan { LogicalPlan::Distinct(Distinct::All(input)) => { - let group_expr = expand_wildcard(input.schema(), input, None)?; - let aggregate = LogicalPlan::Aggregate(Aggregate::try_new( - input.clone(), + let group_expr = expand_wildcard(input.schema(), &input, None)?; + let aggr_plan = LogicalPlan::Aggregate(Aggregate::try_new( + input, group_expr, vec![], )?); - Ok(Some(aggregate)) + Ok(Transformed::yes(aggr_plan)) } LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, @@ -88,13 +94,15 @@ impl OptimizerRule for ReplaceDistinctWithAggregate { input, schema, })) => { + let expr_cnt = on_expr.len(); + // Construct the aggregation expression to be used to fetch the selected expressions. let aggr_expr = select_expr - .iter() + .into_iter() .map(|e| { Expr::AggregateFunction(AggregateFunction::new( AggregateFunctionFunc::FirstValue, - vec![e.clone()], + vec![e], false, None, sort_expr.clone(), @@ -103,45 +111,62 @@ impl OptimizerRule for ReplaceDistinctWithAggregate { }) .collect::>(); + let aggr_expr = normalize_cols(aggr_expr, input.as_ref())?; + let group_expr = normalize_cols(on_expr, input.as_ref())?; + // Build the aggregation plan - let plan = LogicalPlanBuilder::from(input.as_ref().clone()) - .aggregate(on_expr.clone(), aggr_expr.to_vec())? - .build()?; + let plan = LogicalPlan::Aggregate(Aggregate::try_new( + input, group_expr, aggr_expr, + )?); + // TODO use LogicalPlanBuilder directly rather than recreating the Aggregate + // when https://github.com/apache/datafusion/issues/10485 is available + let lpb = LogicalPlanBuilder::from(plan); - let plan = if let Some(sort_expr) = sort_expr { + let plan = if let Some(mut sort_expr) = sort_expr { // While sort expressions were used in the `FIRST_VALUE` aggregation itself above, // this on it's own isn't enough to guarantee the proper output order of the grouping // (`ON`) expression, so we need to sort those as well. - LogicalPlanBuilder::from(plan) - .sort(sort_expr[..on_expr.len()].to_vec())? - .build()? + + // truncate the sort_expr to the length of on_expr + sort_expr.truncate(expr_cnt); + + lpb.sort(sort_expr)?.build()? } else { - plan + lpb.build()? }; // Whereas the aggregation plan by default outputs both the grouping and the aggregation // expressions, for `DISTINCT ON` we only need to emit the original selection expressions. + let project_exprs = plan .schema() .iter() - .skip(on_expr.len()) + .skip(expr_cnt) .zip(schema.iter()) .map(|((new_qualifier, new_field), (old_qualifier, old_field))| { - Ok(col(Column::from((new_qualifier, new_field))) - .alias_qualified(old_qualifier.cloned(), old_field.name())) + col(Column::from((new_qualifier, new_field))) + .alias_qualified(old_qualifier.cloned(), old_field.name()) }) - .collect::>>()?; + .collect::>(); let plan = LogicalPlanBuilder::from(plan) .project(project_exprs)? .build()?; - Ok(Some(plan)) + Ok(Transformed::yes(plan)) } - _ => Ok(None), + _ => Ok(Transformed::no(plan)), } } + fn try_optimize( + &self, + _plan: &LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result> { + internal_err!("Should have called ReplaceDistinctWithAggregate::rewrite") + } + fn name(&self) -> &str { "replace_distinct_aggregate" }