Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
310 changes: 300 additions & 10 deletions datafusion/physical-expr/src/equivalence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,65 @@ use indexmap::IndexMap;
pub type EquivalenceClass = Vec<Arc<dyn PhysicalExpr>>;

/// Stores the mapping between source expressions and target expressions for a
/// projection. Indices in the vector corresponds to the indices after projection.
pub type ProjectionMapping = Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)>;
/// projection.
#[derive(Debug, Clone)]
pub struct ProjectionMapping {
/// Pairs of `(source expression, target expression)`, one pair per
/// projected expression.
///
/// The position of a pair in the vector corresponds to the index of the
/// target column after projection.
inner: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)>,
}

impl ProjectionMapping {
/// Constructs the mapping between a projection's input and output
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to add some additional details here in the comments to make it easier to understand. The logic is the same

/// expressions.
///
/// For example, given the input projection expressions (`a+b`, `c+d`)
/// and an output schema with two columns `"c+d"` and `"a+b"`
/// the projection mapping would be
/// ```text
/// [0]: (c+d, col("c+d"))
/// [1]: (a+b, col("a+b"))
/// ```
/// where `col("c+d")` means the column named "c+d".
pub fn try_new(
expr: &[(Arc<dyn PhysicalExpr>, String)],
input_schema: &SchemaRef,
) -> Result<Self> {
// Construct a map from the input expressions to the output expression of the projection:
let mut inner = vec![];
for (expr_idx, (expression, name)) in expr.iter().enumerate() {
let target_expr = Arc::new(Column::new(name, expr_idx)) as _;

let source_expr = expression.clone().transform_down(&|e| match e
.as_any()
.downcast_ref::<Column>(
) {
Some(col) => {
// Sometimes, expression and its name in the input_schema doesn't match.
// This can cause problems. Hence in here we make sure that expression name
// matches with the name in the inout_schema.
// Conceptually, source_expr and expression should be same.
let idx = col.index();
let matching_input_field = input_schema.field(idx);
let matching_input_column =
Column::new(matching_input_field.name(), idx);
Ok(Transformed::Yes(Arc::new(matching_input_column)))
}
None => Ok(Transformed::No(e)),
})?;

inner.push((source_expr, target_expr));
}
Ok(Self { inner })
}

/// Iterate over pairs of (source, target) expressions.
///
/// Pairs are yielded by reference, in the order in which they are stored in
/// the mapping.
pub fn iter(
&self,
) -> impl Iterator<Item = &(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> + '_ {
self.inner.iter()
}
}

/// An `EquivalenceGroup` is a collection of `EquivalenceClass`es where each
/// class represents a distinct equivalence class in a relation.
Expand Down Expand Up @@ -329,7 +386,7 @@ impl EquivalenceGroup {
// once `Arc<dyn PhysicalExpr>` can be stored in `HashMap`.
// See issue: https://github.com/apache/arrow-datafusion/issues/8027
let mut new_classes = vec![];
for (source, target) in mapping {
for (source, target) in mapping.iter() {
if new_classes.is_empty() {
new_classes.push((source, vec![target.clone()]));
}
Expand Down Expand Up @@ -980,7 +1037,7 @@ impl EquivalenceProperties {
.iter()
.filter_map(|order| self.eq_group.project_ordering(projection_mapping, order))
.collect::<Vec<_>>();
for (source, target) in projection_mapping {
for (source, target) in projection_mapping.iter() {
let expr_ordering = ExprOrdering::new(source.clone())
.transform_up(&|expr| update_ordering(expr, self))
.unwrap();
Expand Down Expand Up @@ -1185,6 +1242,7 @@ fn update_ordering(

#[cfg(test)]
mod tests {
use std::ops::Not;
use std::sync::Arc;

use super::*;
Expand Down Expand Up @@ -1437,12 +1495,14 @@ mod tests {
let col_a2 = &col("a2", &out_schema)?;
let col_a3 = &col("a3", &out_schema)?;
let col_a4 = &col("a4", &out_schema)?;
let projection_mapping = vec![
(col_a.clone(), col_a1.clone()),
(col_a.clone(), col_a2.clone()),
(col_a.clone(), col_a3.clone()),
(col_a.clone(), col_a4.clone()),
];
let projection_mapping = ProjectionMapping {
inner: vec![
(col_a.clone(), col_a1.clone()),
(col_a.clone(), col_a2.clone()),
(col_a.clone(), col_a3.clone()),
(col_a.clone(), col_a4.clone()),
],
};
let out_properties = input_properties.project(&projection_mapping, out_schema);

// At the output a1=a2=a3=a4
Expand Down Expand Up @@ -2565,4 +2625,234 @@ mod tests {

Ok(())
}

#[test]
fn test_get_indices_of_matching_sort_exprs_with_order_eq() -> Result<()> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was just moved to be with the ProjectionMapping struct

// Exercises `find_longest_permutation` on three sets of known orderings.
// `sort_options_not` is the negation of the default sort options.
let sort_options = SortOptions::default();
let sort_options_not = SortOptions::default().not();

// Scenario 1: the only known ordering is [b (negated options), a (default
// options)] and the required columns are [b, a]. Both required columns are
// expected to be matched (indices 0 and 1), with the options of the ordering.
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
]);
let col_a = &col("a", &schema)?;
let col_b = &col("b", &schema)?;
let required_columns = [col_b.clone(), col_a.clone()];
let mut eq_properties = EquivalenceProperties::new(Arc::new(schema));
eq_properties.add_new_orderings([vec![
PhysicalSortExpr {
expr: Arc::new(Column::new("b", 1)),
options: sort_options_not,
},
PhysicalSortExpr {
expr: Arc::new(Column::new("a", 0)),
options: sort_options,
},
]]);
let (result, idxs) = eq_properties.find_longest_permutation(&required_columns);
assert_eq!(idxs, vec![0, 1]);
assert_eq!(
result,
vec![
PhysicalSortExpr {
expr: col_b.clone(),
options: sort_options_not
},
PhysicalSortExpr {
expr: col_a.clone(),
options: sort_options
}
]
);

// Scenario 2: same required columns, but the schema gains a column `c` and
// an additional ordering on `c` alone is registered first. The expected
// result is the same as in scenario 1.
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
]);
let col_a = &col("a", &schema)?;
let col_b = &col("b", &schema)?;
let required_columns = [col_b.clone(), col_a.clone()];
let mut eq_properties = EquivalenceProperties::new(Arc::new(schema));
eq_properties.add_new_orderings([
vec![PhysicalSortExpr {
expr: Arc::new(Column::new("c", 2)),
options: sort_options,
}],
vec![
PhysicalSortExpr {
expr: Arc::new(Column::new("b", 1)),
options: sort_options_not,
},
PhysicalSortExpr {
expr: Arc::new(Column::new("a", 0)),
options: sort_options,
},
],
]);
let (result, idxs) = eq_properties.find_longest_permutation(&required_columns);
assert_eq!(idxs, vec![0, 1]);
assert_eq!(
result,
vec![
PhysicalSortExpr {
expr: col_b.clone(),
options: sort_options_not
},
PhysicalSortExpr {
expr: col_a.clone(),
options: sort_options
}
]
);

// Scenario 3: the required columns are again [b, a], but in the only known
// ordering `c` sits between `b` and `a`. Only index 0 (`b`) is expected to
// be reported.
let required_columns = [
Arc::new(Column::new("b", 1)) as _,
Arc::new(Column::new("a", 0)) as _,
];
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
]);
let mut eq_properties = EquivalenceProperties::new(Arc::new(schema));

// not satisfied orders
eq_properties.add_new_orderings([vec![
PhysicalSortExpr {
expr: Arc::new(Column::new("b", 1)),
options: sort_options_not,
},
PhysicalSortExpr {
expr: Arc::new(Column::new("c", 2)),
options: sort_options,
},
PhysicalSortExpr {
expr: Arc::new(Column::new("a", 0)),
options: sort_options,
},
]]);
let (_, idxs) = eq_properties.find_longest_permutation(&required_columns);
assert_eq!(idxs, vec![0]);

Ok(())
}

#[test]
fn test_normalize_ordering_equivalence_classes() -> Result<()> {
    // Every sort expression in this test uses the default sort options.
    let default_options = SortOptions::default();
    // Builds an ordering made of a single sort key on `expr`.
    let single_key_ordering = |expr: &Arc<dyn PhysicalExpr>| {
        vec![PhysicalSortExpr {
            expr: expr.clone(),
            options: default_options,
        }]
    };

    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
        Field::new("c", DataType::Int32, true),
    ]);
    let a = col("a", &schema)?;
    let b = col("b", &schema)?;
    let c = col("c", &schema)?;

    // Properties under test: `a = c` is registered first, then the
    // orderings [b] and [c].
    let mut actual_props = EquivalenceProperties::new(Arc::new(schema.clone()));
    actual_props.add_equal_conditions(&a, &c);
    actual_props
        .add_new_orderings(vec![single_key_ordering(&b), single_key_ordering(&c)]);

    // Reference properties: the same two orderings, with no equality
    // registered.
    let mut expected_props = EquivalenceProperties::new(Arc::new(schema));
    expected_props
        .add_new_orderings([single_key_ordering(&b), single_key_ordering(&c)]);

    // Both sets of properties must end up with equal ordering equivalence
    // classes.
    let actual_class = actual_props.oeq_class().clone();
    assert!(actual_class.eq(expected_props.oeq_class()));

    Ok(())
}

#[test]
fn project_empty_output_ordering() -> Result<()> {
    // Both scenarios project from the same three-column input schema.
    let input_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
        Field::new("c", DataType::Int32, true),
    ]));

    // Scenario 1: the input is ordered by `b`; the projection outputs `b` as
    // `b_new` (index 0) and `a` as `a_new` (index 1).
    let mut ordered_props = EquivalenceProperties::new(Arc::clone(&input_schema));
    ordered_props.add_new_orderings([vec![PhysicalSortExpr {
        expr: Arc::new(Column::new("b", 1)),
        options: SortOptions::default(),
    }]]);
    let mapping = ProjectionMapping {
        inner: vec![
            (
                Arc::new(Column::new("b", 1)) as _,
                Arc::new(Column::new("b_new", 0)) as _,
            ),
            (
                Arc::new(Column::new("a", 0)) as _,
                Arc::new(Column::new("a_new", 1)) as _,
            ),
        ],
    };
    let output_schema = Arc::new(Schema::new(vec![
        Field::new("b_new", DataType::Int32, true),
        Field::new("a_new", DataType::Int32, true),
    ]));
    let projected = ordered_props.project(&mapping, output_schema);
    let actual_ordering = projected
        .oeq_class()
        .output_ordering()
        .unwrap_or_default();

    // The ordering on `b` is expected to survive as an ordering on `b_new`.
    let expected_ordering = vec![PhysicalSortExpr {
        expr: Arc::new(Column::new("b_new", 0)),
        options: SortOptions::default(),
    }];
    assert_eq!(expected_ordering, actual_ordering);

    // Scenario 2: the input has no known ordering; the projection outputs
    // `c` as `c_new` (index 0) and `b` as `b_new` (index 1).
    let unordered_props = EquivalenceProperties::new(input_schema);
    let mapping = ProjectionMapping {
        inner: vec![
            (
                Arc::new(Column::new("c", 2)) as _,
                Arc::new(Column::new("c_new", 0)) as _,
            ),
            (
                Arc::new(Column::new("b", 1)) as _,
                Arc::new(Column::new("b_new", 1)) as _,
            ),
        ],
    };
    let output_schema = Arc::new(Schema::new(vec![
        Field::new("c_new", DataType::Int32, true),
        Field::new("b_new", DataType::Int32, true),
    ]));
    let projected = unordered_props.project(&mapping, output_schema);
    // After projection there is no ordering.
    assert!(projected.oeq_class().output_ordering().is_none());

    Ok(())
}
}
Loading