-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Encapsulate ProjectionMapping as a struct
#8033
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,8 +42,65 @@ use indexmap::IndexMap; | |
| pub type EquivalenceClass = Vec<Arc<dyn PhysicalExpr>>; | ||
|
|
||
| /// Stores the mapping between source expressions and target expressions for a | ||
| /// projection. Indices in the vector corresponds to the indices after projection. | ||
| pub type ProjectionMapping = Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)>; | ||
| /// projection. | ||
| #[derive(Debug, Clone)] | ||
| pub struct ProjectionMapping { | ||
| /// `(source expression)` --> `(target expression)` | ||
| /// Indices in the vector corresponds to the indices after projection. | ||
| inner: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)>, | ||
| } | ||
|
|
||
| impl ProjectionMapping { | ||
| /// Constructs the mapping between a projection's input and output | ||
| /// expressions. | ||
| /// | ||
| /// For example, given the input projection expressions (`a+b`, `c+d`) | ||
| /// and an output schema with two columns `"c+d"` and `"a+b"` | ||
| /// the projection mapping would be | ||
| /// ```text | ||
| /// [0]: (c+d, col("c+d")) | ||
| /// [1]: (a+b, col("a+b")) | ||
| /// ``` | ||
| /// where `col("c+d")` means the column named "c+d". | ||
| pub fn try_new( | ||
| expr: &[(Arc<dyn PhysicalExpr>, String)], | ||
| input_schema: &SchemaRef, | ||
| ) -> Result<Self> { | ||
| // Construct a map from the input expressions to the output expression of the projection: | ||
| let mut inner = vec![]; | ||
| for (expr_idx, (expression, name)) in expr.iter().enumerate() { | ||
| let target_expr = Arc::new(Column::new(name, expr_idx)) as _; | ||
|
|
||
| let source_expr = expression.clone().transform_down(&|e| match e | ||
| .as_any() | ||
| .downcast_ref::<Column>( | ||
| ) { | ||
| Some(col) => { | ||
| // Sometimes, expression and its name in the input_schema doesn't match. | ||
| // This can cause problems. Hence in here we make sure that expression name | ||
| // matches with the name in the inout_schema. | ||
| // Conceptually, source_expr and expression should be same. | ||
| let idx = col.index(); | ||
| let matching_input_field = input_schema.field(idx); | ||
| let matching_input_column = | ||
| Column::new(matching_input_field.name(), idx); | ||
| Ok(Transformed::Yes(Arc::new(matching_input_column))) | ||
| } | ||
| None => Ok(Transformed::No(e)), | ||
| })?; | ||
|
|
||
| inner.push((source_expr, target_expr)); | ||
| } | ||
| Ok(Self { inner }) | ||
| } | ||
|
|
||
| /// Iterate over pairs of (source, target) expressions | ||
| pub fn iter( | ||
| &self, | ||
| ) -> impl Iterator<Item = &(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> + '_ { | ||
| self.inner.iter() | ||
| } | ||
| } | ||
|
|
||
| /// An `EquivalenceGroup` is a collection of `EquivalenceClass`es where each | ||
| /// class represents a distinct equivalence class in a relation. | ||
|
|
@@ -329,7 +386,7 @@ impl EquivalenceGroup { | |
| // once `Arc<dyn PhysicalExpr>` can be stored in `HashMap`. | ||
| // See issue: https://github.com/apache/arrow-datafusion/issues/8027 | ||
| let mut new_classes = vec![]; | ||
| for (source, target) in mapping { | ||
| for (source, target) in mapping.iter() { | ||
| if new_classes.is_empty() { | ||
| new_classes.push((source, vec![target.clone()])); | ||
| } | ||
|
|
@@ -980,7 +1037,7 @@ impl EquivalenceProperties { | |
| .iter() | ||
| .filter_map(|order| self.eq_group.project_ordering(projection_mapping, order)) | ||
| .collect::<Vec<_>>(); | ||
| for (source, target) in projection_mapping { | ||
| for (source, target) in projection_mapping.iter() { | ||
| let expr_ordering = ExprOrdering::new(source.clone()) | ||
| .transform_up(&|expr| update_ordering(expr, self)) | ||
| .unwrap(); | ||
|
|
@@ -1185,6 +1242,7 @@ fn update_ordering( | |
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use std::ops::Not; | ||
| use std::sync::Arc; | ||
|
|
||
| use super::*; | ||
|
|
@@ -1437,12 +1495,14 @@ mod tests { | |
| let col_a2 = &col("a2", &out_schema)?; | ||
| let col_a3 = &col("a3", &out_schema)?; | ||
| let col_a4 = &col("a4", &out_schema)?; | ||
| let projection_mapping = vec![ | ||
| (col_a.clone(), col_a1.clone()), | ||
| (col_a.clone(), col_a2.clone()), | ||
| (col_a.clone(), col_a3.clone()), | ||
| (col_a.clone(), col_a4.clone()), | ||
| ]; | ||
| let projection_mapping = ProjectionMapping { | ||
| inner: vec![ | ||
| (col_a.clone(), col_a1.clone()), | ||
| (col_a.clone(), col_a2.clone()), | ||
| (col_a.clone(), col_a3.clone()), | ||
| (col_a.clone(), col_a4.clone()), | ||
| ], | ||
| }; | ||
| let out_properties = input_properties.project(&projection_mapping, out_schema); | ||
|
|
||
| // At the output a1=a2=a3=a4 | ||
|
|
@@ -2565,4 +2625,234 @@ mod tests { | |
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_get_indices_of_matching_sort_exprs_with_order_eq() -> Result<()> { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was just moved to be with the |
||
| let sort_options = SortOptions::default(); | ||
| let sort_options_not = SortOptions::default().not(); | ||
|
|
||
| let schema = Schema::new(vec![ | ||
| Field::new("a", DataType::Int32, true), | ||
| Field::new("b", DataType::Int32, true), | ||
| ]); | ||
| let col_a = &col("a", &schema)?; | ||
| let col_b = &col("b", &schema)?; | ||
| let required_columns = [col_b.clone(), col_a.clone()]; | ||
| let mut eq_properties = EquivalenceProperties::new(Arc::new(schema)); | ||
| eq_properties.add_new_orderings([vec![ | ||
| PhysicalSortExpr { | ||
| expr: Arc::new(Column::new("b", 1)), | ||
| options: sort_options_not, | ||
| }, | ||
| PhysicalSortExpr { | ||
| expr: Arc::new(Column::new("a", 0)), | ||
| options: sort_options, | ||
| }, | ||
| ]]); | ||
| let (result, idxs) = eq_properties.find_longest_permutation(&required_columns); | ||
| assert_eq!(idxs, vec![0, 1]); | ||
| assert_eq!( | ||
| result, | ||
| vec![ | ||
| PhysicalSortExpr { | ||
| expr: col_b.clone(), | ||
| options: sort_options_not | ||
| }, | ||
| PhysicalSortExpr { | ||
| expr: col_a.clone(), | ||
| options: sort_options | ||
| } | ||
| ] | ||
| ); | ||
|
|
||
| let schema = Schema::new(vec![ | ||
| Field::new("a", DataType::Int32, true), | ||
| Field::new("b", DataType::Int32, true), | ||
| Field::new("c", DataType::Int32, true), | ||
| ]); | ||
| let col_a = &col("a", &schema)?; | ||
| let col_b = &col("b", &schema)?; | ||
| let required_columns = [col_b.clone(), col_a.clone()]; | ||
| let mut eq_properties = EquivalenceProperties::new(Arc::new(schema)); | ||
| eq_properties.add_new_orderings([ | ||
| vec![PhysicalSortExpr { | ||
| expr: Arc::new(Column::new("c", 2)), | ||
| options: sort_options, | ||
| }], | ||
| vec![ | ||
| PhysicalSortExpr { | ||
| expr: Arc::new(Column::new("b", 1)), | ||
| options: sort_options_not, | ||
| }, | ||
| PhysicalSortExpr { | ||
| expr: Arc::new(Column::new("a", 0)), | ||
| options: sort_options, | ||
| }, | ||
| ], | ||
| ]); | ||
| let (result, idxs) = eq_properties.find_longest_permutation(&required_columns); | ||
| assert_eq!(idxs, vec![0, 1]); | ||
| assert_eq!( | ||
| result, | ||
| vec![ | ||
| PhysicalSortExpr { | ||
| expr: col_b.clone(), | ||
| options: sort_options_not | ||
| }, | ||
| PhysicalSortExpr { | ||
| expr: col_a.clone(), | ||
| options: sort_options | ||
| } | ||
| ] | ||
| ); | ||
|
|
||
| let required_columns = [ | ||
| Arc::new(Column::new("b", 1)) as _, | ||
| Arc::new(Column::new("a", 0)) as _, | ||
| ]; | ||
| let schema = Schema::new(vec![ | ||
| Field::new("a", DataType::Int32, true), | ||
| Field::new("b", DataType::Int32, true), | ||
| Field::new("c", DataType::Int32, true), | ||
| ]); | ||
| let mut eq_properties = EquivalenceProperties::new(Arc::new(schema)); | ||
|
|
||
| // not satisfied orders | ||
| eq_properties.add_new_orderings([vec![ | ||
| PhysicalSortExpr { | ||
| expr: Arc::new(Column::new("b", 1)), | ||
| options: sort_options_not, | ||
| }, | ||
| PhysicalSortExpr { | ||
| expr: Arc::new(Column::new("c", 2)), | ||
| options: sort_options, | ||
| }, | ||
| PhysicalSortExpr { | ||
| expr: Arc::new(Column::new("a", 0)), | ||
| options: sort_options, | ||
| }, | ||
| ]]); | ||
| let (_, idxs) = eq_properties.find_longest_permutation(&required_columns); | ||
| assert_eq!(idxs, vec![0]); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_normalize_ordering_equivalence_classes() -> Result<()> { | ||
| let sort_options = SortOptions::default(); | ||
|
|
||
| let schema = Schema::new(vec![ | ||
| Field::new("a", DataType::Int32, true), | ||
| Field::new("b", DataType::Int32, true), | ||
| Field::new("c", DataType::Int32, true), | ||
| ]); | ||
| let col_a_expr = col("a", &schema)?; | ||
| let col_b_expr = col("b", &schema)?; | ||
| let col_c_expr = col("c", &schema)?; | ||
| let mut eq_properties = EquivalenceProperties::new(Arc::new(schema.clone())); | ||
|
|
||
| eq_properties.add_equal_conditions(&col_a_expr, &col_c_expr); | ||
| let others = vec![ | ||
| vec![PhysicalSortExpr { | ||
| expr: col_b_expr.clone(), | ||
| options: sort_options, | ||
| }], | ||
| vec![PhysicalSortExpr { | ||
| expr: col_c_expr.clone(), | ||
| options: sort_options, | ||
| }], | ||
| ]; | ||
| eq_properties.add_new_orderings(others); | ||
|
|
||
| let mut expected_eqs = EquivalenceProperties::new(Arc::new(schema)); | ||
| expected_eqs.add_new_orderings([ | ||
| vec![PhysicalSortExpr { | ||
| expr: col_b_expr.clone(), | ||
| options: sort_options, | ||
| }], | ||
| vec![PhysicalSortExpr { | ||
| expr: col_c_expr.clone(), | ||
| options: sort_options, | ||
| }], | ||
| ]); | ||
|
|
||
| let oeq_class = eq_properties.oeq_class().clone(); | ||
| let expected = expected_eqs.oeq_class(); | ||
| assert!(oeq_class.eq(expected)); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[test] | ||
| fn project_empty_output_ordering() -> Result<()> { | ||
| let schema = Schema::new(vec![ | ||
| Field::new("a", DataType::Int32, true), | ||
| Field::new("b", DataType::Int32, true), | ||
| Field::new("c", DataType::Int32, true), | ||
| ]); | ||
| let mut eq_properties = EquivalenceProperties::new(Arc::new(schema.clone())); | ||
| let ordering = vec![PhysicalSortExpr { | ||
| expr: Arc::new(Column::new("b", 1)), | ||
| options: SortOptions::default(), | ||
| }]; | ||
| eq_properties.add_new_orderings([ordering]); | ||
| let projection_mapping = ProjectionMapping { | ||
| inner: vec![ | ||
| ( | ||
| Arc::new(Column::new("b", 1)) as _, | ||
| Arc::new(Column::new("b_new", 0)) as _, | ||
| ), | ||
| ( | ||
| Arc::new(Column::new("a", 0)) as _, | ||
| Arc::new(Column::new("a_new", 1)) as _, | ||
| ), | ||
| ], | ||
| }; | ||
| let projection_schema = Arc::new(Schema::new(vec![ | ||
| Field::new("b_new", DataType::Int32, true), | ||
| Field::new("a_new", DataType::Int32, true), | ||
| ])); | ||
| let orderings = eq_properties | ||
| .project(&projection_mapping, projection_schema) | ||
| .oeq_class() | ||
| .output_ordering() | ||
| .unwrap_or_default(); | ||
|
|
||
| assert_eq!( | ||
| vec![PhysicalSortExpr { | ||
| expr: Arc::new(Column::new("b_new", 0)), | ||
| options: SortOptions::default(), | ||
| }], | ||
| orderings | ||
| ); | ||
|
|
||
| let schema = Schema::new(vec![ | ||
| Field::new("a", DataType::Int32, true), | ||
| Field::new("b", DataType::Int32, true), | ||
| Field::new("c", DataType::Int32, true), | ||
| ]); | ||
| let eq_properties = EquivalenceProperties::new(Arc::new(schema)); | ||
| let projection_mapping = ProjectionMapping { | ||
| inner: vec![ | ||
| ( | ||
| Arc::new(Column::new("c", 2)) as _, | ||
| Arc::new(Column::new("c_new", 0)) as _, | ||
| ), | ||
| ( | ||
| Arc::new(Column::new("b", 1)) as _, | ||
| Arc::new(Column::new("b_new", 1)) as _, | ||
| ), | ||
| ], | ||
| }; | ||
| let projection_schema = Arc::new(Schema::new(vec![ | ||
| Field::new("c_new", DataType::Int32, true), | ||
| Field::new("b_new", DataType::Int32, true), | ||
| ])); | ||
| let projected = eq_properties.project(&projection_mapping, projection_schema); | ||
| // After projection there is no ordering. | ||
| assert!(projected.oeq_class().output_ordering().is_none()); | ||
|
|
||
| Ok(()) | ||
| } | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to add some additional details here in the comments to make it easier to understand. The logic is the same