Add SchemaAdapterFactory Support for ListingTable with Schema Evolution and Mapping #16583
Conversation
…sertions in `ListingTableConfig` methods
…ion support details
…ma_adapter method
…chema state representation
…s_empty() for clarity
adriangb
left a comment
Leaving some questions / suggestions
    /// # // Custom schema adapter for handling schema evolution
    /// # #[derive(Debug)]
    /// # struct EvolutionSchemaAdapterFactory;
    /// # impl SchemaAdapterFactory for EvolutionSchemaAdapterFactory {
    /// #     fn create(&self, projected_table_schema: SchemaRef, file_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
    /// #         unimplemented!("Custom schema adapter implementation")
    /// #     }
    /// # }
    /// #
I don't think this should be here. It belongs alongside the SchemaAdapter trait / might already be there.
Before this PR, schema adapters were not used anywhere in the repo except in tests.
Using a schema adapter with a ListingTable is the first illustration of how to use schema adapters.
Therefore, I added the example here.
Deleted this example, because a nearly identical example is already located next to `fn with_schema_adapter_factory`.
I still think that (1) most users won't be using a SchemaAdapter and (2) linking to the existing docs on SchemaAdapter and maybe enhancing those should be enough.
    // Note: We preserve existing options state, but downstream code may expect
    // options to be set. Consider calling with_listing_options() or infer_options()
    // before operations that require options to be present.
    debug_assert!(
        self.options.is_some() || cfg!(test),
        "ListingTableConfig::with_schema called without options set. \
         Consider calling with_listing_options() or infer_options() first to avoid panics in downstream code."
    );

    Self {
        table_paths: self.table_paths,
        file_schema: Some(schema),
        options: self.options,
        schema_source: SchemaSource::Specified,
        ..self
Is this related to this change, or is it a drive-by?
This is a minor refactor because I added `#[derive(Default)]` to `ListingTableConfig`.
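To illustrate the refactor being discussed, here is a minimal sketch of Rust's struct update syntax on a type that derives `Default`. The `Config` struct and its fields are hypothetical stand-ins, not the real `ListingTableConfig`. Once `..self` is present, any field not named explicitly is moved over unchanged, so naming unchanged fields before it is redundant.

```rust
/// Hypothetical stand-in for the config type discussed above.
/// It is NOT the DataFusion struct; field names are illustrative.
#[derive(Debug, Default, Clone, PartialEq)]
struct Config {
    table_paths: Vec<String>,
    file_schema: Option<Vec<String>>,
    options: Option<String>,
    adapter_name: Option<String>,
}

impl Config {
    /// Only the field that changes is named; `..self` moves every
    /// remaining field from the consumed value into the new one.
    fn with_schema(self, schema: Vec<String>) -> Self {
        Self {
            file_schema: Some(schema),
            ..self
        }
    }

    /// Same pattern for a second builder-style setter.
    fn with_options(self, options: &str) -> Self {
        Self {
            options: Some(options.to_string()),
            ..self
        }
    }
}
```

With `Default` derived, a caller can also start from `Config { table_paths: ..., ..Default::default() }` and chain the setters; fields added to the struct later need no changes in these methods.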
    ///
    /// # Example: Configuring Parquet Files with Custom Options
    /// ```rust
    /// # use std::sync::Arc;
    /// # use datafusion::datasource::listing::{ListingTableConfig, ListingOptions, ListingTableUrl};
    /// # use datafusion::datasource::file_format::parquet::ParquetFormat;
    /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
    /// let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
    ///     .with_file_extension(".parquet")
    ///     .with_collect_stat(true);
    ///
    /// let config = ListingTableConfig::new(table_paths)
    ///     .with_listing_options(options);  // Configure file format and options
    /// ```
    pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
        // Note: This method properly sets options, but be aware that downstream
        // methods like infer_schema() and try_new() require both schema and options
        // to be set to function correctly.
        debug_assert!(
            !self.table_paths.is_empty() || cfg!(test),
            "ListingTableConfig::with_listing_options called without table_paths set. \
             Consider calling new() or new_with_multi_paths() first to establish table paths."
        );

        Self {
            table_paths: self.table_paths,
            file_schema: self.file_schema,
            options: Some(listing_options),
            schema_source: self.schema_source,
            ..self
        }
    }
Same question as above
This is a minor refactor because I added `#[derive(Default)]` to `ListingTableConfig`.
    /// # #[derive(Debug)]
    /// # struct MySchemaAdapterFactory;
    /// # impl SchemaAdapterFactory for MySchemaAdapterFactory {
    /// #     fn create(&self, _projected_table_schema: SchemaRef, _file_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
    /// #         unimplemented!()
    /// #     }
    /// # }
I think the example can just show setting the DefaultSchemaAdapterFactory
I retained this as an example of a custom schema adapter factory.
I feel that showing an example using DefaultSchemaAdapterFactory here and linking to proper extensive docs on how to build a custom SchemaAdapter and the multiple layers of factories is more helpful.
    /// # #[derive(Debug)]
    /// # struct EvolutionAdapterFactory;
    /// # impl SchemaAdapterFactory for EvolutionAdapterFactory {
    /// #     fn create(&self, _projected_table_schema: SchemaRef, _file_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
    /// #         unimplemented!()
    /// #     }
    /// # }
Same as above, we can use DefaultSchemaAdapterFactory in the examples
Amended
... and changed this to `DefaultSchemaAdapterFactory`, with a note that readers can refer to `ListingTableConfig::with_schema_adapter_factory` for a custom example.
    let table_schema = self.schema();
    match &self.schema_adapter_factory {
        Some(factory) => {
            factory.create(Arc::clone(&table_schema), Arc::clone(&table_schema))
Not this PR's fault but this is such a sad unfortunate API... why in the world do we pass in the same thing twice!?
Added `create_with_projected_schema` to the API.
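As a rough sketch of what such a convenience method could look like, the following models the idea with stand-in types. `SchemaRef` here is just a shared list of column names rather than Arrow's type, the `schemas` accessor and the `Recording*` types are hypothetical, and the default-method body is an assumption about the shape of the helper, not a copy of the code merged in the PR.

```rust
use std::sync::Arc;

/// Stand-in for Arrow's `SchemaRef`: a shared list of column names.
type SchemaRef = Arc<Vec<String>>;

/// Simplified adapter trait; `schemas` is a hypothetical accessor
/// used here only to observe what the factory was given.
trait SchemaAdapter {
    fn schemas(&self) -> (SchemaRef, SchemaRef);
}

trait SchemaAdapterFactory {
    /// Two-argument constructor, mirroring the call shape quoted above.
    fn create(
        &self,
        projected_table_schema: SchemaRef,
        table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter>;

    /// Convenience default method: callers that only have one schema
    /// no longer need to clone it and pass it twice themselves.
    fn create_with_projected_schema(
        &self,
        projected_table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter> {
        self.create(Arc::clone(&projected_table_schema), projected_table_schema)
    }
}

/// Hypothetical adapter that just remembers both schemas.
struct RecordingAdapter {
    projected: SchemaRef,
    table: SchemaRef,
}

impl SchemaAdapter for RecordingAdapter {
    fn schemas(&self) -> (SchemaRef, SchemaRef) {
        (Arc::clone(&self.projected), Arc::clone(&self.table))
    }
}

/// Hypothetical factory that only implements the required method.
struct RecordingFactory;

impl SchemaAdapterFactory for RecordingFactory {
    fn create(
        &self,
        projected_table_schema: SchemaRef,
        table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter> {
        Box::new(RecordingAdapter {
            projected: projected_table_schema,
            table: table_schema,
        })
    }
}
```

Because the helper is a default method on the trait, existing factory implementations pick it up without changes, and the duplication moves from every call site into one place.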
    let schema_adapter = self.create_schema_adapter();
    let (schema_mapper, _) = schema_adapter.map_schema(self.file_schema.as_ref())?;
Part of what smells wrong
I guess this is just for the column statistics?
Yes
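For readers unfamiliar with why a schema mapper is needed for column statistics, here is a simplified model of the idea. It assumes columns are matched by name and that a table column missing from the file gets "unknown" statistics. `ColumnStats`, `field_mappings`, and `map_column_statistics` are hypothetical names for this sketch and do not reproduce DataFusion's types or signatures.

```rust
/// Hypothetical per-column statistics; `None` means "unknown".
#[derive(Debug, Clone, PartialEq)]
struct ColumnStats {
    null_count: Option<usize>,
}

/// For each table column, find the index of the file column with the
/// same name, or `None` if the file does not contain that column.
fn field_mappings(table_columns: &[&str], file_columns: &[&str]) -> Vec<Option<usize>> {
    table_columns
        .iter()
        .map(|table_col| file_columns.iter().position(|file_col| file_col == table_col))
        .collect()
}

/// Reorder file-level statistics into table column order.
/// Columns absent from the file (or from the stats) become unknown.
fn map_column_statistics(
    mappings: &[Option<usize>],
    file_stats: &[ColumnStats],
) -> Vec<ColumnStats> {
    let unknown = ColumnStats { null_count: None };
    mappings
        .iter()
        .map(|mapping| {
            mapping
                .and_then(|index| file_stats.get(index).cloned())
                .unwrap_or_else(|| unknown.clone())
        })
        .collect()
}
```

The point of the sketch is only that statistics collected in file column order have to be realigned to the table schema before a planner can use them.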
Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
This reverts commit a300a30.
… ListingTableConfig
…method for convenience
…nto listing-config-14757
adriangb
left a comment
Seems like a good improvement to me!
Btw, do you use ListingTable in production? There are plans to move it to an external repo or modularize it, so it might be interesting for you to share your use case.
Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
…on and Mapping (apache#16583)
- Introduced a new `schema_adapter_factory` field in `ListingTableConfig` and `ListingTable`
- Added `with_schema_adapter_factory` and `schema_adapter_factory()` methods to both structs
- Modified execution planning logic to apply schema adapters during scanning
- Updated statistics collection to use mapped schemas
- Implemented detailed documentation and example usage in doc comments
- Added new unit and integration tests validating schema adapter behavior and error cases
Which issue does this PR close?
This is the last of a series of PRs re-implementing #15295 to close #14757 by adding schema‐evolution support for:
in DataFusion.
Rationale for this change
This change introduces schema adapter support to improve DataFusion's ability to handle schema evolution scenarios in listing-based tables. It provides a more flexible and robust mechanism for adapting schemas between files and logical table definitions, including support for custom adapters that handle field renaming, type coercion, and column reordering.
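To make the rationale concrete, the following is a toy model of schema adaptation for a single row. It covers column reordering, widening `Int32` to `Int64`, and filling a column that the file lacks with nulls; field renaming is left out. Every type and function here (`Value`, `DataType`, `TableField`, `adapt_row`) is hypothetical and far simpler than DataFusion's Arrow-based adapters.

```rust
/// Hypothetical cell value; real adapters work on Arrow arrays.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Int32(i32),
    Int64(i64),
    Text(String),
    Null,
}

/// Hypothetical logical types the table schema can ask for.
#[derive(Debug, Clone, Copy, PartialEq)]
enum DataType {
    Int64,
    Text,
}

/// One column of the logical table schema.
struct TableField {
    name: &'static str,
    data_type: DataType,
}

/// Rewrite a row read in file column order into table column order,
/// coercing types where a safe widening exists.
fn adapt_row(
    table: &[TableField],
    file_columns: &[&str],
    row: &[Value],
) -> Result<Vec<Value>, String> {
    table
        .iter()
        .map(|field| {
            // Look the column up by name; a missing column becomes Null.
            let value = file_columns
                .iter()
                .position(|col| *col == field.name)
                .and_then(|index| row.get(index))
                .cloned()
                .unwrap_or(Value::Null);
            match (value, field.data_type) {
                (Value::Null, _) => Ok(Value::Null),
                (Value::Int32(v), DataType::Int64) => Ok(Value::Int64(i64::from(v))),
                (Value::Int64(v), DataType::Int64) => Ok(Value::Int64(v)),
                (Value::Text(s), DataType::Text) => Ok(Value::Text(s)),
                (other, wanted) => Err(format!(
                    "cannot coerce {:?} to {:?} for column {}",
                    other, wanted, field.name
                )),
            }
        })
        .collect()
}
```

An older file with columns `["name", "id"]` and no `score` column can then still be read against a table schema of `id`, `name`, `score`, while an incompatible type surfaces as an error instead of silently wrong data.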
What changes are included in this PR?
- `schema_adapter_factory` field in `ListingTableConfig` and `ListingTable`
- `with_schema_adapter_factory` and `schema_adapter_factory()` methods to both structs
Are these changes tested?
Yes, the PR includes a suite of tests to verify:
Are there any user-facing changes?
Yes:
`SchemaAdapterFactory` in `ListingTableConfig` to customize how file schemas are interpreted relative to table schemas