Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 14 additions & 14 deletions datafusion-examples/examples/custom_data_source/csv_json_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,22 @@ async fn csv_opener() -> Result<()> {
..Default::default()
};

let scan_config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
Arc::new(CsvSource::new(Arc::clone(&schema)).with_csv_options(options.clone())),
)
.with_projection_indices(Some(vec![12, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.display().to_string(), 10))
.build();

let config = CsvSource::new(Arc::clone(&schema))
let source = CsvSource::new(Arc::clone(&schema))
.with_csv_options(options)
.with_comment(Some(b'#'))
.with_batch_size(8192)
.with_projection(&scan_config);
.with_batch_size(8192);

let scan_config =
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
Comment on lines +72 to +73
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

.with_projection_indices(Some(vec![12, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.display().to_string(), 10))
.build()?;

let opener = config.create_file_opener(object_store, &scan_config, 0);
let opener =
scan_config
.file_source()
.create_file_opener(object_store, &scan_config, 0)?;

let mut result = vec![];
let mut stream =
Expand Down Expand Up @@ -136,7 +136,7 @@ async fn json_opener() -> Result<()> {
.with_projection_indices(Some(vec![1, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.to_string(), 10))
.build();
.build()?;

let mut stream = FileStream::new(
&scan_config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ impl TableProvider for IndexTableProvider {
.with_limit(limit)
.with_projection_indices(projection.cloned())
.with_file(partitioned_file)
.build();
.build()?;

// Finally, put it all together into a DataSourceExec
Ok(DataSourceExec::from_data_source(file_scan_config))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ impl TableProvider for DistinctIndexTable {
PartitionedFile::new(path.to_str().unwrap().to_string(), len);
builder = builder.with_file(partitioned_file);
}
Ok(DataSourceExec::from_data_source(builder.build()))
Ok(DataSourceExec::from_data_source(builder.build()?))
}

/// Tell DataFusion that we can handle filters on the "category" column
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/data_io/parquet_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ impl TableProvider for IndexTableProvider {
);
}
Ok(DataSourceExec::from_data_source(
file_scan_config_builder.build(),
file_scan_config_builder.build()?,
))
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/default_column_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ impl TableProvider for DefaultValueTableProvider {
.with_expr_adapter(Some(Arc::new(DefaultValuePhysicalExprAdapterFactory) as _));

Ok(Arc::new(DataSourceExec::new(Arc::new(
file_scan_config.build(),
file_scan_config.build()?,
))))
}
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog-listing/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ impl TableProvider for ListingTable {
.with_limit(limit)
.with_output_ordering(output_ordering)
.with_expr_adapter(self.expr_adapter_factory.clone())
.build(),
.build()?,
)
.await?;

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/dataframe/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ mod tests {
let plan = df.explain(false, false)?.collect().await?;
// Filters all the way to Parquet
let formatted = pretty::pretty_format_batches(&plan)?.to_string();
assert!(formatted.contains("FilterExec: id@0 = 1"));
assert!(formatted.contains("FilterExec: id@0 = 1"), "{formatted}");

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ pub(crate) mod test_util {
.with_statistics(statistics)
.with_projection_indices(projection)
.with_limit(limit)
.build(),
.build()?,
)
.await?;
Ok(exec)
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ mod tests {
let base_conf =
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
.with_file(partitioned_file)
.build();
.build()
.unwrap();

let parquet_exec = DataSourceExec::from_data_source(base_conf);

Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ mod tests {
let conf = FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
.with_file(meta.into())
.with_projection_indices(Some(vec![0, 1, 2]))
.build();
.build()?;

let source_exec = DataSourceExec::from_data_source(conf);
assert_eq!(
Expand Down Expand Up @@ -157,7 +157,7 @@ mod tests {
let conf = FileScanConfigBuilder::new(object_store_url, source)
.with_file(meta.into())
.with_projection_indices(projection)
.build();
.build()?;

let source_exec = DataSourceExec::from_data_source(conf);
assert_eq!(
Expand Down Expand Up @@ -233,7 +233,7 @@ mod tests {
// column which is supposed to be the last column in the table schema.
.with_projection_indices(projection)
.with_file(partitioned_file)
.build();
.build()?;

let source_exec = DataSourceExec::from_data_source(conf);

Expand Down
24 changes: 12 additions & 12 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,11 @@ mod tests {
let source =
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
let config =
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source))
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source)?)
.with_file_compression_type(file_compression_type)
.with_newlines_in_values(false)
.with_projection_indices(Some(vec![0, 2, 4]))
.build();
.build()?;

assert_eq!(13, config.file_schema().fields().len());
let csv = DataSourceExec::from_data_source(config);
Expand Down Expand Up @@ -199,11 +199,11 @@ mod tests {
let source =
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
let config =
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source))
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source)?)
.with_newlines_in_values(false)
.with_file_compression_type(file_compression_type.to_owned())
.with_projection_indices(Some(vec![4, 0, 2]))
.build();
.build()?;
assert_eq!(13, config.file_schema().fields().len());
let csv = DataSourceExec::from_data_source(config);
assert_eq!(3, csv.schema().fields().len());
Expand Down Expand Up @@ -271,11 +271,11 @@ mod tests {
let source =
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
let config =
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source))
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source)?)
.with_newlines_in_values(false)
.with_file_compression_type(file_compression_type.to_owned())
.with_limit(Some(5))
.build();
.build()?;
assert_eq!(13, config.file_schema().fields().len());
let csv = DataSourceExec::from_data_source(config);
assert_eq!(13, csv.schema().fields().len());
Expand Down Expand Up @@ -342,11 +342,11 @@ mod tests {
let source =
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
let config =
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source))
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source)?)
.with_newlines_in_values(false)
.with_file_compression_type(file_compression_type.to_owned())
.with_limit(Some(5))
.build();
.build()?;
assert_eq!(14, config.file_schema().fields().len());
let csv = DataSourceExec::from_data_source(config);
assert_eq!(14, csv.schema().fields().len());
Expand Down Expand Up @@ -411,13 +411,13 @@ mod tests {
let source =
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
let config =
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source))
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source)?)
.with_newlines_in_values(false)
.with_file_compression_type(file_compression_type.to_owned())
// We should be able to project on the partition column
// Which is supposed to be after the file fields
.with_projection_indices(Some(vec![0, num_file_schema_fields]))
.build();
.build()?;

// we don't have `/date=xx/` in the path but that is ok because
// partitions are resolved during scan anyway
Expand Down Expand Up @@ -517,10 +517,10 @@ mod tests {
let source =
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
let config =
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source))
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source)?)
.with_newlines_in_values(false)
.with_file_compression_type(file_compression_type.to_owned())
.build();
.build()?;
let csv = DataSourceExec::from_data_source(config);

let it = csv.execute(0, task_ctx).unwrap();
Expand Down
12 changes: 8 additions & 4 deletions datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ mod tests {
.with_file_groups(file_groups)
.with_limit(Some(3))
.with_file_compression_type(file_compression_type.to_owned())
.build();
.build()
.unwrap();
let exec = DataSourceExec::from_data_source(conf);

// TODO: this is not where schema inference should be tested
Expand Down Expand Up @@ -260,7 +261,8 @@ mod tests {
.with_file_groups(file_groups)
.with_limit(Some(3))
.with_file_compression_type(file_compression_type.to_owned())
.build();
.build()
.unwrap();
let exec = DataSourceExec::from_data_source(conf);

let mut it = exec.execute(0, task_ctx)?;
Expand Down Expand Up @@ -303,7 +305,8 @@ mod tests {
.with_file_groups(file_groups)
.with_projection_indices(Some(vec![0, 2]))
.with_file_compression_type(file_compression_type.to_owned())
.build();
.build()
.unwrap();
let exec = DataSourceExec::from_data_source(conf);
let inferred_schema = exec.schema();
assert_eq!(inferred_schema.fields().len(), 2);
Expand Down Expand Up @@ -351,7 +354,8 @@ mod tests {
.with_file_groups(file_groups)
.with_projection_indices(Some(vec![3, 0, 2]))
.with_file_compression_type(file_compression_type.to_owned())
.build();
.build()
.unwrap();
let exec = DataSourceExec::from_data_source(conf);
let inferred_schema = exec.schema();
assert_eq!(inferred_schema.fields().len(), 3);
Expand Down
13 changes: 8 additions & 5 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ mod tests {
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
.with_file_group(file_group)
.with_projection_indices(self.projection.clone())
.build();
.build()
.unwrap();
DataSourceExec::from_data_source(base_config)
}

Expand Down Expand Up @@ -1545,7 +1546,7 @@ mod tests {
Arc::new(ParquetSource::new(file_schema)),
)
.with_file_groups(file_groups)
.build();
.build()?;

let parquet_exec = DataSourceExec::from_data_source(config);
assert_eq!(
Expand Down Expand Up @@ -1664,7 +1665,8 @@ mod tests {
.with_file(partitioned_file)
// file has 10 cols so index 12 should be month and 13 should be day
.with_projection_indices(Some(vec![0, 1, 2, 12, 13]))
.build();
.build()
.unwrap();

let parquet_exec = DataSourceExec::from_data_source(config);
let partition_count = parquet_exec
Expand Down Expand Up @@ -1728,7 +1730,7 @@ mod tests {
Arc::new(ParquetSource::new(file_schema)),
)
.with_file(partitioned_file)
.build();
.build()?;

let parquet_exec = DataSourceExec::from_data_source(config);

Expand Down Expand Up @@ -2308,7 +2310,8 @@ mod tests {
extensions: None,
metadata_size_hint: None,
})
.build();
.build()
.unwrap();

let exec = DataSourceExec::from_data_source(config);

Expand Down
5 changes: 4 additions & 1 deletion datafusion/core/src/datasource/view_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,10 @@ mod tests {
.to_string();
assert!(formatted.contains("DataSourceExec: "));
assert!(formatted.contains("file_type=parquet"));
assert!(formatted.contains("projection=[bool_col, int_col], limit=10"));
assert!(
formatted.contains("projection=[bool_col, int_col], limit=10"),
"{formatted}"
);
Ok(())
}

Expand Down
7 changes: 4 additions & 3 deletions datafusion/core/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,10 @@ pub fn scan_partitioned_csv(
};
let table_schema = TableSchema::from_file_schema(schema);
let source = Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
let config = FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source))
.with_file_compression_type(FileCompressionType::UNCOMPRESSED)
.build();
let config =
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source)?)
.with_file_compression_type(FileCompressionType::UNCOMPRESSED)
.build()?;
Ok(DataSourceExec::from_data_source(config))
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/test_util/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,13 @@ impl TestParquetFile {
.with_table_parquet_options(parquet_options)
.with_predicate(Arc::clone(&physical_filter_expr)),
);
let config = scan_config_builder.with_source(source).build();
let config = scan_config_builder.with_source(source).build()?;
let parquet_exec = DataSourceExec::from_data_source(config);

let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
Ok(exec)
} else {
let config = scan_config_builder.build();
let config = scan_config_builder.build()?;
Ok(DataSourceExec::from_data_source(config))
}
}
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/tests/fuzz_cases/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,8 @@ async fn execute_with_predicate(
})
.collect(),
)
.build();
.build()
.unwrap();
let exec = DataSourceExec::from_data_source(config);
let exec =
Arc::new(FilterExec::try_new(predicate, exec).unwrap()) as Arc<dyn ExecutionPlan>;
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/tests/parquet/custom_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() {
source,
)
.with_file_group(file_group)
.build();
.build()
.unwrap();

let parquet_exec = DataSourceExec::from_data_source(base_config);

Expand Down
Loading
Loading