Skip to content

Commit 452e009

Browse files
committed
Encapsulate ProjectionMapping as a struct
1 parent 3d191c2 commit 452e009

File tree

3 files changed

+249
-236
lines changed

3 files changed

+249
-236
lines changed

datafusion/physical-expr/src/equivalence.rs

Lines changed: 249 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,16 @@ pub struct ProjectionMapping {
5252

5353
impl ProjectionMapping {
5454
/// Constructs the mapping between a projection's input and output
55+
/// expressions.
56+
///
57+
/// For example, given the input projection expressions (`a+b`, `c+d`)
58+
/// and an output schema with two columns `"c+d"` and `"a+b"`,
59+
/// the projection mapping would be
60+
/// ```text
61+
/// [0]: (c+d, col("c+d"))
62+
/// [1]: (a+b, col("a+b"))
63+
/// ```
64+
/// where `col("c+d")` means the column named "c+d".
5565
pub fn try_new(
5666
expr: &[(Arc<dyn PhysicalExpr>, String)],
5767
input_schema: &SchemaRef,
@@ -1231,6 +1241,7 @@ fn update_ordering(
12311241

12321242
#[cfg(test)]
12331243
mod tests {
1244+
use std::ops::Not;
12341245
use std::sync::Arc;
12351246

12361247
use super::*;
@@ -1483,12 +1494,14 @@ mod tests {
14831494
let col_a2 = &col("a2", &out_schema)?;
14841495
let col_a3 = &col("a3", &out_schema)?;
14851496
let col_a4 = &col("a4", &out_schema)?;
1486-
let projection_mapping = vec![
1487-
(col_a.clone(), col_a1.clone()),
1488-
(col_a.clone(), col_a2.clone()),
1489-
(col_a.clone(), col_a3.clone()),
1490-
(col_a.clone(), col_a4.clone()),
1491-
];
1497+
let projection_mapping = ProjectionMapping {
1498+
inner: vec![
1499+
(col_a.clone(), col_a1.clone()),
1500+
(col_a.clone(), col_a2.clone()),
1501+
(col_a.clone(), col_a3.clone()),
1502+
(col_a.clone(), col_a4.clone()),
1503+
],
1504+
};
14921505
let out_properties = input_properties.project(&projection_mapping, out_schema);
14931506

14941507
// At the output a1=a2=a3=a4
@@ -2611,4 +2624,234 @@ mod tests {
26112624

26122625
Ok(())
26132626
}
2627+
2628+
/// Exercises `EquivalenceProperties::find_longest_permutation` against three
/// ordering configurations, checking the returned indices in every case and
/// the returned sort expressions in the first two.
///
/// In every case the required columns are `[b, a]`.
#[test]
2629+
fn test_get_indices_of_matching_sort_exprs_with_order_eq() -> Result<()> {
2630+
let sort_options = SortOptions::default();
2631+
// Default options negated through `std::ops::Not`.
let sort_options_not = SortOptions::default().not();
2632+
2633+
// Case 1: schema (a, b) with the single ordering [b (negated), a (default)].
let schema = Schema::new(vec![
2634+
Field::new("a", DataType::Int32, true),
2635+
Field::new("b", DataType::Int32, true),
2636+
]);
2637+
let col_a = &col("a", &schema)?;
2638+
let col_b = &col("b", &schema)?;
2639+
let required_columns = [col_b.clone(), col_a.clone()];
2640+
let mut eq_properties = EquivalenceProperties::new(Arc::new(schema));
2641+
eq_properties.add_new_orderings([vec![
2642+
PhysicalSortExpr {
2643+
expr: Arc::new(Column::new("b", 1)),
2644+
options: sort_options_not,
2645+
},
2646+
PhysicalSortExpr {
2647+
expr: Arc::new(Column::new("a", 0)),
2648+
options: sort_options,
2649+
},
2650+
]]);
2651+
let (result, idxs) = eq_properties.find_longest_permutation(&required_columns);
2652+
// Both required columns are expected back, with the options of the ordering.
assert_eq!(idxs, vec![0, 1]);
2653+
assert_eq!(
2654+
result,
2655+
vec![
2656+
PhysicalSortExpr {
2657+
expr: col_b.clone(),
2658+
options: sort_options_not
2659+
},
2660+
PhysicalSortExpr {
2661+
expr: col_a.clone(),
2662+
options: sort_options
2663+
}
2664+
]
2665+
);
2666+
2667+
// Case 2: schema (a, b, c) with two orderings, [c] and [b (negated), a].
// The expected result is the same as in case 1.
let schema = Schema::new(vec![
2668+
Field::new("a", DataType::Int32, true),
2669+
Field::new("b", DataType::Int32, true),
2670+
Field::new("c", DataType::Int32, true),
2671+
]);
2672+
let col_a = &col("a", &schema)?;
2673+
let col_b = &col("b", &schema)?;
2674+
let required_columns = [col_b.clone(), col_a.clone()];
2675+
let mut eq_properties = EquivalenceProperties::new(Arc::new(schema));
2676+
eq_properties.add_new_orderings([
2677+
vec![PhysicalSortExpr {
2678+
expr: Arc::new(Column::new("c", 2)),
2679+
options: sort_options,
2680+
}],
2681+
vec![
2682+
PhysicalSortExpr {
2683+
expr: Arc::new(Column::new("b", 1)),
2684+
options: sort_options_not,
2685+
},
2686+
PhysicalSortExpr {
2687+
expr: Arc::new(Column::new("a", 0)),
2688+
options: sort_options,
2689+
},
2690+
],
2691+
]);
2692+
let (result, idxs) = eq_properties.find_longest_permutation(&required_columns);
2693+
assert_eq!(idxs, vec![0, 1]);
2694+
assert_eq!(
2695+
result,
2696+
vec![
2697+
PhysicalSortExpr {
2698+
expr: col_b.clone(),
2699+
options: sort_options_not
2700+
},
2701+
PhysicalSortExpr {
2702+
expr: col_a.clone(),
2703+
options: sort_options
2704+
}
2705+
]
2706+
);
2707+
2708+
// Case 3: the required columns are built directly from `Column`s, and the
// single ordering is [b (negated), c, a], so `c` sits between `b` and `a`.
let required_columns = [
2709+
Arc::new(Column::new("b", 1)) as _,
2710+
Arc::new(Column::new("a", 0)) as _,
2711+
];
2712+
let schema = Schema::new(vec![
2713+
Field::new("a", DataType::Int32, true),
2714+
Field::new("b", DataType::Int32, true),
2715+
Field::new("c", DataType::Int32, true),
2716+
]);
2717+
let mut eq_properties = EquivalenceProperties::new(Arc::new(schema));
2718+
2719+
// not satisfied orders
2720+
eq_properties.add_new_orderings([vec![
2721+
PhysicalSortExpr {
2722+
expr: Arc::new(Column::new("b", 1)),
2723+
options: sort_options_not,
2724+
},
2725+
PhysicalSortExpr {
2726+
expr: Arc::new(Column::new("c", 2)),
2727+
options: sort_options,
2728+
},
2729+
PhysicalSortExpr {
2730+
expr: Arc::new(Column::new("a", 0)),
2731+
options: sort_options,
2732+
},
2733+
]]);
2734+
let (_, idxs) = eq_properties.find_longest_permutation(&required_columns);
2735+
// Only the first required column (`b`, index 0) is expected back.
assert_eq!(idxs, vec![0]);
2736+
2737+
Ok(())
2738+
}
2739+
2740+
/// Builds one `EquivalenceProperties` with the equality `a = c` plus the
/// orderings `[b]` and `[c]`, and a second one with the same orderings but no
/// equality, then asserts that their ordering equivalence classes compare
/// equal.
#[test]
2741+
fn test_normalize_ordering_equivalence_classes() -> Result<()> {
2742+
let sort_options = SortOptions::default();
2743+
2744+
let schema = Schema::new(vec![
2745+
Field::new("a", DataType::Int32, true),
2746+
Field::new("b", DataType::Int32, true),
2747+
Field::new("c", DataType::Int32, true),
2748+
]);
2749+
let col_a_expr = col("a", &schema)?;
2750+
let col_b_expr = col("b", &schema)?;
2751+
let col_c_expr = col("c", &schema)?;
2752+
let mut eq_properties = EquivalenceProperties::new(Arc::new(schema.clone()));
2753+
2754+
// Declare `a` and `c` equal before the orderings are added.
eq_properties.add_equal_conditions(&col_a_expr, &col_c_expr);
2755+
let others = vec![
2756+
vec![PhysicalSortExpr {
2757+
expr: col_b_expr.clone(),
2758+
options: sort_options,
2759+
}],
2760+
vec![PhysicalSortExpr {
2761+
expr: col_c_expr.clone(),
2762+
options: sort_options,
2763+
}],
2764+
];
2765+
eq_properties.add_new_orderings(others);
2766+
2767+
// Reference properties: the same two orderings, with no equality registered.
let mut expected_eqs = EquivalenceProperties::new(Arc::new(schema));
2768+
expected_eqs.add_new_orderings([
2769+
vec![PhysicalSortExpr {
2770+
expr: col_b_expr.clone(),
2771+
options: sort_options,
2772+
}],
2773+
vec![PhysicalSortExpr {
2774+
expr: col_c_expr.clone(),
2775+
options: sort_options,
2776+
}],
2777+
]);
2778+
2779+
let oeq_class = eq_properties.oeq_class().clone();
2780+
let expected = expected_eqs.oeq_class();
2781+
assert!(oeq_class.eq(expected));
2782+
2783+
Ok(())
2784+
}
2785+
2786+
/// Checks the output ordering reported after `EquivalenceProperties::project`
/// in two situations: when the input has an ordering on a projected column,
/// and when the input has no ordering at all.
#[test]
2787+
fn project_empty_output_ordering() -> Result<()> {
2788+
let schema = Schema::new(vec![
2789+
Field::new("a", DataType::Int32, true),
2790+
Field::new("b", DataType::Int32, true),
2791+
Field::new("c", DataType::Int32, true),
2792+
]);
2793+
let mut eq_properties = EquivalenceProperties::new(Arc::new(schema.clone()));
2794+
// Case 1: the input is ordered by `b` with default sort options.
let ordering = vec![PhysicalSortExpr {
2795+
expr: Arc::new(Column::new("b", 1)),
2796+
options: SortOptions::default(),
2797+
}];
2798+
eq_properties.add_new_orderings([ordering]);
2799+
// The projection maps b -> b_new (index 0) and a -> a_new (index 1).
let projection_mapping = ProjectionMapping {
2800+
inner: vec![
2801+
(
2802+
Arc::new(Column::new("b", 1)) as _,
2803+
Arc::new(Column::new("b_new", 0)) as _,
2804+
),
2805+
(
2806+
Arc::new(Column::new("a", 0)) as _,
2807+
Arc::new(Column::new("a_new", 1)) as _,
2808+
),
2809+
],
2810+
};
2811+
let projection_schema = Arc::new(Schema::new(vec![
2812+
Field::new("b_new", DataType::Int32, true),
2813+
Field::new("a_new", DataType::Int32, true),
2814+
]));
2815+
let orderings = eq_properties
2816+
.project(&projection_mapping, projection_schema)
2817+
.oeq_class()
2818+
.output_ordering()
2819+
.unwrap_or_default();
2820+
2821+
// The ordering on `b` is expected to come back expressed on `b_new`.
assert_eq!(
2822+
vec![PhysicalSortExpr {
2823+
expr: Arc::new(Column::new("b_new", 0)),
2824+
options: SortOptions::default(),
2825+
}],
2826+
orderings
2827+
);
2828+
2829+
// Case 2: fresh properties with no orderings added; the projection maps
// c -> c_new (index 0) and b -> b_new (index 1).
let schema = Schema::new(vec![
2830+
Field::new("a", DataType::Int32, true),
2831+
Field::new("b", DataType::Int32, true),
2832+
Field::new("c", DataType::Int32, true),
2833+
]);
2834+
let eq_properties = EquivalenceProperties::new(Arc::new(schema));
2835+
let projection_mapping = ProjectionMapping {
2836+
inner: vec![
2837+
(
2838+
Arc::new(Column::new("c", 2)) as _,
2839+
Arc::new(Column::new("c_new", 0)) as _,
2840+
),
2841+
(
2842+
Arc::new(Column::new("b", 1)) as _,
2843+
Arc::new(Column::new("b_new", 1)) as _,
2844+
),
2845+
],
2846+
};
2847+
let projection_schema = Arc::new(Schema::new(vec![
2848+
Field::new("c_new", DataType::Int32, true),
2849+
Field::new("b_new", DataType::Int32, true),
2850+
]));
2851+
let projected = eq_properties.project(&projection_mapping, projection_schema);
2852+
// After projection there is no ordering.
2853+
assert!(projected.oeq_class().output_ordering().is_none());
2854+
2855+
Ok(())
2856+
}
26142857
}

0 commit comments

Comments
 (0)