
Conversation

@rdettai
Contributor

@rdettai rdettai commented Oct 18, 2021

Which issue does this PR close?

Closes #1139.

Rationale for this change

Adds the capability to parse file partitioning information and prune unnecessary files.

What changes are included in this PR?

  • add partition_values: Vec<ScalarValue> to PartitionedFile (see the sketch below)
  • implement pruned_partition_list
  • add extra columns for the partition dimensions to the execute() record batch result in file format execution plans
    • avro/csv/json
    • parquet
  • add the proper TableProviderFilterPushDown value to supports_filter_pushdown() to avoid re-evaluation of the partition pruning [1]
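
For illustration, a rough sketch of the shape of the first two items above (illustrative only; the exact definitions and signatures in the PR may differ):

use datafusion::scalar::ScalarValue;

/// A file in a listing table, now carrying the values parsed from its
/// partition path segments (e.g. `date=2021-01-01`).
pub struct PartitionedFile {
    pub path: String,
    pub size: u64,
    /// One value per partition column declared in `ListingOptions`.
    pub partition_values: Vec<ScalarValue>,
}

/// Hypothetical signature: list the files of the table, parse the partition
/// values out of their paths, and drop the files whose partition values
/// cannot satisfy the pushed-down filters.
// async fn pruned_partition_list(
//     store: &dyn ObjectStore,
//     table_path: &str,
//     filters: &[Expr],
//     table_partition_cols: &[String],
// ) -> Result<Vec<PartitionedFile>>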

What changes are planned in further issues/PRs?

Are there any user-facing changes?

Rename the ListingOptions.partitions to ListingOptions.table_partition_cols to make it a bit more explicit.


[1] Re-evaluating the filters on the partition column would be expensive:

  • it requires the column to be pushed down, thus materialized in the source execution plan. This is acceptable if we use DictionaryArray<uint8>, which is pretty cheap.
  • when applying the filtering expression, the dictionary needs to be expanded because many kernel ops are not supported on Dictionaries for now (link and Add better and faster support for dictionary types #87). This could be very expensive!!!! (See the sketch below.)
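
To make the first point concrete, a minimal arrow-rs sketch (illustrative only, not code from this PR; it assumes the FromIterator<&str> impl for DictionaryArray): a partition column materialized for one file repeats a single value, so the dictionary stores the string once and only a tiny u8 key per row.

use arrow::array::{Array, DictionaryArray};
use arrow::datatypes::UInt8Type;

fn main() {
    // The `date` partition column materialized for a 4-row batch of one file:
    // a single distinct string in the dictionary, four u8 keys pointing at it.
    let date_col: DictionaryArray<UInt8Type> =
        vec!["2021-01-01"; 4].into_iter().collect();
    assert_eq!(date_col.len(), 4);
    // Expanding it back to a plain StringArray (as some kernels currently
    // require) would copy the string once per row -- the cost discussed above.
}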

@rdettai rdettai changed the title from "[feat] adding partition_values to PartitionedFile" to "File partitioning for ListingTable" on Oct 18, 2021
@alamb
Contributor

alamb commented Oct 18, 2021

I plan to check this out carefully tomorrow

@rdettai
Contributor Author

rdettai commented Oct 19, 2021

Thanks! You can take a quick look, but this will not be ready before 1 or 2 days... If you want, I can ping you once it's in a more reviewable state 😃

@alamb
Contributor

alamb commented Oct 19, 2021

> Thanks! You can take a quick look, but this will not be ready before 1 or 2 days... If you want, I can ping you once it's in a more reviewable state 😃

Sounds good. Thank you 🙏

@rdettai rdettai force-pushed the file-partitioning branch 2 times, most recently from b1a9db2 to 34df752 on October 22, 2021 at 16:49
@rdettai
Contributor Author

rdettai commented Oct 22, 2021

This is a bit harder than I thought it would be 😅. I will have to keep working on this next week. Feel free to give some feedback already, most of the important elements are already there.

/// The minimum number of records required from this source plan
pub limit: Option<usize>,
/// The partitioning column names
pub table_partition_dims: Vec<String>,
Member

Curious why not name it table_partition_cols to better align with the comment? The type should make it clear that it's storing column names.

Contributor Author

Haha, I have to admit I am hesitating a lot about the naming 😅. I am wondering if it's not the comment that should be changed. These partitions are originally encoded in the file path, which we then parse and project into a column if necessary. So they end up as columns, but they are not columns per se.

Member

Conceptually they are handled as "virtual columns" during compute, right? For example, when a user writes a SQL query to filter against a partition, they apply the filter to that partition just like to other regular columns. I am suggesting partition column here because it's the term used in Hive and Spark, so readers would be more familiar with it. Are there systems that use partition dimensions as the naming convention?

Contributor Author

Not that I am aware of; I'll change this to cols 😉

} else {
let applicable_filters = filters
.iter()
.filter(|f| expr_applicable_for_cols(table_partition_dims, f));
Member

To avoid the complexity of expr_applicable_for_cols, perhaps we could just throw a runtime error to the user if invalid filters are provided? This also makes sure that if there is a typo in the filter, we won't silence the error.

Contributor Author

I don't think we are silencing any error here. Typos in the query were already caught (or should already be caught) upstream in the SQL parser with checks like https://github.com/apache/arrow-datafusion/blob/fe1a9e2c55392b934c85098430f78a26ef71380e/datafusion/src/sql/planner.rs#L769-L775

Here expr_applicable_for_cols is different: it only checks whether a given expression can be resolved using the partitioning dimensions alone. For instance, suppose the table partitioning is of the form mytable/date=2021-01-01/file.csv with a file schema Schema([Field("a", int), Field("b", string)]). A filter such as WHERE b='hello' or WHERE b=date is perfectly valid, but it should not be kept in the list of applicable_filters because it cannot be resolved with the partitioning columns alone.
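
A simplified, self-contained sketch of that idea (a toy expression model, not DataFusion's actual Expr type): an expression is kept for partition pruning only if every column it references is a partition column.

/// Toy expression model, for illustration only.
enum Expr {
    Column(String),
    Literal(String),
    BinaryExpr { left: Box<Expr>, right: Box<Expr> },
}

/// True if the expression can be evaluated using only the given partition columns.
fn expr_applicable_for_cols(partition_cols: &[String], expr: &Expr) -> bool {
    match expr {
        Expr::Column(name) => partition_cols.iter().any(|c| c == name),
        Expr::Literal(_) => true,
        Expr::BinaryExpr { left, right } => {
            expr_applicable_for_cols(partition_cols, left)
                && expr_applicable_for_cols(partition_cols, right)
        }
    }
}

fn main() {
    let partition_cols = vec!["date".to_string()];
    // WHERE date = '2021-01-01'  -> applicable, files can be pruned by path
    let on_partition = Expr::BinaryExpr {
        left: Box::new(Expr::Column("date".into())),
        right: Box::new(Expr::Literal("2021-01-01".into())),
    };
    // WHERE b = 'hello'          -> a valid filter, but not resolvable from the path
    let on_file_col = Expr::BinaryExpr {
        left: Box::new(Expr::Column("b".into())),
        right: Box::new(Expr::Literal("hello".into())),
    };
    assert!(expr_applicable_for_cols(&partition_cols, &on_partition));
    assert!(!expr_applicable_for_cols(&partition_cols, &on_file_col));
}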

@houqp
Member

houqp commented Oct 25, 2021

Epic work @rdettai !

> when applying the filtering expression, the dictionary needs to be expanded because many kernel ops are not supported on Dictionaries for now

I think we won't need to worry too much about this for this particular use case, because it won't be a problem anymore once we move to using a scalar columnar value to store the partition values.

@alamb
Contributor

alamb commented Oct 25, 2021

I plan to review this PR tomorrow (when I am fresher and can give it the look it deserves)

@alamb
Contributor

alamb commented Oct 26, 2021

> I plan to review this PR tomorrow (when I am fresher and can give it the look it deserves)

@rdettai mentioned he has some more work planned for this PR so I will postpone additional review until that is complete

@alamb alamb left a comment

Epic work. I got through almost all of this PR but still need to finish up helpers.rs -- I ran out of time today; will finish tomorrow.

I did leave some comments / feedback, but I don't think any of it is absolutely required to merge.

}

message ParquetScanExecNode {
message FileScanExecConf {
Contributor

makes sense to me

.collect(),
batch_size: exec.batch_size() as u32,
base_conf: Some(exec.base_config().try_into()?),
// TODO serialize predicates
Contributor

Is this a TODO you plan to address in this PR, or a follow-on one?

Contributor Author

This was already there.

projection: conf
.projection
.as_ref()
.unwrap_or(&vec![])
Contributor

It looks like an improvement to me, but previously the code would error if projection: None was passed, whereas this code simply converts that to an empty list.

Is that intentional?

Contributor Author

None is encoded as an empty vec. Not the cleanest, but it works here, as a projection with no columns could not have another meaning. (See the sketch below.)
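
A minimal sketch of that convention (hypothetical helper names, not the PR's actual code): None is serialized as an empty list and decoded back to None.

// Hypothetical helpers illustrating the convention.
fn encode_projection(projection: &Option<Vec<u32>>) -> Vec<u32> {
    // None and Some(vec![]) collapse to the same wire value, which is fine
    // because an explicit empty projection has no other meaning here.
    projection.clone().unwrap_or_default()
}

fn decode_projection(encoded: Vec<u32>) -> Option<Vec<u32>> {
    if encoded.is_empty() { None } else { Some(encoded) }
}

fn main() {
    assert_eq!(decode_projection(encode_projection(&None)), None);
    assert_eq!(
        decode_projection(encode_projection(&Some(vec![0, 2]))),
        Some(vec![0, 2])
    );
}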

async fn create_physical_plan(
&self,
conf: PhysicalPlanConfig,
_filters: &[Expr],
Contributor

this is neat that the creation of the physical plan gets the filters

)
.await;

let result = ctx
Contributor

this is very cool

Comment on lines +55 to +56
/// Note that only `DEFAULT_PARTITION_COLUMN_DATATYPE` is currently
/// supported for the column type.
Contributor

As mentioned elsewhere, I think it would be fine to say "these are always dictionary-coded string columns" rather than "currently", which hints at changing it in the future.

Contributor Author

Let's discuss this in the other thread: #1141 (comment)

filter: &Expr,
) -> Result<TableProviderFilterPushDown> {
Ok(TableProviderFilterPushDown::Inexact)
if expr_applicable_for_cols(&self.options.table_partition_cols, filter) {
Contributor

❤️

rdettai added a commit to rdettai/arrow-datafusion that referenced this pull request Oct 29, 2021
@alamb alamb left a comment

I think this is great and ready to go. Any other thoughts @houqp or @Dandandan ?


lazy_static! {
/// The datatype used for all partitioning columns for now
pub static ref DEFAULT_PARTITION_COLUMN_DATATYPE: DataType = DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));
Contributor

I see -- so we are envisioning somehow allowing users of this code to specify the type of the partitioning columns (and provide their own code to determine the partition values). That makes sense.

}

impl ExpressionVisitor for ApplicabilityVisitor<'_> {
fn pre_visit(self, expr: &Expr) -> Result<Recursion<Self>> {
Contributor

👍

}

/// convert the paths of the files to a record batch with the following columns:
/// - one column for the file size named `_df_part_file_size_`
Contributor

This is a clever way to apply filtering -- convert the paths to batches, run the evaluation on the batches, and then turn them back into paths 👍
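
A loose sketch of that trick using plain arrow-rs kernels (simplified; the real code builds the batch from the parsed partition values and evaluates the pushed-down DataFusion expressions): turn the file listing into arrays, compute a boolean mask from the partition column, and keep only the matching paths.

use arrow::array::{Array, StringArray};
use arrow::compute::{filter, kernels::comparison::eq_utf8_scalar};

fn main() -> arrow::error::Result<()> {
    // File listing turned into columns (the real code also adds e.g. a
    // `_df_part_file_size_` column and one column per partition dimension).
    let paths = StringArray::from(vec![
        "mytable/date=2021-01-01/file1.csv",
        "mytable/date=2021-01-02/file2.csv",
    ]);
    let dates = StringArray::from(vec!["2021-01-01", "2021-01-02"]);

    // Evaluate the partition filter `date = '2021-01-01'` as a boolean mask...
    let mask = eq_utf8_scalar(&dates, "2021-01-01")?;
    // ...then turn the surviving rows back into the list of paths to scan.
    let kept = filter(&paths, &mask)?;
    let kept_paths = kept.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(kept_paths.len(), 1);
    assert_eq!(kept_paths.value(0), "mytable/date=2021-01-01/file1.csv");
    Ok(())
}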

assert_eq!(&parsed_files[0].partition_values, &[]);
assert_eq!(&parsed_files[1].partition_values, &[]);

let parsed_metas = parsed_files
Contributor

👍

@alamb alamb mentioned this pull request Nov 1, 2021
@alamb
Contributor

alamb commented Nov 1, 2021

I plan to merge from master and, if all the tests pass, put this one in. FYI @houqp / @Dandandan / @jimexist

Please let me know if you want more time to review

@alamb
Contributor

alamb commented Nov 1, 2021

FYI fixed a logical conflict in 5d34be6

repeated uint32 projection = 6;
ScanLimit limit = 7;
Statistics statistics = 8;
uint32 batch_size = 3;
Member

Would this be backward-compatible?

Contributor Author

As long as you don't have DataFusion nodes with different versions, it should be ok!


/// The path and size of the file.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
Member

Suggested change
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Eq)]

Contributor Author

Not sure it is really useful to derive a trait that is not used 😊


use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

pub fn aggr_test_schema() -> SchemaRef {
Member

Suggested change
pub fn aggr_test_schema() -> SchemaRef {
pub(crate) fn aggr_test_schema() -> SchemaRef {

@rdettai rdettai Nov 1, 2021

The tests module is enabled for tests only anyway, so the (crate) modifier does not have much effect here (you would need to build DataFusion in test mode to use it).

Co-authored-by: Jiayu Liu <Jimexist@users.noreply.github.com>
@alamb
Contributor

alamb commented Nov 1, 2021

As this has been outstanding for a long time and is a fairly large change, the potential for conflict is large -- I am going to merge it in now and hopefully we can keep iterating in future PRs. Thanks again @rdettai @jimexist @houqp and @Dandandan -- 🚀

@alamb alamb merged commit b2a5028 into apache:master Nov 1, 2021
@Dandandan
Contributor

Thank you @rdettai ! Really nice work

@houqp houqp added the enhancement (New feature or request) and api change (Changes the API exposed to users of the crate) labels on Nov 6, 2021
H0TB0X420 pushed a commit to H0TB0X420/datafusion that referenced this pull request Oct 7, 2025
…pport (apache#1141)

