move projection handling into FileSource #18627
```rust
let scan_config =
    FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
```
❤️
This is the tracer bullet I've been using to guide this change:

```rust
use arrow::util::pretty::pretty_format_batches;
use arrow_schema::DataType;
use datafusion::{
    common::Result,
    prelude::{ParquetReadOptions, SessionContext},
};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx
        .read_parquet(
            "data/",
            ParquetReadOptions::default()
                .table_partition_cols(vec![("b".to_string(), DataType::UInt32)]),
        )
        .await?;
    let res = df
        .select_exprs(&["b", "a", "a = b", "a * 2"])?
        .collect()
        .await?;
    println!("Result:\n{}", pretty_format_batches(&res)?);
    Ok(())
}
```

I plan to do a review and cleanup, handle the protobuf stuff (I know that needs to be changed) and then fix all the rest of the tests. It looks like we have 36 failing tests, not too bad, and a lot of them look related.

Down to 4 failing tests! All proto related.
```rust
// No projection - use the full file schema
Arc::clone(self.table_schema.file_schema())
```
TODO: do we need to dynamically generate a "full" projection that includes table partition columns? Or should that be the default that each FileSource initializes itself with (then we don't even need to check here!)?
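One way to picture the second option in the TODO above is a source that always starts with an identity projection over file columns plus partition columns, so a "no projection" branch never exists. The sketch below is only an illustration under that assumption: `TableSchemaSketch`, `full_projection`, and `project` are hypothetical names, not DataFusion APIs.

```rust
/// Hypothetical stand-in for a table schema: file columns followed by
/// partition columns. This is not the DataFusion `TableSchema` type.
struct TableSchemaSketch {
    file_columns: Vec<String>,
    partition_columns: Vec<String>,
}

impl TableSchemaSketch {
    /// Identity projection over the whole table schema, i.e. every file
    /// column followed by every partition column.
    fn full_projection(&self) -> Vec<usize> {
        (0..self.file_columns.len() + self.partition_columns.len()).collect()
    }

    /// Resolve a projection to column names. `None` falls back to the
    /// full projection, so callers never need a separate code path.
    fn project(&self, projection: Option<&[usize]>) -> Vec<String> {
        let all: Vec<&String> = self
            .file_columns
            .iter()
            .chain(self.partition_columns.iter())
            .collect();
        let indices = match projection {
            Some(p) => p.to_vec(),
            None => self.full_projection(),
        };
        indices.iter().map(|&i| all[i].clone()).collect()
    }
}

fn main() {
    let schema = TableSchemaSketch {
        file_columns: vec!["a".to_string()],
        partition_columns: vec!["b".to_string()],
    };
    // No explicit projection: file columns and partition columns both appear.
    println!("{:?}", schema.project(None));
    // Explicit projection that reorders the columns.
    println!("{:?}", schema.project(Some(&[1, 0][..])));
}
```

If each source were initialized with the result of something like `full_projection()`, the check in the snippet under review would become unnecessary.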
d485ef7 to 493036f
I'm marking this as ready for review. I expect more changes and cleanup will be needed, and I already have some ideas, but I'd like some initial feedback. cc @AdamGS @XiangpengHao @waynexia since you've all expressed interest.
bc3d8f4 to 5627591
This PR adds trait implementations and a `project_batch()` method, and fixes a bug in `update_expr()` for literal expressions. It also adds comprehensive tests.

Part of apache#18627
This commit consolidates the separate ArrowFileSource and ArrowStreamFileSource implementations into a unified ArrowSource with an ArrowFormat enum.

Key changes:
- Removed ArrowFileSource and ArrowStreamFileSource structs
- Added ArrowFormat enum (File, Stream) to distinguish between formats
- Created unified ArrowSource struct that uses ArrowFormat to dispatch
- Kept separate ArrowFileOpener and ArrowStreamFileOpener implementations
- Consolidated all FileSource trait implementations in ArrowSource
- Format-specific behavior in repartitioned() method (Stream returns None)

This consolidation reduces code duplication while maintaining clear separation of concerns between the file and stream format handling.

Part of apache#18627

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit moves statistics handling from individual FileSource implementations into FileScanConfig, simplifying the FileSource trait.

Changes:
- Remove statistics() and with_statistics() methods from FileSource trait
- Remove with_projection() method from FileSource trait (statistics PR only)
- Add statistics field to FileScanConfig struct
- Add statistics() method to FileScanConfig to retrieve statistics
- Update FileScanConfigBuilder to properly handle statistics
- Remove projected_statistics field from all FileSource implementations:
  - ParquetSource
  - CsvSource
  - JsonSource
  - AvroSource
  - ArrowFileSource and ArrowStreamFileSource
  - MockSource (test utility)
- Update test utilities and assertions to use config.statistics() instead of file_source.statistics()
- Update proto serialization to use config.statistics()

Part of apache#18627

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
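The shape of the statistics change described in this commit message can be sketched roughly as follows. This is a simplified illustration, not the DataFusion code: `StatsSketch`, `FileSourceSketch`, `ScanConfigSketch`, and `ScanConfigBuilderSketch` are hypothetical stand-ins, and the only point is that the config owns the statistics while the source trait no longer mentions them.

```rust
use std::sync::Arc;

/// Hypothetical, heavily reduced statistics type.
#[derive(Debug, Clone, PartialEq, Default)]
struct StatsSketch {
    num_rows: Option<usize>,
}

/// Hypothetical source trait: note there is no `statistics()` or
/// `with_statistics()` method here.
trait FileSourceSketch {
    fn file_type(&self) -> &str;
}

struct CsvSourceSketch;

impl FileSourceSketch for CsvSourceSketch {
    fn file_type(&self) -> &str {
        "csv"
    }
}

/// Hypothetical scan config that owns the statistics.
struct ScanConfigSketch {
    source: Arc<dyn FileSourceSketch>,
    statistics: StatsSketch,
}

impl ScanConfigSketch {
    /// Callers read statistics from the config, not from the source.
    fn statistics(&self) -> &StatsSketch {
        &self.statistics
    }
}

/// Hypothetical builder that threads statistics into the config.
struct ScanConfigBuilderSketch {
    source: Arc<dyn FileSourceSketch>,
    statistics: Option<StatsSketch>,
}

impl ScanConfigBuilderSketch {
    fn new(source: Arc<dyn FileSourceSketch>) -> Self {
        Self { source, statistics: None }
    }

    fn with_statistics(mut self, statistics: StatsSketch) -> Self {
        self.statistics = Some(statistics);
        self
    }

    /// Unknown statistics are used when none were supplied.
    fn build(self) -> ScanConfigSketch {
        ScanConfigSketch {
            source: self.source,
            statistics: self.statistics.unwrap_or_default(),
        }
    }
}

fn main() {
    let config = ScanConfigBuilderSketch::new(Arc::new(CsvSourceSketch))
        .with_statistics(StatsSketch { num_rows: Some(10) })
        .build();
    println!("{} {:?}", config.source.file_type(), config.statistics());
}
```

Keeping the field on the config means every source implementation drops one piece of duplicated state, which matches the list of removed `projected_statistics` fields above.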
a3e37bf to 3997572
To make life easier for reviewers, I've opened a couple of smaller, hopefully "no-brainer" PRs to break work out of this one:
## Summary

This PR consolidates the separate `ArrowFileSource` and `ArrowStreamFileSource` implementations into a unified `ArrowSource` with an `ArrowFormat` enum. This is part of the larger projection refactoring effort tracked in #18627.

## Key Changes

- **Removed separate structs**: Eliminated duplicate `ArrowFileSource` and `ArrowStreamFileSource` implementations
- **Added `ArrowFormat` enum**: Simple enum with `File` and `Stream` variants to distinguish between Arrow IPC formats
- **Unified `ArrowSource` struct**: Single struct that uses `ArrowFormat` to dispatch to appropriate opener
- **Kept separate openers**: `ArrowFileOpener` and `ArrowStreamFileOpener` remain distinct as their implementations differ significantly
- **Format-specific behavior**: `repartitioned()` method returns `None` for Stream format (doesn't support parallel reading) and delegates to default logic for File format

## Benefits

- **Reduced code duplication**: ~144 net lines removed
- **Clearer architecture**: Single source of truth for Arrow file handling
- **Maintained separation**: Format-specific logic remains in separate openers
- **No behavior changes**: All existing tests pass without modification

## Testing

- All existing tests pass
- No changes to test files needed
- Both file and stream formats work correctly

## Related Work

This PR is independent and can be merged before or after:
- PR 1: Move Statistics Handling (if created)
- PR 3: Enhance Physical-Expr Projection Handling (if created)

Part of #18627

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude <noreply@anthropic.com>
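The enum-dispatch pattern described in this PR summary can be sketched in a few lines. Only the names `ArrowFormat`, `File`, `Stream`, `ArrowFileOpener`, and `ArrowStreamFileOpener` come from the description; `ArrowSourceSketch`, `opener_name`, and the simplified `repartitioned` signature are assumptions made for illustration and do not reflect the real method signatures.

```rust
/// The two Arrow IPC formats named in the PR description.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ArrowFormat {
    File,
    Stream,
}

/// Hypothetical unified source; the real struct carries more state.
struct ArrowSourceSketch {
    format: ArrowFormat,
}

impl ArrowSourceSketch {
    /// Dispatch to a format-specific opener. Here the opener is
    /// represented only by its name to keep the sketch self-contained.
    fn opener_name(&self) -> &'static str {
        match self.format {
            ArrowFormat::File => "ArrowFileOpener",
            ArrowFormat::Stream => "ArrowStreamFileOpener",
        }
    }

    /// Simplified stand-in for `repartitioned()`: the stream format
    /// opts out, while the file format defers to some default logic
    /// (here just echoing the requested partition count).
    fn repartitioned(&self, target_partitions: usize) -> Option<usize> {
        match self.format {
            ArrowFormat::Stream => None,
            ArrowFormat::File => Some(target_partitions),
        }
    }
}

fn main() {
    for format in [ArrowFormat::File, ArrowFormat::Stream] {
        let source = ArrowSourceSketch { format };
        println!(
            "{:?}: opener={}, repartitioned={:?}",
            format,
            source.opener_name(),
            source.repartitioned(4)
        );
    }
}
```

A single `match` per method keeps the format-specific differences visible in one place while the openers themselves stay separate.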
3997572 to b005dfd
Part of #14993
This moves ownership of projections from `FileScanConfig` into `FileSource`.

Notably we do not do anything special with this in Parquet just yet: I leave it for a followup to actually use the projection expressions instead of column indices to e.g. generate the Parquet `ProjectionMask` directly from expressions (in particular to select leaves instead of roots for struct and variant access).
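To illustrate why expressions carry more information than column indices for the followup mentioned above, here is a minimal sketch. `ExprSketch`, `root_columns`, and `leaf_paths` are hypothetical and unrelated to DataFusion's physical expression types or the Parquet `ProjectionMask` API; the sketch only shows that a field-access expression identifies a leaf (`s.x`) where an index-based projection can only name the root (`s`).

```rust
/// Hypothetical projection expression: a column or a field access on
/// another expression (e.g. `s.x`).
#[derive(Debug, Clone)]
enum ExprSketch {
    Column(String),
    GetField(Box<ExprSketch>, String),
}

impl ExprSketch {
    /// Root column this expression reads from; this is all that an
    /// index-based projection can express.
    fn root(&self) -> &str {
        match self {
            ExprSketch::Column(name) => name.as_str(),
            ExprSketch::GetField(inner, _) => inner.root(),
        }
    }

    /// Dotted path to the leaf actually needed.
    fn leaf_path(&self) -> String {
        match self {
            ExprSketch::Column(name) => name.clone(),
            ExprSketch::GetField(inner, field) => {
                format!("{}.{}", inner.leaf_path(), field)
            }
        }
    }
}

/// Distinct root columns, in first-seen order.
fn root_columns(projection: &[ExprSketch]) -> Vec<String> {
    let mut roots: Vec<String> = Vec::new();
    for expr in projection {
        let root = expr.root().to_string();
        if !roots.contains(&root) {
            roots.push(root);
        }
    }
    roots
}

/// Leaf paths, one per projection expression.
fn leaf_paths(projection: &[ExprSketch]) -> Vec<String> {
    projection.iter().map(|e| e.leaf_path()).collect()
}

fn main() {
    let projection = vec![
        ExprSketch::GetField(
            Box::new(ExprSketch::Column("s".to_string())),
            "x".to_string(),
        ),
        ExprSketch::Column("a".to_string()),
    ];
    println!("roots:  {:?}", root_columns(&projection));
    println!("leaves: {:?}", leaf_paths(&projection));
}
```

With roots only, a reader would have to decode every child of `s`; with leaf paths, it could in principle read just `s.x`.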