Skip to content

Conversation

@kosiew
Copy link
Contributor

@kosiew kosiew commented Jun 27, 2025

Which issue does this PR close?

This is the last of a series of PRs re-implementing #15295 to close #14757 by adding schema‐evolution support for:

  • listing‐based tables
  • with nested structs
    in DataFusion.

Rationale for this change

This change introduces schema adapter support to improve DataFusion's ability to handle schema evolution scenarios in listing-based tables. It provides a more flexible and robust mechanism for adapting schemas between files and logical table definitions, including support for custom adapters that handle field renaming, type coercion, and column reordering.

What changes are included in this PR?

  • Introduced a new schema_adapter_factory field in ListingTableConfig and ListingTable
  • Added with_schema_adapter_factory and schema_adapter_factory() methods to both structs
  • Modified execution planning logic to apply schema adapters during scanning
  • Updated statistics collection to use mapped schemas
  • Implemented detailed documentation and example usage in doc comments
  • Added new unit and integration tests validating schema adapter behavior and error cases

Are these changes tested?

Yes, the PR includes a suite of tests to verify:

  • Default behavior without a custom schema adapter
  • Custom adapter impact on statistics mapping
  • Error propagation from schema mapping failures
  • Integration with listing scan execution

Are there any user-facing changes?

Yes:

  • Users can now provide a SchemaAdapterFactory in ListingTableConfig to customize how file schemas are interpreted relative to table schemas
  • Documentation is included to guide usage of schema evolution and custom mapping
  • Backwards-compatible; default behavior remains unchanged unless a factory is explicitly set

@github-actions github-actions bot added the core Core DataFusion crate label Jun 27, 2025
@kosiew kosiew changed the title Add schema adapter factory and related code into ListingTableConfig Add SchemaAdapterFactory Support for ListingTable with Schema Evolution and Mapping Jun 27, 2025
@kosiew kosiew marked this pull request as ready for review June 27, 2025 13:04
Copy link
Contributor

@adriangb adriangb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving some questions / suggestions

Comment on lines 97 to 105
/// # // Custom schema adapter for handling schema evolution
/// # #[derive(Debug)]
/// # struct EvolutionSchemaAdapterFactory;
/// # impl SchemaAdapterFactory for EvolutionSchemaAdapterFactory {
/// # fn create(&self, projected_table_schema: SchemaRef, file_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
/// # unimplemented!("Custom schema adapter implementation")
/// # }
/// # }
/// #
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this should be here. It belongs alongside the SchemaAdapter trait / might already be there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before this PR, schema adapters was not used anywhere in the repo, except in tests.
Using schema adapter with a ListingTable is the first illustration of how to use schema adapters.
Therefore, added the example here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deleted this example, because there is an almost similar example appropriately located near fn with_schema_adapter_factory

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think that (1) most users won't be using a SchemaAdapter and (2) linking to the existing docs on SchemaAdapter and maybe enhancing those should be enough.

Comment on lines +198 to +210
// Note: We preserve existing options state, but downstream code may expect
// options to be set. Consider calling with_listing_options() or infer_options()
// before operations that require options to be present.
debug_assert!(
self.options.is_some() || cfg!(test),
"ListingTableConfig::with_schema called without options set. \
Consider calling with_listing_options() or infer_options() first to avoid panics in downstream code."
);

Self {
table_paths: self.table_paths,
file_schema: Some(schema),
options: self.options,
schema_source: SchemaSource::Specified,
..self
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this related to this change or drive by?

Copy link
Contributor Author

@kosiew kosiew Jul 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a minor refactor because I added derive Default for ListingTableConfig

Comment on lines +218 to 246
///
/// # Example: Configuring Parquet Files with Custom Options
/// ```rust
/// # use std::sync::Arc;
/// # use datafusion::datasource::listing::{ListingTableConfig, ListingOptions, ListingTableUrl};
/// # use datafusion::datasource::file_format::parquet::ParquetFormat;
/// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
/// let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
/// .with_file_extension(".parquet")
/// .with_collect_stat(true);
///
/// let config = ListingTableConfig::new(table_paths)
/// .with_listing_options(options); // Configure file format and options
/// ```
pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
// Note: This method properly sets options, but be aware that downstream
// methods like infer_schema() and try_new() require both schema and options
// to be set to function correctly.
debug_assert!(
!self.table_paths.is_empty() || cfg!(test),
"ListingTableConfig::with_listing_options called without table_paths set. \
Consider calling new() or new_with_multi_paths() first to establish table paths."
);

Self {
table_paths: self.table_paths,
file_schema: self.file_schema,
options: Some(listing_options),
schema_source: self.schema_source,
..self
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question as above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a minor refactor because I derive Default for ListingTableConfig

Comment on lines +431 to +437
/// # #[derive(Debug)]
/// # struct MySchemaAdapterFactory;
/// # impl SchemaAdapterFactory for MySchemaAdapterFactory {
/// # fn create(&self, _projected_table_schema: SchemaRef, _file_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
/// # unimplemented!()
/// # }
/// # }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the example can just show setting the DefaultSchemaAdapterFactory

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I retained this as an example of custom schema adapter factory....

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel that showing an example using DefaultSchemaAdapterFactory here and linking to proper extensive docs on how to build a custom SchemaAdapter and the multiple layers of factories is more helpful.

Comment on lines 1063 to 1069
/// # #[derive(Debug)]
/// # struct EvolutionAdapterFactory;
/// # impl SchemaAdapterFactory for EvolutionAdapterFactory {
/// # fn create(&self, _projected_table_schema: SchemaRef, _file_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
/// # unimplemented!()
/// # }
/// # }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, we can use DefaultSchemaAdapterFactory in the examples

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Amended

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... and changed this to DefaultSchemaAdapterFactory
with a note that they can refer to ListingTableConfig::with_schema_adapter_factory for custom example

let table_schema = self.schema();
match &self.schema_adapter_factory {
Some(factory) => {
factory.create(Arc::clone(&table_schema), Arc::clone(&table_schema))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not this PR's fault but this is such a sad unfortunate API... why in the world do we pass in the same thing twice!?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added create_with_projected_schema to the api

Comment on lines +1403 to +1404
let schema_adapter = self.create_schema_adapter();
let (schema_mapper, _) = schema_adapter.map_schema(self.file_schema.as_ref())?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Part of what smells wrong

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is just for the column statistics?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

@github-actions github-actions bot added the datasource Changes to the datasource crate label Jul 3, 2025
Copy link
Contributor

@adriangb adriangb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like a good improvement to me!

Btw, do you use ListingTable in production? There's plans to move it to an external repo or modularize it, it might be interesting for you to share your use case of it.

Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
@kosiew
Copy link
Contributor Author

kosiew commented Jul 4, 2025

@adriangb

Thanks for reviewing this.

Btw, do you use ListingTable in production? There's plans to move it to an external repo or modularize it, it might be interesting for you to share your use case of it.

No, I looked into it because of #14757

@kosiew kosiew merged commit 8b9c1f1 into apache:main Jul 4, 2025
27 checks passed
Standing-Man pushed a commit to Standing-Man/datafusion that referenced this pull request Jul 4, 2025
…on and Mapping (apache#16583)

- Introduced a new `schema_adapter_factory` field in `ListingTableConfig` and `ListingTable`
- Added `with_schema_adapter_factory` and `schema_adapter_factory()` methods to both structs
- Modified execution planning logic to apply schema adapters during scanning
- Updated statistics collection to use mapped schemas
- Implemented detailed documentation and example usage in doc comments
- Added new unit and integration tests validating schema adapter behavior and error cases
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate datasource Changes to the datasource crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Datafusion can't seem to cast evolving structs

2 participants