Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Minor]: Produce better plan when group by contains all of the ordering requirements #7542

Merged
merged 3 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 32 additions & 5 deletions datafusion/core/src/physical_plan/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use datafusion_expr::Accumulator;
use datafusion_physical_expr::{
equivalence::project_equivalence_properties,
expressions::Column,
normalize_out_expr_with_columns_map, reverse_order_bys,
normalize_out_expr_with_columns_map, physical_exprs_contains, reverse_order_bys,
utils::{convert_to_expr, get_indices_of_matching_exprs},
AggregateExpr, LexOrdering, LexOrderingReq, OrderingEquivalenceProperties,
PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
Expand Down Expand Up @@ -567,6 +567,27 @@ fn calc_required_input_ordering(
Ok((!required_input_ordering.is_empty()).then_some(required_input_ordering))
}

/// Check whether group by expression contains all of the expression inside `requirement`
// As an example Group By (c,b,a) contains all of the expressions in the `requirement`: (a ASC, b DESC)
fn group_by_contains_all_requirements(
group_by: &PhysicalGroupBy,
requirement: &LexOrdering,
) -> bool {
let physical_exprs = group_by
.expr()
.iter()
.map(|(expr, _alias)| expr.clone())
.collect::<Vec<_>>();
// When we have multiple groups (grouping set)
// since group by may be calculated on the subset of the group_by.expr()
// it is not guaranteed to have all of the requirements among group by expressions.
// Hence do the analysis: whether group by contains all requirements in the single group case.
group_by.groups.len() <= 1
&& requirement
.iter()
.all(|req| physical_exprs_contains(&physical_exprs, &req.expr))
}

impl AggregateExec {
/// Create a new hash aggregate execution plan
pub fn try_new(
Expand All @@ -593,16 +614,22 @@ impl AggregateExec {
.iter()
.zip(order_by_expr)
.map(|(aggr_expr, fn_reqs)| {
// If the aggregation function is order-sensitive and we are
// performing a "first stage" calculation, keep the ordering
// requirement as is; otherwise ignore the ordering requirement.
// If
// - aggregation function is order-sensitive and
// - aggregation is performing a "first stage" calculation, and
// - at least one of the aggregate function requirement is not inside group by expression
// keep the ordering requirement as is; otherwise ignore the ordering requirement.
// In non-first stage modes, we accumulate data (using `merge_batch`)
// from different partitions (i.e. merge partial results). During
// this merge, we consider the ordering of each partial result.
// Hence, we do not need to use the ordering requirement in such
// modes as long as partial results are generated with the
// correct ordering.
fn_reqs.filter(|_| is_order_sensitive(aggr_expr) && mode.is_first_stage())
fn_reqs.filter(|req| {
is_order_sensitive(aggr_expr)
&& mode.is_first_stage()
&& !group_by_contains_all_requirements(&group_by, req)
})
})
.collect::<Vec<_>>();
let mut aggregator_reverse_reqs = None;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub use equivalence::{
};

pub use partitioning::{Distribution, Partitioning};
pub use physical_expr::{PhysicalExpr, PhysicalExprRef};
pub use physical_expr::{physical_exprs_contains, PhysicalExpr, PhysicalExprRef};
pub use planner::create_physical_expr;
pub use scalar_function::ScalarFunctionExpr;
pub use sort_expr::{
Expand Down
14 changes: 14 additions & 0 deletions datafusion/physical-expr/src/physical_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,17 @@ pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
any
}
}

/// It is similar to contains method of vector.
/// Finds whether `expr` is among `physical_exprs`.
pub fn physical_exprs_contains(
physical_exprs: &[Arc<dyn PhysicalExpr>],
expr: &Arc<dyn PhysicalExpr>,
) -> bool {
for physical_expr in physical_exprs {
mustafasrepo marked this conversation as resolved.
Show resolved Hide resolved
if physical_expr.eq(expr) {
return true;
}
}
false
}
26 changes: 12 additions & 14 deletions datafusion/sqllogictest/test_files/groupby.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2127,19 +2127,18 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, FIRST_VALUE(
----TableScan: annotated_data_infinite2 projection=[a, b, c]
physical_plan
ProjectionExec: expr=[a@0 as a, b@1 as b, FIRST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as first_c]
--AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
----AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true

query III
SELECT a, b, FIRST_VALUE(c ORDER BY a DESC) as first_c
FROM annotated_data_infinite2
GROUP BY a, b
----
0 0 24
0 1 49
1 2 74
1 3 99
0 0 0
0 1 25
1 2 50
1 3 75

# test_source_sorted_groupby4

Expand All @@ -2154,19 +2153,18 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, LAST_VALUE(a
----TableScan: annotated_data_infinite2 projection=[a, b, c]
physical_plan
ProjectionExec: expr=[a@0 as a, b@1 as b, LAST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as last_c]
--AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
----AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true

query III
SELECT a, b, LAST_VALUE(c ORDER BY a DESC) as last_c
FROM annotated_data_infinite2
GROUP BY a, b
----
0 0 0
0 1 25
1 2 50
1 3 75
0 0 24
0 1 49
1 2 74
1 3 99

# when LAST_VALUE, or FIRST_VALUE value do not contain ordering requirement
# queries should still work, However, result depends on the scanning order and
Expand Down