Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/physical-expr-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,5 @@ arrow = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-expr-common = { workspace = true }
hashbrown = { workspace = true }
indexmap = { workspace = true }
itertools = { workspace = true }
46 changes: 45 additions & 1 deletion datafusion/physical-expr-common/src/sort_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_expr_common::columnar_value::ColumnarValue;
use itertools::Itertools;
use indexmap::IndexSet;
use itertools::{izip, Itertools};

/// Represents Sort operation for a column in a RecordBatch
///
Expand Down Expand Up @@ -409,6 +410,22 @@ impl LexOrdering {
.map(PhysicalSortExpr::from)
.collect()
}

/// Removes repeated expressions from this `LexOrdering`, yielding a new
/// `LexOrdering` in which every physical expression occurs at most once.
///
/// Two entries count as duplicates when their physical expressions compare
/// equal; their [`SortOptions`] are not taken into account. The earliest
/// entry for a given expression is the one that survives. For example:
///
/// `vec![a ASC, a DESC]` collapses to `vec![a ASC]`.
pub fn collapse(self) -> Self {
    self.into_iter()
        .fold(LexOrdering::default(), |mut kept, candidate| {
            // Retain `candidate` only when no entry kept so far already
            // sorts on the same expression.
            let is_new = kept.iter().all(|prev| !prev.expr.eq(&candidate.expr));
            if is_new {
                kept.push(candidate);
            }
            kept
        })
}
}

impl From<Vec<PhysicalSortExpr>> for LexOrdering {
Expand Down Expand Up @@ -540,6 +557,33 @@ impl LexRequirement {
.collect(),
)
}

/// Constructs a duplicate-free `LexRequirement` by filtering out entries
/// whose physical expression has already been seen.
///
/// Only the expression takes part in the comparison (through its `Hash` and
/// `Eq` implementations); the optional sort options are ignored. The first
/// occurrence of each expression is kept together with its own options, and
/// the surviving entries keep their original relative order.
///
/// For example, `vec![a Some(ASC), a Some(DESC)]` collapses to `vec![a
/// Some(ASC)]`.
///
/// No other pruning is performed: entries whose expressions are not equal
/// are all retained, regardless of how the expressions relate to each other.
pub fn collapse(self) -> Self {
    // `IndexSet` iterates in insertion order, so `exprs` and `reqs` stay
    // aligned index-by-index: `reqs[i]` holds the options of `exprs[i]`.
    let mut exprs = IndexSet::new();
    let mut reqs = vec![];
    for PhysicalSortRequirement { expr, options } in self {
        // `insert` returns `true` only for an expression not seen before, so
        // options of later duplicates are dropped.
        if exprs.insert(expr) {
            reqs.push(options);
        }
    }
    debug_assert_eq!(reqs.len(), exprs.len());
    LexRequirement::new(
        izip!(exprs, reqs)
            .map(|(expr, options)| PhysicalSortRequirement::new(expr, options))
            .collect(),
    )
}
}

impl From<LexOrdering> for LexRequirement {
Expand Down
7 changes: 4 additions & 3 deletions datafusion/physical-expr/src/equivalence/class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use super::{add_offset_to_expr, collapse_lex_req, ProjectionMapping};
use super::{add_offset_to_expr, ProjectionMapping};
use crate::{
expressions::Column, LexOrdering, LexRequirement, PhysicalExpr, PhysicalExprRef,
PhysicalSortExpr, PhysicalSortRequirement,
Expand Down Expand Up @@ -527,12 +527,13 @@ impl EquivalenceGroup {
&self,
sort_reqs: &LexRequirement,
) -> LexRequirement {
collapse_lex_req(LexRequirement::new(
LexRequirement::new(
sort_reqs
.iter()
.map(|sort_req| self.normalize_sort_requirement(sort_req.clone()))
.collect(),
))
)
.collapse()
}

/// Projects `expr` according to the given projection mapping.
Expand Down
15 changes: 6 additions & 9 deletions datafusion/physical-expr/src/equivalence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use std::sync::Arc;

use crate::expressions::Column;
use crate::{LexRequirement, PhysicalExpr, PhysicalSortRequirement};
use crate::{LexRequirement, PhysicalExpr};

use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};

Expand All @@ -41,14 +41,9 @@ pub use properties::{
/// It will also filter out entries that are ordered if the next entry is;
/// for instance, `vec![floor(a) Some(ASC), a Some(ASC)]` will be collapsed to
/// `vec![a Some(ASC)]`.
#[deprecated(since = "45.0.0", note = "Use LexRequirement::collapse")]
pub fn collapse_lex_req(input: LexRequirement) -> LexRequirement {
let mut output = Vec::<PhysicalSortRequirement>::new();
for item in input {
if !output.iter().any(|req| req.expr.eq(&item.expr)) {
output.push(item);
}
}
LexRequirement::new(output)
input.collapse()
}

/// Adds the `offset` value to `Column` indices inside `expr`. This function is
Expand Down Expand Up @@ -80,7 +75,9 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use arrow_schema::{SchemaRef, SortOptions};
use datafusion_common::{plan_datafusion_err, Result};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_expr_common::sort_expr::{
LexOrdering, PhysicalSortRequirement,
};

pub fn output_schema(
mapping: &ProjectionMapping,
Expand Down
22 changes: 7 additions & 15 deletions datafusion/physical-expr/src/equivalence/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,13 @@ impl OrderingEquivalenceClass {
/// Returns the concatenation of all the orderings. This enables merge
/// operations to preserve all equivalent orderings simultaneously.
pub fn output_ordering(&self) -> Option<LexOrdering> {
let output_ordering = self.orderings.iter().flatten().cloned().collect();
let output_ordering = collapse_lex_ordering(output_ordering);
let output_ordering = self
.orderings
.iter()
.flatten()
.cloned()
.collect::<LexOrdering>()
.collapse();
(!output_ordering.is_empty()).then_some(output_ordering)
}

Expand Down Expand Up @@ -207,19 +212,6 @@ impl IntoIterator for OrderingEquivalenceClass {
}
}

/// This function constructs a duplicate-free `LexOrdering` by filtering out
/// duplicate entries that have same physical expression inside. For example,
/// `vec![a ASC, a DESC]` collapses to `vec![a ASC]`.
///
/// Only the expressions are compared; sort options are ignored, and the
/// first entry seen for each expression is the one kept.
pub fn collapse_lex_ordering(input: LexOrdering) -> LexOrdering {
    let mut output = LexOrdering::default();
    // `input` is owned, so retained entries are moved into `output` instead
    // of being cloned.
    for item in input {
        if !output.iter().any(|req| req.expr.eq(&item.expr)) {
            output.push(item);
        }
    }
    output
}

/// Trims `orderings[idx]` if some suffix of it overlaps with a prefix of
/// `orderings[pre_idx]`. Returns `true` if there is any overlap, `false` otherwise.
fn resolve_overlap(orderings: &mut [LexOrdering], idx: usize, pre_idx: usize) -> bool {
Expand Down
21 changes: 8 additions & 13 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ use std::slice::Iter;
use std::sync::Arc;
use std::{fmt, mem};

use super::ordering::collapse_lex_ordering;
use crate::equivalence::class::{const_exprs_contains, AcrossPartitions};
use crate::equivalence::{
collapse_lex_req, EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass,
ProjectionMapping,
EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping,
};
use crate::expressions::{with_new_schema, CastExpr, Column, Literal};
use crate::{
Expand Down Expand Up @@ -501,15 +499,12 @@ impl EquivalenceProperties {
);
let constants_normalized = self.eq_group.normalize_exprs(constant_exprs);
// Prune redundant sections in the requirement:
collapse_lex_req(
normalized_sort_reqs
.iter()
.filter(|&order| {
!physical_exprs_contains(&constants_normalized, &order.expr)
})
.cloned()
.collect(),
)
normalized_sort_reqs
.iter()
.filter(|&order| !physical_exprs_contains(&constants_normalized, &order.expr))
.cloned()
.collect::<LexRequirement>()
.collapse()
}

/// Checks whether the given ordering is satisfied by any of the existing
Expand Down Expand Up @@ -911,7 +906,7 @@ impl EquivalenceProperties {
// Simplify each ordering by removing redundant sections:
orderings
.chain(projected_orderings)
.map(collapse_lex_ordering)
.map(|lex_ordering| lex_ordering.collapse())
.collect()
}

Expand Down
9 changes: 4 additions & 5 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,9 @@ use datafusion_execution::TaskContext;
use datafusion_expr::{Accumulator, Aggregate};
use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
use datafusion_physical_expr::{
equivalence::{collapse_lex_req, ProjectionMapping},
expressions::Column,
physical_exprs_contains, EquivalenceProperties, LexOrdering, LexRequirement,
PhysicalExpr, PhysicalSortRequirement,
equivalence::ProjectionMapping, expressions::Column, physical_exprs_contains,
EquivalenceProperties, LexOrdering, LexRequirement, PhysicalExpr,
PhysicalSortRequirement,
};

use itertools::Itertools;
Expand Down Expand Up @@ -473,7 +472,7 @@ impl AggregateExec {
&mode,
)?;
new_requirement.inner.extend(req);
new_requirement = collapse_lex_req(new_requirement);
new_requirement = new_requirement.collapse();

// If our aggregation has grouping sets then our base grouping exprs will
// be expanded based on the flags in `group_by.groups` where for each
Expand Down
5 changes: 2 additions & 3 deletions datafusion/physical-plan/src/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use datafusion_expr::{
PartitionEvaluator, ReversedUDWF, WindowFrame, WindowFunctionDefinition, WindowUDF,
};
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::equivalence::collapse_lex_req;
use datafusion_physical_expr::{
reverse_order_bys,
window::{SlidingAggregateWindowExpr, StandardWindowFunctionExpr},
Expand Down Expand Up @@ -469,8 +468,8 @@ pub fn get_window_mode(
{
let req = LexRequirement::new(
[partition_by_reqs.inner.clone(), order_by_reqs.inner].concat(),
);
let req = collapse_lex_req(req);
)
.collapse();
if partition_by_eqs.ordering_satisfy_requirement(&req) {
// Window can be run with existing ordering
let mode = if indices.len() == partitionby_exprs.len() {
Expand Down
Loading