File partitioning for ListingTable #1141
Conversation
|
I plan to check this out carefully tomorrow |
|
Thanks! You can take a quick look, but this will not be ready before 1 or 2 days... If you want, I can ping you once it's in a more reviewable state 😃 |
Sounds good. Thank you 🙏 |
b1a9db2 to
34df752
Compare
|
This is a bit harder than I thought it would be 😅. I will have to keep working on this next week. Feel free to give some feedback already, most of the important elements are already there. |
34df752 to
4ef66aa
Compare
| /// The minimum number of records required from this source plan | ||
| pub limit: Option<usize>, | ||
| /// The partitioning column names | ||
| pub table_partition_dims: Vec<String>, |
curious why not name it as table_partition_cols to better align with the comment? the type should make it clear that it's storing column names.
Haha, I have to admit that I am hesitating a lot regarding naming 😅. I am wondering if it's not the comment that should be changed. These partitions are originally encoded in the file path, which we then parse and project into a column if necessary. So they end up as columns, but they are not columns per se.
Conceptually they are handled as "virtual columns" during compute, right? For example, when a user writes a SQL query to filter against a partition, they apply the filter to that partition just like any other regular column. I am suggesting partition column here because it's the term used in Hive and Spark, so readers would be more familiar with it. Are there systems that use partition dimensions as the naming convention?
Not that I am aware of; I'll change this to cols 😉
| } else { | ||
| let applicable_filters = filters | ||
| .iter() | ||
| .filter(|f| expr_applicable_for_cols(table_partition_dims, f)); |
To avoid the complexity of expr_applicable_for_cols, perhaps we could just throw a runtime error to the user if invalid filters are provided? This also makes sure that if there is a typo in the filter, we won't silence the error.
I don't think we are silencing any error here. Typos in the query were already caught (or should already be caught) upstream in the SQL parser with checks like https://github.com/apache/arrow-datafusion/blob/fe1a9e2c55392b934c85098430f78a26ef71380e/datafusion/src/sql/planner.rs#L769-L775
Here, expr_applicable_for_cols is different: it only checks whether a given expression can be resolved using the partitioning columns alone. For instance, take a table partitioned as mytable/date=2021-01-01/file.csv with a file schema of the form Schema([Field("a",int), Field("b",string)]). A filter such as WHERE b='hello' or WHERE b=date is perfectly valid, but it should not be kept in the list of applicable_filters because it cannot be resolved using the partitioning columns only.
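To make that distinction concrete, here is a minimal, self-contained sketch of the kind of check being described; the SimpleExpr type and the helper below are illustrative stand-ins, not the actual expr_applicable_for_cols implementation:

```rust
/// Simplified stand-in for an expression tree (illustration only, not the
/// real DataFusion `Expr` type).
enum SimpleExpr {
    Column(String),
    Literal(String),
    BinaryOp {
        left: Box<SimpleExpr>,
        right: Box<SimpleExpr>,
    },
}

/// Returns true if the expression can be resolved using the given
/// (partition) columns alone.
fn applicable_for_cols(cols: &[String], expr: &SimpleExpr) -> bool {
    match expr {
        SimpleExpr::Column(name) => cols.iter().any(|c| c == name),
        SimpleExpr::Literal(_) => true,
        SimpleExpr::BinaryOp { left, right } => {
            applicable_for_cols(cols, left) && applicable_for_cols(cols, right)
        }
    }
}

fn main() {
    let partition_cols = vec!["date".to_string()];
    // WHERE date = '2021-01-01' touches only the partition column -> applicable
    let on_partition = SimpleExpr::BinaryOp {
        left: Box::new(SimpleExpr::Column("date".into())),
        right: Box::new(SimpleExpr::Literal("2021-01-01".into())),
    };
    // WHERE b = 'hello' touches a file column -> valid filter, but not applicable here
    let on_file_col = SimpleExpr::BinaryOp {
        left: Box::new(SimpleExpr::Column("b".into())),
        right: Box::new(SimpleExpr::Literal("hello".into())),
    };
    assert!(applicable_for_cols(&partition_cols, &on_partition));
    assert!(!applicable_for_cols(&partition_cols, &on_file_col));
}
```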
|
Epic work @rdettai !
I think we won't need to worry too much about this for this particular use case, because it won't be a problem anymore after we move to using scalar columnar values to store the partition values. |
|
I plan to review this PR tomorrow (when I am fresher and can give it the look it deserves) |
@rdettai mentioned he has some more work planned for this PR so I will postpone additional review until that is complete |
5c189dc to
b3b32d4
Compare
parquet exec still TODO
This helps avoid providing schemas with wrong datatypes.
refactored partition column projection and added test
Epic work. I got through almost all of this PR but I need to finish up helpers.rs -- but I ran out of time today; Will finish tomorrow
I did leave some comment / feedback but I don't think any of it is absolutely required to merge
| } | ||
|
|
||
| message ParquetScanExecNode { | ||
| message FileScanExecConf { |
makes sense to me
| .collect(), | ||
| batch_size: exec.batch_size() as u32, | ||
| base_conf: Some(exec.base_config().try_into()?), | ||
| // TODO serialize predicates |
is this a TODO you plan for this PR? Or a follow-on one?
This was already there.
| projection: conf | ||
| .projection | ||
| .as_ref() | ||
| .unwrap_or(&vec![]) |
it looks like an improvement to me, but previously the code would error if projection: None was passed and this code will simply convert that to an empty list.
Is that intentional?
None is encoded as an empty vec. Not the cleanest, but it works here because a projection with no columns could not have another meaning.
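For illustration, the convention described here boils down to something like the following on the decoding side (a hedged sketch; the function name and types are hypothetical, not the generated protobuf API):

```rust
/// Decode the serialized projection list: an empty list is interpreted as
/// "no projection" (None) rather than "project zero columns".
fn decode_projection(proto_projection: &[u32]) -> Option<Vec<usize>> {
    if proto_projection.is_empty() {
        None
    } else {
        Some(proto_projection.iter().map(|i| *i as usize).collect())
    }
}
```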
| async fn create_physical_plan( | ||
| &self, | ||
| conf: PhysicalPlanConfig, | ||
| _filters: &[Expr], |
this is neat that the creation of the physical plan gets the filters
| ) | ||
| .await; | ||
|
|
||
| let result = ctx |
this is very cool
| /// Note that only `DEFAULT_PARTITION_COLUMN_DATATYPE` is currently | ||
| /// supported for the column type. |
as mentioned elsewhere I think it would be fine to say "these are always dictionary-coded string columns" rather than "currently", which hints at changing it in the future.
Let's discuss this in the other #1141 (comment)
| filter: &Expr, | ||
| ) -> Result<TableProviderFilterPushDown> { | ||
| Ok(TableProviderFilterPushDown::Inexact) | ||
| if expr_applicable_for_cols(&self.options.table_partition_cols, filter) { |
❤️
d3cc283 to
cb0789e
Compare
I think this is great and ready to go. Any other thoughts @houqp or @Dandandan ?
|
|
||
| lazy_static! { | ||
| /// The datatype used for all partitioning columns for now | ||
| pub static ref DEFAULT_PARTITION_COLUMN_DATATYPE: DataType = DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)); |
I see -- so we are envisioning somehow allowing users of this code to specify the type used in the partitioning (and provide their own code to determine the partition values). That makes sense
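To show why this keeps the projection cheap, here is a small sketch (assuming the arrow-rs FromIterator<&str> impl for DictionaryArray; this is not code from this PR): a partition column holds the same value for every row of a file, so a dictionary-encoded string array stores the string once and only repeats small integer keys.

```rust
use arrow::array::DictionaryArray;
use arrow::datatypes::UInt8Type;

/// Build a partition column for a file with `num_rows` rows: a single
/// dictionary entry holding the partition value, repeated via u8 keys.
fn partition_column(value: &str, num_rows: usize) -> DictionaryArray<UInt8Type> {
    std::iter::repeat(value).take(num_rows).collect()
}
```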
| } | ||
|
|
||
| impl ExpressionVisitor for ApplicabilityVisitor<'_> { | ||
| fn pre_visit(self, expr: &Expr) -> Result<Recursion<Self>> { |
👍
| } | ||
|
|
||
| /// convert the paths of the files to a record batch with the following columns: | ||
| /// - one column for the file size named `_df_part_file_size_` |
This is a clever way to apply filtering -- convert the paths to batches, run the evaluation on the batches, and then turn them back into paths 👍
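A hedged sketch of that idea, using generic arrow compute kernels rather than the actual helpers.rs code (eq_utf8_scalar is assumed to be available in the arrow version used at the time):

```rust
use arrow::array::{BooleanArray, StringArray};
use arrow::compute::kernels::comparison::eq_utf8_scalar;

/// Keep only the file paths whose `date` partition value matches the
/// requested date: materialize the partition values as an arrow column,
/// evaluate the predicate with a vectorized kernel, then map the boolean
/// mask back to paths.
fn prune_files(files: &[(String, String)], wanted_date: &str) -> Vec<String> {
    let dates = StringArray::from(
        files.iter().map(|(_, date)| date.as_str()).collect::<Vec<_>>(),
    );
    let mask: BooleanArray = eq_utf8_scalar(&dates, wanted_date).unwrap();
    files
        .iter()
        .zip(mask.iter())
        .filter(|(_, keep)| keep.unwrap_or(false))
        .map(|((path, _), _)| path.clone())
        .collect()
}
```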
| assert_eq!(&parsed_files[0].partition_values, &[]); | ||
| assert_eq!(&parsed_files[1].partition_values, &[]); | ||
|
|
||
| let parsed_metas = parsed_files |
👍
|
I plan to merge from master and, if all the tests pass, put this one in. FYI @houqp / @Dandandan / @jimexist -- please let me know if you want more time to review |
|
FYI fixed a logical conflict in 5d34be6 |
| repeated uint32 projection = 6; | ||
| ScanLimit limit = 7; | ||
| Statistics statistics = 8; | ||
| uint32 batch_size = 3; |
would this be backward-compatible?
As long as you don't have DataFusion nodes with different versions, it should be ok!
|
|
||
| /// The path and size of the file. | ||
| #[derive(Debug, Clone)] | ||
| #[derive(Debug, Clone, PartialEq)] |
| #[derive(Debug, Clone, PartialEq)] | |
| #[derive(Debug, Clone, PartialEq, Eq)] |
Not sure if it is really useful to add a trait that is not used 😊
|
|
||
| use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; | ||
|
|
||
| pub fn aggr_test_schema() -> SchemaRef { |
| pub fn aggr_test_schema() -> SchemaRef { | |
| pub(crate) fn aggr_test_schema() -> SchemaRef { |
The tests module is enabled for tests only anyway, so the (crate) modifier does not have much effect here (you would need to build DataFusion in test mode to use it).
Co-authored-by: Jiayu Liu <Jimexist@users.noreply.github.com>
|
As this has been outstanding for a long time and is a fairly large change, the potential for conflict is large -- I am going to merge it in now and hopefully we can keep iterating in future PRs. Thanks again @rdettai @jimexist @houqp and @Dandandan -- 🚀 |
|
Thank you @rdettai ! Really nice work |
Which issue does this PR close?
Closes #1139.
Rationale for this change
Adding the capability to parse file partitioning and prune unnecessary files.
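For context, a minimal sketch of what hive-style path parsing looks like (illustrative only, not the actual DataFusion code; the function name is hypothetical): a path such as mytable/date=2021-01-01/file.csv yields the partition value ("date", "2021-01-01").

```rust
/// Extract (column, value) pairs from hive-style path segments like
/// "date=2021-01-01", keeping only the declared table partition columns.
fn parse_partition_values(
    path: &str,
    table_partition_cols: &[String],
) -> Vec<(String, String)> {
    path.split('/')
        .filter_map(|segment| segment.split_once('='))
        .filter(|(key, _)| table_partition_cols.iter().any(|c| c.as_str() == *key))
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect()
}
```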
What changes are included in this PR?
- add partition_values: Vec<ScalarValue> to PartitionedFile
- project the partition columns into the execute() record batch result in file format execution plans
- return the appropriate TableProviderFilterPushDown value to supports_filter_pushdown() to avoid re-evaluation of the partition pruning [1]
What changes are planned in further issues/PRs?
Are there any user-facing changes?
Rename the ListingOptions.partitions to ListingOptions.table_partition_cols to make it a bit more explicit.
[1] Re-evaluating the filters on the partition column would be expensive: … DictionaryArray<uint8>, which is pretty cheap.