Skip to content

Commit 3d191c2

Browse files
committed
Encapsulate ProjectionMapping as a struct
1 parent a5c87e0 commit 3d191c2

File tree

4 files changed

+61
-48
lines changed

4 files changed

+61
-48
lines changed

datafusion/physical-expr/src/equivalence.rs

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,55 @@ use indexmap::IndexMap;
4242
pub type EquivalenceClass = Vec<Arc<dyn PhysicalExpr>>;
4343

4444
/// Stores the mapping between source expressions and target expressions for a
45-
/// projection. Indices in the vector corresponds to the indices after projection.
46-
pub type ProjectionMapping = Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)>;
45+
/// projection.
46+
#[derive(Debug, Clone)]
47+
pub struct ProjectionMapping {
48+
/// `(source expression)` --> `(target expression)`
49+
/// Indices in the vector correspond to the indices after projection.
50+
inner: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)>,
51+
}
52+
53+
impl ProjectionMapping {
54+
/// Constructs the mapping between a projection's input and output
55+
pub fn try_new(
56+
expr: &[(Arc<dyn PhysicalExpr>, String)],
57+
input_schema: &SchemaRef,
58+
) -> Result<Self> {
59+
// Construct a map from the input expressions to the output expression of the projection:
60+
let mut inner = vec![];
61+
for (expr_idx, (expression, name)) in expr.iter().enumerate() {
62+
let target_expr = Arc::new(Column::new(name, expr_idx)) as _;
63+
64+
let source_expr = expression.clone().transform_down(&|e| match e
65+
.as_any()
66+
.downcast_ref::<Column>(
67+
) {
68+
Some(col) => {
69+
// Sometimes, an expression and its name in the input_schema don't match.
70+
// This can cause problems. Hence in here we make sure that expression name
71+
// matches with the name in the input_schema.
72+
// Conceptually, source_expr and expression should be the same.
73+
let idx = col.index();
74+
let matching_input_field = input_schema.field(idx);
75+
let matching_input_column =
76+
Column::new(matching_input_field.name(), idx);
77+
Ok(Transformed::Yes(Arc::new(matching_input_column)))
78+
}
79+
None => Ok(Transformed::No(e)),
80+
})?;
81+
82+
inner.push((source_expr, target_expr));
83+
}
84+
Ok(Self { inner })
85+
}
86+
87+
/// Iterate over pairs of (source, target) expressions
88+
pub fn iter(
89+
&self,
90+
) -> impl Iterator<Item = &(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> + '_ {
91+
self.inner.iter()
92+
}
93+
}
4794

4895
/// An `EquivalenceGroup` is a collection of `EquivalenceClass`es where each
4996
/// class represents a distinct equivalence class in a relation.
@@ -328,7 +375,7 @@ impl EquivalenceGroup {
328375
// TODO: Convert the algorithm below to a version that uses `HashMap`.
329376
// once `Arc<dyn PhysicalExpr>` can be stored in `HashMap`.
330377
let mut new_classes = vec![];
331-
for (source, target) in mapping {
378+
for (source, target) in mapping.iter() {
332379
if new_classes.is_empty() {
333380
new_classes.push((source, vec![target.clone()]));
334381
}
@@ -979,7 +1026,7 @@ impl EquivalenceProperties {
9791026
.iter()
9801027
.filter_map(|order| self.eq_group.project_ordering(projection_mapping, order))
9811028
.collect::<Vec<_>>();
982-
for (source, target) in projection_mapping {
1029+
for (source, target) in projection_mapping.iter() {
9831030
let expr_ordering = ExprOrdering::new(source.clone())
9841031
.transform_up(&|expr| update_ordering(expr, self))
9851032
.unwrap();

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use crate::aggregates::{
2525
no_grouping::AggregateStream, row_hash::GroupedHashAggregateStream,
2626
topk_stream::GroupedTopKAggregateStream,
2727
};
28-
use crate::common::calculate_projection_mapping;
28+
2929
use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
3030
use crate::windows::{
3131
get_ordered_partition_by_indices, get_window_mode, PartitionSearchMode,
@@ -60,6 +60,7 @@ mod topk;
6060
mod topk_stream;
6161

6262
pub use datafusion_expr::AggregateFunction;
63+
use datafusion_physical_expr::equivalence::ProjectionMapping;
6364
pub use datafusion_physical_expr::expressions::create_aggregate_expr;
6465

6566
/// Hash aggregate modes
@@ -294,9 +295,8 @@ pub struct AggregateExec {
294295
/// expressions from protobuf for final aggregate.
295296
pub input_schema: SchemaRef,
296297
/// The mapping used to normalize expressions like Partitioning and
297-
/// PhysicalSortExpr. The key is the expression from the input schema
298-
/// and the value is the expression from the output schema.
299-
projection_mapping: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)>,
298+
/// PhysicalSortExpr; it maps input expressions to output expressions
299+
projection_mapping: ProjectionMapping,
300300
/// Execution metrics
301301
metrics: ExecutionPlanMetricsSet,
302302
required_input_ordering: Option<LexRequirement>,
@@ -535,7 +535,7 @@ impl AggregateExec {
535535

536536
// construct a map from the input expression to the output expression of the Aggregation group by
537537
let projection_mapping =
538-
calculate_projection_mapping(&group_by.expr, &input.schema())?;
538+
ProjectionMapping::try_new(&group_by.expr, &input.schema())?;
539539

540540
let required_input_ordering =
541541
(!new_requirement.is_empty()).then_some(new_requirement);

datafusion/physical-plan/src/common.rs

Lines changed: 1 addition & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,9 @@ use arrow::datatypes::Schema;
3131
use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
3232
use arrow::record_batch::RecordBatch;
3333
use datafusion_common::stats::Precision;
34-
use datafusion_common::tree_node::{Transformed, TreeNode};
34+
use datafusion_common::tree_node::TreeNode;
3535
use datafusion_common::{plan_err, DataFusionError, Result};
3636
use datafusion_execution::memory_pool::MemoryReservation;
37-
use datafusion_physical_expr::equivalence::ProjectionMapping;
3837
use datafusion_physical_expr::expressions::{BinaryExpr, Column};
3938
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
4039

@@ -375,38 +374,6 @@ pub fn batch_byte_size(batch: &RecordBatch) -> usize {
375374
batch.get_array_memory_size()
376375
}
377376

378-
/// Constructs the mapping between a projection's input and output
379-
pub fn calculate_projection_mapping(
380-
expr: &[(Arc<dyn PhysicalExpr>, String)],
381-
input_schema: &Arc<Schema>,
382-
) -> Result<ProjectionMapping> {
383-
// Construct a map from the input expressions to the output expression of the projection:
384-
let mut projection_mapping = vec![];
385-
for (expr_idx, (expression, name)) in expr.iter().enumerate() {
386-
let target_expr = Arc::new(Column::new(name, expr_idx)) as _;
387-
388-
let source_expr = expression.clone().transform_down(&|e| match e
389-
.as_any()
390-
.downcast_ref::<Column>()
391-
{
392-
Some(col) => {
393-
// Sometimes, expression and its name in the input_schema doesn't match.
394-
// This can cause problems. Hence in here we make sure that expression name
395-
// matches with the name in the inout_schema.
396-
// Conceptually, source_expr and expression should be same.
397-
let idx = col.index();
398-
let matching_input_field = input_schema.field(idx);
399-
let matching_input_column = Column::new(matching_input_field.name(), idx);
400-
Ok(Transformed::Yes(Arc::new(matching_input_column)))
401-
}
402-
None => Ok(Transformed::No(e)),
403-
})?;
404-
405-
projection_mapping.push((source_expr, target_expr));
406-
}
407-
Ok(projection_mapping)
408-
}
409-
410377
#[cfg(test)]
411378
mod tests {
412379
use std::ops::Not;

datafusion/physical-plan/src/projection.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ use std::task::{Context, Poll};
2929
use super::expressions::{Column, PhysicalSortExpr};
3030
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
3131
use super::{DisplayAs, RecordBatchStream, SendableRecordBatchStream, Statistics};
32-
use crate::common::calculate_projection_mapping;
3332
use crate::{
3433
ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr,
3534
};
@@ -42,6 +41,7 @@ use datafusion_execution::TaskContext;
4241
use datafusion_physical_expr::expressions::{Literal, UnKnownColumn};
4342
use datafusion_physical_expr::EquivalenceProperties;
4443

44+
use datafusion_physical_expr::equivalence::ProjectionMapping;
4545
use futures::stream::{Stream, StreamExt};
4646
use log::trace;
4747

@@ -57,9 +57,8 @@ pub struct ProjectionExec {
5757
/// The output ordering
5858
output_ordering: Option<Vec<PhysicalSortExpr>>,
5959
/// The mapping used to normalize expressions like Partitioning and
60-
/// PhysicalSortExpr. The key is the expression from the input schema
61-
/// and the value is the expression from the output schema.
62-
projection_mapping: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)>,
60+
/// PhysicalSortExpr; it maps input expressions to output expressions
61+
projection_mapping: ProjectionMapping,
6362
/// Execution metrics
6463
metrics: ExecutionPlanMetricsSet,
6564
}
@@ -94,7 +93,7 @@ impl ProjectionExec {
9493
));
9594

9695
// construct a map from the input expressions to the output expression of the Projection
97-
let projection_mapping = calculate_projection_mapping(&expr, &input_schema)?;
96+
let projection_mapping = ProjectionMapping::try_new(&expr, &input_schema)?;
9897

9998
let input_eqs = input.equivalence_properties();
10099
let project_eqs = input_eqs.project(&projection_mapping, schema.clone());

0 commit comments

Comments
 (0)