
Commit a2e6c90

Improve UX: Rename `FileScanConfig::new_exec` to `FileScanConfig::build` (#14670)
* Rename `FileScanConfig::new_exec` to `FileScanConfig::build`
* Update docs
* fix
1 parent 9681c3b commit a2e6c90
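In short: a builder chain over `FileScanConfig` now ends with `build()`, which consumes the config by value, instead of `new_exec()`, which took `&self` and cloned it. A minimal before/after sketch of a caller, following the API shown in this diff; the schema, URL, and file path are illustrative placeholders, not taken from the commit:

    use std::sync::Arc;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion::datasource::listing::PartitionedFile;
    use datafusion::datasource::physical_plan::{ArrowSource, FileScanConfig};
    use datafusion_execution::object_store::ObjectStoreUrl;

    // A placeholder single-column schema for illustration.
    let file_schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));

    // Configure a scan over one local file (path and size are made up).
    let config = FileScanConfig::new(
        ObjectStoreUrl::local_filesystem(),
        file_schema,
        Arc::new(ArrowSource::default()),
    )
    .with_file(PartitionedFile::new("file1.arrow", 1024));

    // Before this commit: let exec = config.new_exec();
    // After this commit:  `build()` takes `self`, so the config is moved, not cloned.
    let exec = config.build();

Because `build()` takes `self` by value, call sites that previously mutated the config after the builder chain (e.g. `config.projection = ...`) are rewritten in this commit to finish configuration with `with_*` methods before the final `build()` call.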

File tree: 28 files changed (+100, -96 lines)

datafusion-examples/examples/advanced_parquet_index.rs

Lines changed: 1 addition & 1 deletion
@@ -504,7 +504,7 @@ impl TableProvider for IndexTableProvider {
             .with_file(partitioned_file);

         // Finally, put it all together into a DataSourceExec
-        Ok(file_scan_config.new_exec())
+        Ok(file_scan_config.build())
     }

     /// Tell DataFusion to push filters down to the scan method

datafusion-examples/examples/parquet_index.rs

Lines changed: 1 addition & 1 deletion
@@ -258,7 +258,7 @@ impl TableProvider for IndexTableProvider {
                 file_size,
             ));
         }
-        Ok(file_scan_config.new_exec())
+        Ok(file_scan_config.build())
     }

     /// Tell DataFusion to push filters down to the scan method

datafusion/core/src/datasource/file_format/arrow.rs

Lines changed: 2 additions & 3 deletions
@@ -171,11 +171,10 @@ impl FileFormat for ArrowFormat {
     async fn create_physical_plan(
         &self,
         _state: &dyn Session,
-        mut conf: FileScanConfig,
+        conf: FileScanConfig,
         _filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        conf = conf.with_source(Arc::new(ArrowSource::default()));
-        Ok(conf.new_exec())
+        Ok(conf.with_source(Arc::new(ArrowSource::default())).build())
     }

     async fn create_writer_physical_plan(

datafusion/core/src/datasource/file_format/avro.rs

Lines changed: 2 additions & 3 deletions
@@ -148,11 +148,10 @@ impl FileFormat for AvroFormat {
     async fn create_physical_plan(
         &self,
         _state: &dyn Session,
-        mut conf: FileScanConfig,
+        conf: FileScanConfig,
         _filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        conf = conf.with_source(self.file_source());
-        Ok(conf.new_exec())
+        Ok(conf.with_source(self.file_source()).build())
     }

     fn file_source(&self) -> Arc<dyn FileSource> {

datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 1 addition & 3 deletions
@@ -434,9 +434,7 @@ impl FileFormat for CsvFormat {
                 .with_terminator(self.options.terminator)
                 .with_comment(self.options.comment),
         );
-        conf = conf.with_source(source);
-
-        Ok(conf.new_exec())
+        Ok(conf.with_source(source).build())
     }

     async fn create_writer_physical_plan(

datafusion/core/src/datasource/file_format/json.rs

Lines changed: 1 addition & 3 deletions
@@ -254,9 +254,7 @@ impl FileFormat for JsonFormat {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let source = Arc::new(JsonSource::new());
         conf.file_compression_type = FileCompressionType::from(self.options.compression);
-        conf = conf.with_source(source);
-
-        Ok(conf.new_exec())
+        Ok(conf.with_source(source).build())
     }

     async fn create_writer_physical_plan(

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 2 additions & 3 deletions
@@ -398,7 +398,7 @@ impl FileFormat for ParquetFormat {
     async fn create_physical_plan(
         &self,
         _state: &dyn Session,
-        mut conf: FileScanConfig,
+        conf: FileScanConfig,
         filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let mut predicate = None;
@@ -424,8 +424,7 @@ impl FileFormat for ParquetFormat {
         if let Some(metadata_size_hint) = metadata_size_hint {
             source = source.with_metadata_size_hint(metadata_size_hint)
         }
-        conf = conf.with_source(Arc::new(source));
-        Ok(conf.new_exec())
+        Ok(conf.with_source(Arc::new(source)).build())
     }

     async fn create_writer_physical_plan(

datafusion/core/src/datasource/physical_plan/avro.rs

Lines changed: 3 additions & 3 deletions
@@ -399,7 +399,7 @@ mod tests {
             .with_file(meta.into())
             .with_projection(Some(vec![0, 1, 2]));

-        let source_exec = conf.new_exec();
+        let source_exec = conf.build();
         assert_eq!(
             source_exec
                 .properties()
@@ -472,7 +472,7 @@ mod tests {
             .with_file(meta.into())
             .with_projection(projection);

-        let source_exec = conf.new_exec();
+        let source_exec = conf.build();
         assert_eq!(
             source_exec
                 .properties()
@@ -546,7 +546,7 @@ mod tests {
             .with_file(partitioned_file)
             .with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)]);

-        let source_exec = conf.new_exec();
+        let source_exec = conf.build();

         assert_eq!(
             source_exec

datafusion/core/src/datasource/physical_plan/csv.rs

Lines changed: 20 additions & 20 deletions
@@ -425,7 +425,7 @@ impl ExecutionPlan for CsvExec {
 /// let file_scan_config = FileScanConfig::new(object_store_url, file_schema, source)
 ///     .with_file(PartitionedFile::new("file1.csv", 100*1024*1024))
 ///     .with_newlines_in_values(true); // The file contains newlines in values;
-/// let exec = file_scan_config.new_exec();
+/// let exec = file_scan_config.build();
 /// ```
 #[derive(Debug, Clone, Default)]
 pub struct CsvSource {
@@ -836,14 +836,14 @@ mod tests {
         )?;

         let source = Arc::new(CsvSource::new(true, b',', b'"'));
-        let mut config = partitioned_csv_config(file_schema, file_groups, source)
+        let config = partitioned_csv_config(file_schema, file_groups, source)
             .with_file_compression_type(file_compression_type)
-            .with_newlines_in_values(false);
-        config.projection = Some(vec![0, 2, 4]);
-
-        let csv = config.new_exec();
+            .with_newlines_in_values(false)
+            .with_projection(Some(vec![0, 2, 4]));

         assert_eq!(13, config.file_schema.fields().len());
+        let csv = config.build();
+
         assert_eq!(3, csv.schema().fields().len());

         let mut stream = csv.execute(0, task_ctx)?;
@@ -901,12 +901,12 @@ mod tests {
         )?;

         let source = Arc::new(CsvSource::new(true, b',', b'"'));
-        let mut config = partitioned_csv_config(file_schema, file_groups, source)
+        let config = partitioned_csv_config(file_schema, file_groups, source)
             .with_newlines_in_values(false)
-            .with_file_compression_type(file_compression_type.to_owned());
-        config.projection = Some(vec![4, 0, 2]);
-        let csv = config.new_exec();
+            .with_file_compression_type(file_compression_type.to_owned())
+            .with_projection(Some(vec![4, 0, 2]));
         assert_eq!(13, config.file_schema.fields().len());
+        let csv = config.build();
         assert_eq!(3, csv.schema().fields().len());

         let mut stream = csv.execute(0, task_ctx)?;
@@ -964,12 +964,12 @@ mod tests {
         )?;

         let source = Arc::new(CsvSource::new(true, b',', b'"'));
-        let mut config = partitioned_csv_config(file_schema, file_groups, source)
+        let config = partitioned_csv_config(file_schema, file_groups, source)
             .with_newlines_in_values(false)
-            .with_file_compression_type(file_compression_type.to_owned());
-        config.limit = Some(5);
-        let csv = config.new_exec();
+            .with_file_compression_type(file_compression_type.to_owned())
+            .with_limit(Some(5));
         assert_eq!(13, config.file_schema.fields().len());
+        let csv = config.build();
         assert_eq!(13, csv.schema().fields().len());

         let mut it = csv.execute(0, task_ctx)?;
@@ -1024,12 +1024,12 @@ mod tests {
         )?;

         let source = Arc::new(CsvSource::new(true, b',', b'"'));
-        let mut config = partitioned_csv_config(file_schema, file_groups, source)
+        let config = partitioned_csv_config(file_schema, file_groups, source)
             .with_newlines_in_values(false)
-            .with_file_compression_type(file_compression_type.to_owned());
-        config.limit = Some(5);
-        let csv = config.new_exec();
+            .with_file_compression_type(file_compression_type.to_owned())
+            .with_limit(Some(5));
         assert_eq!(14, config.file_schema.fields().len());
+        let csv = config.build();
         assert_eq!(14, csv.schema().fields().len());

         // errors due to https://github.com/apache/datafusion/issues/4918
@@ -1089,8 +1089,8 @@
         // we don't have `/date=xx/` in the path but that is ok because
         // partitions are resolved during scan anyway

-        let csv = config.new_exec();
         assert_eq!(13, config.file_schema.fields().len());
+        let csv = config.build();
         assert_eq!(2, csv.schema().fields().len());

         let mut it = csv.execute(0, task_ctx)?;
@@ -1179,7 +1179,7 @@
         let config = partitioned_csv_config(file_schema, file_groups, source)
             .with_newlines_in_values(false)
             .with_file_compression_type(file_compression_type.to_owned());
-        let csv = config.new_exec();
+        let csv = config.build();

         let it = csv.execute(0, task_ctx).unwrap();
         let batches: Vec<_> = it.try_collect().await.unwrap();

datafusion/core/src/datasource/physical_plan/file_scan_config.rs

Lines changed: 26 additions & 14 deletions
@@ -68,21 +68,30 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
     ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
 }

-/// The base configurations to provide when creating a physical plan for
+/// The base configurations for a [`DataSourceExec`], the physical plan for
 /// any given file format.
 ///
+/// Use [`Self::build`] to create a [`DataSourceExec`] from a `FileScanConfig`.
+///
 /// # Example
 /// ```
 /// # use std::sync::Arc;
-/// # use arrow::datatypes::Schema;
+/// # use arrow::datatypes::{Field, Fields, DataType, Schema};
 /// # use datafusion::datasource::listing::PartitionedFile;
 /// # use datafusion::datasource::physical_plan::FileScanConfig;
 /// # use datafusion_execution::object_store::ObjectStoreUrl;
 /// # use datafusion::datasource::physical_plan::ArrowSource;
-/// # let file_schema = Arc::new(Schema::empty());
-/// // create FileScan config for reading data from file://
+/// # use datafusion_physical_plan::ExecutionPlan;
+/// # let file_schema = Arc::new(Schema::new(vec![
+/// #     Field::new("c1", DataType::Int32, false),
+/// #     Field::new("c2", DataType::Int32, false),
+/// #     Field::new("c3", DataType::Int32, false),
+/// #     Field::new("c4", DataType::Int32, false),
+/// # ]));
+/// // create FileScan config for reading arrow files from file://
 /// let object_store_url = ObjectStoreUrl::local_filesystem();
-/// let config = FileScanConfig::new(object_store_url, file_schema, Arc::new(ArrowSource::default()))
+/// let file_source = Arc::new(ArrowSource::default());
+/// let config = FileScanConfig::new(object_store_url, file_schema, file_source)
 ///   .with_limit(Some(1000))            // read only the first 1000 records
 ///   .with_projection(Some(vec![2, 3])) // project columns 2 and 3
 ///   // Read /tmp/file1.parquet with known size of 1234 bytes in a single group
@@ -93,6 +102,8 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
 ///     PartitionedFile::new("file2.parquet", 56),
 ///     PartitionedFile::new("file3.parquet", 78),
 ///   ]);
+/// // create an execution plan from the config
+/// let plan: Arc<dyn ExecutionPlan> = config.build();
 /// ```
 #[derive(Clone)]
 pub struct FileScanConfig {
@@ -252,19 +263,20 @@ impl DataSource for FileScanConfig {
         // If there is any non-column or alias-carrier expression, Projection should not be removed.
         // This process can be moved into CsvExec, but it would be an overlap of their responsibility.
         Ok(all_alias_free_columns(projection.expr()).then(|| {
-            let mut file_scan = self.clone();
+            let file_scan = self.clone();
             let source = Arc::clone(&file_scan.source);
             let new_projections = new_projections_for_columns(
                 projection,
                 &file_scan
                     .projection
+                    .clone()
                     .unwrap_or((0..self.file_schema.fields().len()).collect()),
             );
-            file_scan.projection = Some(new_projections);
-            // Assign projected statistics to source
-            file_scan = file_scan.with_source(source);
-
-            file_scan.new_exec() as _
+            file_scan
+                // Assign projected statistics to source
+                .with_projection(Some(new_projections))
+                .with_source(source)
+                .build() as _
         }))
     }
 }
@@ -574,9 +586,9 @@ impl FileScanConfig {
     }

     // TODO: This function should be moved into DataSourceExec once FileScanConfig moved out of datafusion/core
-    /// Returns a new [`DataSourceExec`] from file configurations
-    pub fn new_exec(&self) -> Arc<DataSourceExec> {
-        Arc::new(DataSourceExec::new(Arc::new(self.clone())))
+    /// Returns a new [`DataSourceExec`] to scan the files specified by this config
+    pub fn build(self) -> Arc<DataSourceExec> {
+        Arc::new(DataSourceExec::new(Arc::new(self)))
     }

     /// Write the data_type based on file_source
