Skip to content

Commit 493036f

Browse files
committed
move projection handling into FileSource
1 parent cf03574 commit 493036f

File tree

31 files changed

+1438
-943
lines changed

31 files changed

+1438
-943
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion-examples/examples/custom_data_source/csv_json_opener.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -62,22 +62,22 @@ async fn csv_opener() -> Result<()> {
6262
..Default::default()
6363
};
6464

65-
let scan_config = FileScanConfigBuilder::new(
66-
ObjectStoreUrl::local_filesystem(),
67-
Arc::new(CsvSource::new(Arc::clone(&schema)).with_csv_options(options.clone())),
68-
)
69-
.with_projection_indices(Some(vec![12, 0]))
70-
.with_limit(Some(5))
71-
.with_file(PartitionedFile::new(path.display().to_string(), 10))
72-
.build();
73-
74-
let config = CsvSource::new(Arc::clone(&schema))
65+
let source = CsvSource::new(Arc::clone(&schema))
7566
.with_csv_options(options)
7667
.with_comment(Some(b'#'))
77-
.with_batch_size(8192)
78-
.with_projection(&scan_config);
79-
80-
let opener = config.create_file_opener(object_store, &scan_config, 0);
68+
.with_batch_size(8192);
69+
70+
let scan_config =
71+
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
72+
.with_projection_indices(Some(vec![12, 0]))
73+
.with_limit(Some(5))
74+
.with_file(PartitionedFile::new(path.display().to_string(), 10))
75+
.build();
76+
77+
let opener =
78+
scan_config
79+
.file_source()
80+
.create_file_opener(object_store, &scan_config, 0)?;
8181

8282
let mut result = vec![];
8383
let mut stream =

datafusion/core/src/dataframe/parquet.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ mod tests {
150150
let plan = df.explain(false, false)?.collect().await?;
151151
// Filters all the way to Parquet
152152
let formatted = pretty::pretty_format_batches(&plan)?.to_string();
153-
assert!(formatted.contains("FilterExec: id@0 = 1"));
153+
assert!(formatted.contains("FilterExec: id@0 = 1"), "{formatted}");
154154

155155
Ok(())
156156
}

datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -140,14 +140,14 @@ impl FileSource for TestSource {
140140
_object_store: Arc<dyn ObjectStore>,
141141
_base_config: &FileScanConfig,
142142
_partition: usize,
143-
) -> Arc<dyn FileOpener> {
144-
Arc::new(TestOpener {
143+
) -> Result<Arc<dyn FileOpener>> {
144+
Ok(Arc::new(TestOpener {
145145
batches: self.batches.clone(),
146146
batch_size: self.batch_size,
147147
schema: Arc::clone(&self.schema),
148148
projection: self.projection.clone(),
149149
predicate: self.predicate.clone(),
150-
})
150+
}))
151151
}
152152

153153
fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
@@ -165,13 +165,6 @@ impl FileSource for TestSource {
165165
})
166166
}
167167

168-
fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
169-
Arc::new(TestSource {
170-
projection: config.projection_exprs.as_ref().map(|p| p.column_indices()),
171-
..self.clone()
172-
})
173-
}
174-
175168
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
176169
Arc::new(TestSource {
177170
statistics: Some(statistics),

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -620,7 +620,7 @@ mod test {
620620
let plan_string = get_plan_string(&aggregate_exec_partial).swap_remove(0);
621621
assert_snapshot!(
622622
plan_string,
623-
@"AggregateExec: mode=Partial, gby=[id@0 as id, 1 + id@0 as expr], aggr=[COUNT(c)]"
623+
@"AggregateExec: mode=Partial, gby=[id@0 as id, 1 + id@0 as expr], aggr=[COUNT(c)], ordering_mode=Sorted"
624624
);
625625

626626
let p0_statistics = aggregate_exec_partial.partition_statistics(Some(0))?;

datafusion/core/tests/physical_optimizer/projection_pushdown.rs

Lines changed: 14 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -703,10 +703,7 @@ fn test_projection_after_projection() -> Result<()> {
703703

704704
assert_snapshot!(
705705
actual,
706-
@r"
707-
ProjectionExec: expr=[b@1 as new_b, c@2 + e@4 as binary, b@1 as newest_b]
708-
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
709-
"
706+
@"DataSourceExec: file_groups={1 group: [[x]]}, projection=[new_b, binary, newest_b], file_type=csv, has_header=false"
710707
);
711708

712709
Ok(())
@@ -773,8 +770,7 @@ fn test_output_req_after_projection() -> Result<()> {
773770
actual,
774771
@r"
775772
OutputRequirementExec: order_by=[(b@2, asc), (c@0 + new_a@1, asc)], dist_by=HashPartitioned[[new_a@1, b@2]])
776-
ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]
777-
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
773+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[c, new_a, b], file_type=csv, has_header=false
778774
"
779775
);
780776

@@ -864,8 +860,7 @@ fn test_coalesce_partitions_after_projection() -> Result<()> {
864860
actual,
865861
@r"
866862
CoalescePartitionsExec
867-
ProjectionExec: expr=[b@1 as b, a@0 as a_new, d@3 as d]
868-
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
863+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[b, a_new, d], file_type=csv, has_header=false
869864
"
870865
);
871866

@@ -922,8 +917,7 @@ fn test_filter_after_projection() -> Result<()> {
922917
actual,
923918
@r"
924919
FilterExec: b@1 - a_new@0 > d@2 - a_new@0
925-
ProjectionExec: expr=[a@0 as a_new, b@1 as b, d@3 as d]
926-
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
920+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a_new, b, d], file_type=csv, has_header=false
927921
"
928922
);
929923

@@ -1025,10 +1019,8 @@ fn test_join_after_projection() -> Result<()> {
10251019
actual,
10261020
@r"
10271021
SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b_from_left@1, c_from_right@1)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2
1028-
ProjectionExec: expr=[c@2 as c_from_left, b@1 as b_from_left, a@0 as a_from_left]
1029-
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
1030-
ProjectionExec: expr=[a@0 as a_from_right, c@2 as c_from_right]
1031-
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
1022+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[c_from_left, b_from_left, a_from_left], file_type=csv, has_header=false
1023+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a_from_right, c_from_right], file_type=csv, has_header=false
10321024
"
10331025
);
10341026

@@ -1410,8 +1402,7 @@ fn test_repartition_after_projection() -> Result<()> {
14101402
actual,
14111403
@r"
14121404
RepartitionExec: partitioning=Hash([a@1, b_new@0, d_new@2], 6), input_partitions=1
1413-
ProjectionExec: expr=[b@1 as b_new, a@0 as a, d@3 as d_new]
1414-
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
1405+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[b_new, a, d_new], file_type=csv, has_header=false
14151406
"
14161407
);
14171408

@@ -1481,8 +1472,7 @@ fn test_sort_after_projection() -> Result<()> {
14811472
actual,
14821473
@r"
14831474
SortExec: expr=[b@2 ASC, c@0 + new_a@1 ASC], preserve_partitioning=[false]
1484-
ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]
1485-
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
1475+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[c, new_a, b], file_type=csv, has_header=false
14861476
"
14871477
);
14881478

@@ -1535,8 +1525,7 @@ fn test_sort_preserving_after_projection() -> Result<()> {
15351525
actual,
15361526
@r"
15371527
SortPreservingMergeExec: [b@2 ASC, c@0 + new_a@1 ASC]
1538-
ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]
1539-
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
1528+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[c, new_a, b], file_type=csv, has_header=false
15401529
"
15411530
);
15421531

@@ -1580,12 +1569,9 @@ fn test_union_after_projection() -> Result<()> {
15801569
actual,
15811570
@r"
15821571
UnionExec
1583-
ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]
1584-
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
1585-
ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]
1586-
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
1587-
ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]
1588-
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
1572+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[c, new_a, b], file_type=csv, has_header=false
1573+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[c, new_a, b], file_type=csv, has_header=false
1574+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[c, new_a, b], file_type=csv, has_header=false
15891575
"
15901576
);
15911577

@@ -1653,10 +1639,7 @@ fn test_partition_col_projection_pushdown() -> Result<()> {
16531639
let actual = after_optimize_string.trim();
16541640
assert_snapshot!(
16551641
actual,
1656-
@r"
1657-
ProjectionExec: expr=[string_col@1 as string_col, partition_col@2 as partition_col, int_col@0 as int_col]
1658-
DataSourceExec: file_groups={1 group: [[x]]}, projection=[int_col, string_col, partition_col], file_type=csv, has_header=false
1659-
"
1642+
@"DataSourceExec: file_groups={1 group: [[x]]}, projection=[string_col, partition_col, int_col], file_type=csv, has_header=false"
16601643
);
16611644

16621645
Ok(())
@@ -1699,10 +1682,7 @@ fn test_partition_col_projection_pushdown_expr() -> Result<()> {
16991682
let actual = after_optimize_string.trim();
17001683
assert_snapshot!(
17011684
actual,
1702-
@r"
1703-
ProjectionExec: expr=[string_col@1 as string_col, CAST(partition_col@2 AS Utf8View) as partition_col, int_col@0 as int_col]
1704-
DataSourceExec: file_groups={1 group: [[x]]}, projection=[int_col, string_col, partition_col], file_type=csv, has_header=false
1705-
"
1685+
@"DataSourceExec: file_groups={1 group: [[x]]}, projection=[string_col, partition_col, int_col], file_type=csv, has_header=false"
17061686
);
17071687

17081688
Ok(())

datafusion/core/tests/sql/explain_analyze.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -812,17 +812,16 @@ async fn test_physical_plan_display_indent_multi_children() {
812812

813813
assert_snapshot!(
814814
actual,
815-
@r###"
815+
@r"
816816
CoalesceBatchesExec: target_batch_size=4096
817817
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c2@0)], projection=[c1@0]
818818
CoalesceBatchesExec: target_batch_size=4096
819819
RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=1
820820
DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], file_type=csv, has_header=true
821821
CoalesceBatchesExec: target_batch_size=4096
822822
RepartitionExec: partitioning=Hash([c2@0], 9000), input_partitions=1
823-
ProjectionExec: expr=[c1@0 as c2]
824-
DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], file_type=csv, has_header=true
825-
"###
823+
DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c2], file_type=csv, has_header=true
824+
"
826825
);
827826
}
828827

datafusion/datasource-arrow/src/source.rs

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -84,11 +84,11 @@ impl FileSource for ArrowFileSource {
8484
object_store: Arc<dyn ObjectStore>,
8585
base_config: &FileScanConfig,
8686
_partition: usize,
87-
) -> Arc<dyn FileOpener> {
88-
Arc::new(ArrowFileOpener {
87+
) -> Result<Arc<dyn FileOpener>> {
88+
Ok(Arc::new(ArrowFileOpener {
8989
object_store,
9090
projection: base_config.file_column_projection_indices(),
91-
})
91+
}))
9292
}
9393

9494
fn as_any(&self) -> &dyn Any {
@@ -109,10 +109,6 @@ impl FileSource for ArrowFileSource {
109109
Arc::new(conf)
110110
}
111111

112-
fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
113-
Arc::new(Self { ..self.clone() })
114-
}
115-
116112
fn metrics(&self) -> &ExecutionPlanMetricsSet {
117113
&self.metrics
118114
}
@@ -176,11 +172,11 @@ impl FileSource for ArrowStreamFileSource {
176172
object_store: Arc<dyn ObjectStore>,
177173
base_config: &FileScanConfig,
178174
_partition: usize,
179-
) -> Arc<dyn FileOpener> {
180-
Arc::new(ArrowStreamFileOpener {
175+
) -> Result<Arc<dyn FileOpener>> {
176+
Ok(Arc::new(ArrowStreamFileOpener {
181177
object_store,
182178
projection: base_config.file_column_projection_indices(),
183-
})
179+
}))
184180
}
185181

186182
fn as_any(&self) -> &dyn Any {
@@ -197,10 +193,6 @@ impl FileSource for ArrowStreamFileSource {
197193
Arc::new(conf)
198194
}
199195

200-
fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
201-
Arc::new(Self { ..self.clone() })
202-
}
203-
204196
fn repartitioned(
205197
&self,
206198
_target_partitions: usize,
@@ -469,7 +461,7 @@ impl FileSource for ArrowSource {
469461
object_store: Arc<dyn ObjectStore>,
470462
base_config: &FileScanConfig,
471463
partition: usize,
472-
) -> Arc<dyn FileOpener> {
464+
) -> Result<Arc<dyn FileOpener>> {
473465
self.inner
474466
.create_file_opener(object_store, base_config, partition)
475467
}
@@ -484,12 +476,6 @@ impl FileSource for ArrowSource {
484476
})
485477
}
486478

487-
fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
488-
Arc::new(Self {
489-
inner: self.inner.with_projection(config),
490-
})
491-
}
492-
493479
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
494480
Arc::new(Self {
495481
inner: self.inner.with_statistics(statistics),
@@ -639,7 +625,7 @@ mod tests {
639625
)
640626
.build();
641627

642-
let file_opener = source.create_file_opener(object_store, &scan_config, 0);
628+
let file_opener = source.create_file_opener(object_store, &scan_config, 0)?;
643629
let mut stream = file_opener.open(partitioned_file)?.await?;
644630

645631
assert!(stream.next().await.is_some());
@@ -681,7 +667,7 @@ mod tests {
681667
)
682668
.build();
683669

684-
let file_opener = source.create_file_opener(object_store, &scan_config, 0);
670+
let file_opener = source.create_file_opener(object_store, &scan_config, 0)?;
685671
let mut stream = file_opener.open(partitioned_file)?.await?;
686672

687673
assert!(stream.next().await.is_some());
@@ -722,7 +708,7 @@ mod tests {
722708
)
723709
.build();
724710

725-
let file_opener = source.create_file_opener(object_store, &scan_config, 0);
711+
let file_opener = source.create_file_opener(object_store, &scan_config, 0)?;
726712
let result = file_opener.open(partitioned_file);
727713
assert!(result.is_err());
728714

datafusion/datasource-avro/src/file_format.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use datafusion_common::{Result, Statistics};
3535
use datafusion_datasource::file::FileSource;
3636
use datafusion_datasource::file_compression_type::FileCompressionType;
3737
use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
38-
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
38+
use datafusion_datasource::file_scan_config::FileScanConfig;
3939
use datafusion_datasource::source::DataSourceExec;
4040
use datafusion_physical_plan::ExecutionPlan;
4141
use datafusion_session::Session;
@@ -154,11 +154,7 @@ impl FileFormat for AvroFormat {
154154
_state: &dyn Session,
155155
conf: FileScanConfig,
156156
) -> Result<Arc<dyn ExecutionPlan>> {
157-
let file_schema = Arc::clone(conf.file_schema());
158-
let config = FileScanConfigBuilder::from(conf)
159-
.with_source(Arc::new(AvroSource::new(file_schema)))
160-
.build();
161-
Ok(DataSourceExec::from_data_source(config))
157+
Ok(DataSourceExec::from_data_source(conf))
162158
}
163159

164160
fn file_source(

0 commit comments

Comments
 (0)