Skip to content

Commit 43f26a7

Browse files
adriangbdestrex271
authored andcommitted
Refactor DataSourceExec::try_swapping_with_projection to simplify and remove abstraction leakage (apache#17395)
1 parent b091c4a commit 43f26a7

File tree

6 files changed

+59
-71
lines changed

6 files changed

+59
-71
lines changed

datafusion/datasource/src/file_scan_config.rs

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,9 @@ use crate::file_groups::FileGroup;
2727
#[allow(unused_imports)]
2828
use crate::schema_adapter::SchemaAdapterFactory;
2929
use crate::{
30-
display::FileGroupsDisplay,
31-
file::FileSource,
32-
file_compression_type::FileCompressionType,
33-
file_stream::FileStream,
34-
source::{DataSource, DataSourceExec},
35-
statistics::MinMaxStatistics,
36-
PartitionedFile,
30+
display::FileGroupsDisplay, file::FileSource,
31+
file_compression_type::FileCompressionType, file_stream::FileStream,
32+
source::DataSource, statistics::MinMaxStatistics, PartitionedFile,
3733
};
3834
use arrow::datatypes::FieldRef;
3935
use arrow::{
@@ -57,11 +53,12 @@ use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
5753
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
5854
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
5955
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
56+
use datafusion_physical_plan::projection::ProjectionExpr;
6057
use datafusion_physical_plan::{
6158
display::{display_orderings, ProjectSchemaDisplay},
6259
metrics::ExecutionPlanMetricsSet,
63-
projection::{all_alias_free_columns, new_projections_for_columns, ProjectionExec},
64-
DisplayAs, DisplayFormatType, ExecutionPlan,
60+
projection::{all_alias_free_columns, new_projections_for_columns},
61+
DisplayAs, DisplayFormatType,
6562
};
6663
use datafusion_physical_plan::{
6764
filter::collect_columns_from_predicate, filter_pushdown::FilterPushdownPropagation,
@@ -139,6 +136,9 @@ use log::{debug, warn};
139136
/// // create an execution plan from the config
140137
/// let plan: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(config);
141138
/// ```
139+
///
140+
/// [`DataSourceExec`]: crate::source::DataSourceExec
141+
/// [`DataSourceExec::from_data_source`]: crate::source::DataSourceExec::from_data_source
142142
#[derive(Clone)]
143143
pub struct FileScanConfig {
144144
/// Object store URL, used to get an [`ObjectStore`] instance from
@@ -159,6 +159,8 @@ pub struct FileScanConfig {
159159
/// Note that this is **not** the schema of the physical files.
160160
/// This is the schema that the physical file schema will be
161161
/// mapped onto, and the schema that the [`DataSourceExec`] will return.
162+
///
163+
/// [`DataSourceExec`]: crate::source::DataSourceExec
162164
pub file_schema: SchemaRef,
163165
/// List of files to be processed, grouped into partitions
164166
///
@@ -258,9 +260,10 @@ pub struct FileScanConfigBuilder {
258260
/// This is usually the same as the table schema as specified by the `TableProvider` minus any partition columns.
259261
///
260262
/// This probably would be better named `table_schema`
263+
///
264+
/// [`DataSourceExec`]: crate::source::DataSourceExec
261265
file_schema: SchemaRef,
262266
file_source: Arc<dyn FileSource>,
263-
264267
limit: Option<usize>,
265268
projection: Option<Vec<usize>>,
266269
table_partition_cols: Vec<FieldRef>,
@@ -634,20 +637,20 @@ impl DataSource for FileScanConfig {
634637

635638
fn try_swapping_with_projection(
636639
&self,
637-
projection: &ProjectionExec,
638-
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
640+
projection: &[ProjectionExpr],
641+
) -> Result<Option<Arc<dyn DataSource>>> {
639642
// This process can be moved into CsvExec, but it would be an overlap of their responsibility.
640643

641644
// Must be all column references, with no table partition columns (which can not be projected)
642-
let partitioned_columns_in_proj = projection.expr().iter().any(|(expr, _)| {
645+
let partitioned_columns_in_proj = projection.iter().any(|(expr, _)| {
643646
expr.as_any()
644647
.downcast_ref::<Column>()
645648
.map(|expr| expr.index() >= self.file_schema.fields().len())
646649
.unwrap_or(false)
647650
});
648651

649652
// If there is any non-column or alias-carrier expression, Projection should not be removed.
650-
let no_aliases = all_alias_free_columns(projection.expr());
653+
let no_aliases = all_alias_free_columns(projection);
651654

652655
Ok((no_aliases && !partitioned_columns_in_proj).then(|| {
653656
let file_scan = self.clone();
@@ -659,7 +662,8 @@ impl DataSource for FileScanConfig {
659662
.clone()
660663
.unwrap_or_else(|| (0..self.file_schema.fields().len()).collect()),
661664
);
662-
DataSourceExec::from_data_source(
665+
666+
Arc::new(
663667
FileScanConfigBuilder::from(file_scan)
664668
// Assign projected statistics to source
665669
.with_projection(Some(new_projections))

datafusion/datasource/src/memory.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,11 @@ use datafusion_physical_expr::utils::collect_columns;
3737
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
3838
use datafusion_physical_plan::memory::MemoryStream;
3939
use datafusion_physical_plan::projection::{
40-
all_alias_free_columns, new_projections_for_columns, ProjectionExec,
40+
all_alias_free_columns, new_projections_for_columns, ProjectionExpr,
4141
};
4242
use datafusion_physical_plan::{
43-
common, ColumnarValue, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
44-
PhysicalExpr, SendableRecordBatchStream, Statistics,
43+
common, ColumnarValue, DisplayAs, DisplayFormatType, Partitioning, PhysicalExpr,
44+
SendableRecordBatchStream, Statistics,
4545
};
4646

4747
use async_trait::async_trait;
@@ -213,24 +213,24 @@ impl DataSource for MemorySourceConfig {
213213

214214
fn try_swapping_with_projection(
215215
&self,
216-
projection: &ProjectionExec,
217-
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
216+
projection: &[ProjectionExpr],
217+
) -> Result<Option<Arc<dyn DataSource>>> {
218218
// If there is any non-column or alias-carrier expression, Projection should not be removed.
219219
// This process can be moved into MemoryExec, but it would be an overlap of their responsibility.
220-
all_alias_free_columns(projection.expr())
220+
all_alias_free_columns(projection)
221221
.then(|| {
222222
let all_projections = (0..self.schema.fields().len()).collect();
223223
let new_projections = new_projections_for_columns(
224224
projection,
225225
self.projection().as_ref().unwrap_or(&all_projections),
226226
);
227227

228-
MemorySourceConfig::try_new_exec(
228+
MemorySourceConfig::try_new(
229229
self.partitions(),
230230
self.original_schema(),
231231
Some(new_projections),
232232
)
233-
.map(|e| e as _)
233+
.map(|s| Arc::new(s) as Arc<dyn DataSource>)
234234
})
235235
.transpose()
236236
}
@@ -835,6 +835,7 @@ mod tests {
835835
use datafusion_physical_expr::PhysicalSortExpr;
836836
use datafusion_physical_plan::expressions::lit;
837837

838+
use datafusion_physical_plan::ExecutionPlan;
838839
use futures::StreamExt;
839840

840841
#[tokio::test]

datafusion/datasource/src/source.rs

Lines changed: 9 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,12 @@ use std::fmt;
2222
use std::fmt::{Debug, Formatter};
2323
use std::sync::Arc;
2424

25-
use datafusion_physical_expr::equivalence::ProjectionMapping;
2625
use datafusion_physical_plan::execution_plan::{
2726
Boundedness, EmissionType, SchedulingType,
2827
};
2928
use datafusion_physical_plan::metrics::SplitMetrics;
3029
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
31-
use datafusion_physical_plan::projection::ProjectionExec;
30+
use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
3231
use datafusion_physical_plan::stream::BatchSplitStream;
3332
use datafusion_physical_plan::{
3433
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
@@ -161,8 +160,8 @@ pub trait DataSource: Send + Sync + Debug {
161160
}
162161
fn try_swapping_with_projection(
163162
&self,
164-
_projection: &ProjectionExec,
165-
) -> Result<Option<Arc<dyn ExecutionPlan>>>;
163+
_projection: &[ProjectionExpr],
164+
) -> Result<Option<Arc<dyn DataSource>>>;
166165
/// Try to push down filters into this DataSource.
167166
/// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
168167
///
@@ -318,36 +317,12 @@ impl ExecutionPlan for DataSourceExec {
318317
&self,
319318
projection: &ProjectionExec,
320319
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
321-
match self.data_source.try_swapping_with_projection(projection)? {
322-
Some(new_plan) => {
323-
if let Some(new_data_source_exec) =
324-
new_plan.as_any().downcast_ref::<DataSourceExec>()
325-
{
326-
let projection_mapping = ProjectionMapping::try_new(
327-
projection.expr().iter().cloned(),
328-
&self.schema(),
329-
)?;
330-
331-
// Project the equivalence properties to the new schema
332-
let projected_eq_properties = self
333-
.cache
334-
.eq_properties
335-
.project(&projection_mapping, new_data_source_exec.schema());
336-
337-
let preserved_exec = DataSourceExec {
338-
data_source: Arc::clone(&new_data_source_exec.data_source),
339-
cache: PlanProperties::new(
340-
projected_eq_properties,
341-
new_data_source_exec.cache.partitioning.clone(),
342-
new_data_source_exec.cache.emission_type,
343-
new_data_source_exec.cache.boundedness,
344-
)
345-
.with_scheduling_type(new_data_source_exec.cache.scheduling_type),
346-
};
347-
Ok(Some(Arc::new(preserved_exec)))
348-
} else {
349-
Ok(Some(new_plan))
350-
}
320+
match self
321+
.data_source
322+
.try_swapping_with_projection(projection.expr())?
323+
{
324+
Some(new_data_source) => {
325+
Ok(Some(Arc::new(DataSourceExec::new(new_data_source))))
351326
}
352327
None => Ok(None),
353328
}

datafusion/physical-plan/src/projection.rs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ use log::trace;
6161
#[derive(Debug, Clone)]
6262
pub struct ProjectionExec {
6363
/// The projection expressions stored as tuples of (expression, output column name)
64-
pub(crate) expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
64+
pub(crate) expr: Vec<ProjectionExpr>,
6565
/// The schema once the projection has been applied to the input
6666
schema: SchemaRef,
6767
/// The input plan
@@ -75,7 +75,7 @@ pub struct ProjectionExec {
7575
impl ProjectionExec {
7676
/// Create a projection on an input
7777
pub fn try_new(
78-
expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
78+
expr: Vec<ProjectionExpr>,
7979
input: Arc<dyn ExecutionPlan>,
8080
) -> Result<Self> {
8181
let input_schema = input.schema();
@@ -115,7 +115,7 @@ impl ProjectionExec {
115115
}
116116

117117
/// The projection expressions stored as tuples of (expression, output column name)
118-
pub fn expr(&self) -> &[(Arc<dyn PhysicalExpr>, String)] {
118+
pub fn expr(&self) -> &[ProjectionExpr] {
119119
&self.expr
120120
}
121121

@@ -147,6 +147,8 @@ impl ProjectionExec {
147147
}
148148
}
149149

150+
pub type ProjectionExpr = (Arc<dyn PhysicalExpr>, String);
151+
150152
impl DisplayAs for ProjectionExec {
151153
fn fmt_as(
152154
&self,
@@ -566,7 +568,7 @@ fn is_projection_removable(projection: &ProjectionExec) -> bool {
566568

567569
/// Given the expression set of a projection, checks if the projection causes
568570
/// any renaming or constructs a non-`Column` physical expression.
569-
pub fn all_alias_free_columns(exprs: &[(Arc<dyn PhysicalExpr>, String)]) -> bool {
571+
pub fn all_alias_free_columns(exprs: &[ProjectionExpr]) -> bool {
570572
exprs.iter().all(|(expr, alias)| {
571573
expr.as_any()
572574
.downcast_ref::<Column>()
@@ -579,11 +581,10 @@ pub fn all_alias_free_columns(exprs: &[(Arc<dyn PhysicalExpr>, String)]) -> bool
579581
/// projection operator's expressions. To use this function safely, one must
580582
/// ensure that all expressions are `Column` expressions without aliases.
581583
pub fn new_projections_for_columns(
582-
projection: &ProjectionExec,
584+
projection: &[ProjectionExpr],
583585
source: &[usize],
584586
) -> Vec<usize> {
585587
projection
586-
.expr()
587588
.iter()
588589
.filter_map(|(expr, _)| {
589590
expr.as_any()
@@ -604,7 +605,7 @@ pub fn make_with_child(
604605
}
605606

606607
/// Returns `true` if all the expressions in the argument are `Column`s.
607-
pub fn all_columns(exprs: &[(Arc<dyn PhysicalExpr>, String)]) -> bool {
608+
pub fn all_columns(exprs: &[ProjectionExpr]) -> bool {
608609
exprs.iter().all(|(expr, _)| expr.as_any().is::<Column>())
609610
}
610611

@@ -627,7 +628,7 @@ pub fn all_columns(exprs: &[(Arc<dyn PhysicalExpr>, String)]) -> bool {
627628
/// `a@0`, but `b@2` results in `None` since the projection does not include `b`.
628629
pub fn update_expr(
629630
expr: &Arc<dyn PhysicalExpr>,
630-
projected_exprs: &[(Arc<dyn PhysicalExpr>, String)],
631+
projected_exprs: &[ProjectionExpr],
631632
sync_with_child: bool,
632633
) -> Result<Option<Arc<dyn PhysicalExpr>>> {
633634
#[derive(Debug, PartialEq)]
@@ -692,7 +693,7 @@ pub fn update_expr(
692693
/// expressions using the [`update_expr`] function.
693694
pub fn update_ordering(
694695
ordering: LexOrdering,
695-
projected_exprs: &[(Arc<dyn PhysicalExpr>, String)],
696+
projected_exprs: &[ProjectionExpr],
696697
) -> Result<Option<LexOrdering>> {
697698
let mut updated_exprs = vec![];
698699
for mut sort_expr in ordering.into_iter() {
@@ -710,7 +711,7 @@ pub fn update_ordering(
710711
/// expressions using the [`update_expr`] function.
711712
pub fn update_ordering_requirement(
712713
reqs: LexRequirement,
713-
projected_exprs: &[(Arc<dyn PhysicalExpr>, String)],
714+
projected_exprs: &[ProjectionExpr],
714715
) -> Result<Option<LexRequirement>> {
715716
let mut updated_exprs = vec![];
716717
for mut sort_expr in reqs.into_iter() {
@@ -727,7 +728,7 @@ pub fn update_ordering_requirement(
727728
/// Downcasts all the expressions in `exprs` to `Column`s. If any of the given
728729
/// expressions is not a `Column`, returns `None`.
729730
pub fn physical_to_column_exprs(
730-
exprs: &[(Arc<dyn PhysicalExpr>, String)],
731+
exprs: &[ProjectionExpr],
731732
) -> Option<Vec<(Column, String)>> {
732733
exprs
733734
.iter()
@@ -952,7 +953,7 @@ fn try_unifying_projections(
952953
}
953954

954955
/// Collect all column indices from the given projection expressions.
955-
fn collect_column_indices(exprs: &[(Arc<dyn PhysicalExpr>, String)]) -> Vec<usize> {
956+
fn collect_column_indices(exprs: &[ProjectionExpr]) -> Vec<usize> {
956957
// Collect indices and remove duplicates.
957958
let mut indices = exprs
958959
.iter()
@@ -1314,7 +1315,7 @@ mod tests {
13141315
// of output schema columns < input schema columns and hence if we use the last few columns
13151316
// from the input schema in the expressions here, bounds_check would fail on them if output
13161317
// schema is supplied to the partitions_statistics method.
1317-
let exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = vec![
1318+
let exprs: Vec<ProjectionExpr> = vec![
13181319
(
13191320
Arc::new(Column::new("c", 2)) as Arc<dyn PhysicalExpr>,
13201321
"c_renamed".to_string(),

datafusion/physical-plan/src/streaming.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ impl ExecutionPlan for StreamingTableExec {
299299
let streaming_table_projections =
300300
self.projection().as_ref().map(|i| i.as_ref().to_vec());
301301
let new_projections = new_projections_for_columns(
302-
projection,
302+
projection.expr(),
303303
&streaming_table_projections
304304
.unwrap_or_else(|| (0..self.schema().fields().len()).collect()),
305305
);

docs/source/library-user-guide/upgrading.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,13 @@ impl LazyBatchGenerator for MyBatchGenerator {
218218

219219
See [#17200](https://github.com/apache/datafusion/pull/17200) for details.
220220

221+
### Refactored `DataSource::try_swapping_with_projection`
222+
223+
We refactored `DataSource::try_swapping_with_projection` to simplify the method and minimize leakage across the ExecutionPlan <-> DataSource abstraction layer.
224+
Reimplementation for any custom `DataSource` should be relatively straightforward, see [#17395] for more details.
225+
226+
[#17395]: https://github.com/apache/datafusion/pull/17395/
227+
221228
## DataFusion `49.0.0`
222229

223230
### `MSRV` updated to 1.85.1

0 commit comments

Comments
 (0)