Description
Is your feature request related to a problem or challenge?
I've been working on implementing a new ColumnOrder for floating point columns in Parquet (apache/arrow-rs#7408), and while investigating how to use the new statistics in DataFusion, I found an interesting quirk.
I'm looking at explain plans for some queries that should be able to use statistics to prune pages. For example:
> explain select int_col from 'parquet-testing/data/alltypes_tiny_pages.parquet' where int_col > 10;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Filter: parquet-testing/data/alltypes_tiny_pages.parquet.int_col > Int32(10) |
| | TableScan: parquet-testing/data/alltypes_tiny_pages.parquet projection=[int_col], partial_filters=[parquet-testing/data/alltypes_tiny_pages.parquet.int_col > Int32(10)] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: int_col@0 > 10 |
| | RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 |
| | DataSourceExec: file_groups={1 group: [[Users/seidl/src/datafusion/parquet-testing/data/alltypes_tiny_pages.parquet]]}, projection=[int_col], file_type=parquet, predicate=int_col@4 > 10, pruning_predicate=int_col_null_count@1 != row_count@2 AND int_col_max@0 > 10, required_guarantees=[] |
This query will use the column and page statistics (pruning_predicate is populated). However, when I tried a similar plan for a float column, I was surprised to see that no pruning is done.
> explain select float_col from 'parquet-testing/data/alltypes_tiny_pages.parquet' where float_col > 10.0;
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Filter: CAST(parquet-testing/data/alltypes_tiny_pages.parquet.float_col AS Float64) > Float64(10) |
| | TableScan: parquet-testing/data/alltypes_tiny_pages.parquet projection=[float_col], partial_filters=[CAST(parquet-testing/data/alltypes_tiny_pages.parquet.float_col AS Float64) > Float64(10)] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: CAST(float_col@0 AS Float64) > 10 |
| | RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 |
| | DataSourceExec: file_groups={1 group: [[Users/seidl/src/datafusion/parquet-testing/data/alltypes_tiny_pages.parquet]]}, projection=[float_col], file_type=parquet, predicate=CAST(float_col@6 AS Float64) > 10 |
But a double column does get a pruning predicate:
> explain select double_col from 'parquet-testing/data/alltypes_tiny_pages.parquet' where double_col > 10.0;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Filter: parquet-testing/data/alltypes_tiny_pages.parquet.double_col > Float64(10) |
| | TableScan: parquet-testing/data/alltypes_tiny_pages.parquet projection=[double_col], partial_filters=[parquet-testing/data/alltypes_tiny_pages.parquet.double_col > Float64(10)] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: double_col@0 > 10 |
| | RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 |
| | DataSourceExec: file_groups={1 group: [[Users/seidl/src/datafusion/parquet-testing/data/alltypes_tiny_pages.parquet]]}, projection=[double_col], file_type=parquet, predicate=double_col@7 > 10, pruning_predicate=double_col_null_count@1 != row_count@2 AND double_col_max@0 > 10, required_guarantees=[] |
Digging into the code, it seems the problem with float has two parts: a) the literal 10.0 is treated as a double, so b) float_col is cast to Float64, and that cast is disallowed in datafusion_physical_optimizer::pruning::verify_support_type_for_prune().
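To make the rejection concrete, here is a minimal sketch of the allow-list logic, modeled on the verify_support_type_for_prune snippet quoted later in this issue. The DataType enum is a hypothetical stand-in for arrow's type, trimmed to a few variants, and cast_is_prunable is an illustrative name, not a DataFusion function.

```rust
// Hypothetical stand-in for arrow's `DataType`, trimmed to the variants used here.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int8,
    Int16,
    Int32,
    Int64,
    Float32,
    Float64,
    Decimal128(u8, i8),
}

// Simplified model of the allow-list in `verify_support_type_for_prune`:
// a cast is treated as prunable only when both the source and target types
// appear in the lists below. The real function returns a `Result`; a `bool`
// is used here to keep the sketch short.
fn cast_is_prunable(from_type: &DataType, to_type: &DataType) -> bool {
    matches!(
        from_type,
        DataType::Int8
            | DataType::Int16
            | DataType::Int32
            | DataType::Int64
            | DataType::Decimal128(_, _)
    ) && matches!(
        to_type,
        DataType::Int8 | DataType::Int32 | DataType::Int64 | DataType::Decimal128(_, _)
    )
}
```

Under this model an Int32 to Int64 cast passes, while the Float32 to Float64 cast from the float_col plan does not, which is consistent with the missing pruning_predicate in that plan.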
Interestingly, using an int literal works:
> explain select float_col from 'parquet-testing/data/alltypes_tiny_pages.parquet' where float_col > 10;
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Filter: parquet-testing/data/alltypes_tiny_pages.parquet.float_col > Float32(10) |
| | TableScan: parquet-testing/data/alltypes_tiny_pages.parquet projection=[float_col], partial_filters=[parquet-testing/data/alltypes_tiny_pages.parquet.float_col > Float32(10)] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: float_col@0 > 10 |
| | RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 |
| | DataSourceExec: file_groups={1 group: [[Users/seidl/src/datafusion/parquet-testing/data/alltypes_tiny_pages.parquet]]}, projection=[float_col], file_type=parquet, predicate=float_col@6 > 10, pruning_predicate=float_col_null_count@1 != row_count@2 AND float_col_max@0 > 10, required_guarantees=[] |
as does an explicit cast:
> explain select float_col from 'parquet-testing/data/alltypes_tiny_pages.parquet' where float_col > cast(10.0 as float);
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Filter: parquet-testing/data/alltypes_tiny_pages.parquet.float_col > Float32(10) |
| | TableScan: parquet-testing/data/alltypes_tiny_pages.parquet projection=[float_col], partial_filters=[parquet-testing/data/alltypes_tiny_pages.parquet.float_col > Float32(10)] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: float_col@0 > 10 |
| | RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 |
| | DataSourceExec: file_groups={1 group: [[Users/seidl/src/datafusion/parquet-testing/data/alltypes_tiny_pages.parquet]]}, projection=[float_col], file_type=parquet, predicate=float_col@6 > 10, pruning_predicate=float_col_null_count@1 != row_count@2 AND float_col_max@0 > 10, required_guarantees=[] |
Describe the solution you'd like
It would be nice if DataFusion always used statistics for floating point columns when they are available. One potential fix is to add more cases to verify_support_type_for_prune (
datafusion/datafusion/physical-optimizer/src/pruning.rs
Lines 1217 to 1229 in 42a45d1
    fn verify_support_type_for_prune(from_type: &DataType, to_type: &DataType) -> Result<()> {
        // TODO: support other data type for prunable cast or try cast
        if matches!(
            from_type,
            DataType::Int8
                | DataType::Int16
                | DataType::Int32
                | DataType::Int64
                | DataType::Decimal128(_, _)
        ) && matches!(
            to_type,
            DataType::Int8 | DataType::Int32 | DataType::Int64 | DataType::Decimal128(_, _)
        ) {
int literals to float). I'd be happy to attempt the former, but I'd need a pointer to where to start on the latter 😅.
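As a rough illustration of what allowing the Float32 to Float64 cast could enable, the sketch below evaluates the same shape of predicate seen in the plans above (null_count != row_count AND max > literal) after widening a Float32 max statistic to Float64. PageStats and page_may_match are hypothetical names introduced for this example, not DataFusion or parquet APIs, and the sketch assumes the statistics contain no NaN values.

```rust
// Hypothetical per-page statistics for a Float32 column; the field names are
// illustrative and not taken from DataFusion or the parquet crate.
struct PageStats {
    max: f32,
    null_count: u64,
    row_count: u64,
}

// Evaluates the same shape of predicate seen in the plans above,
//   col_null_count != row_count AND col_max > literal,
// but widens the Float32 max to Float64 first. `f64::from(f32)` is lossless,
// so the widened max compares against the Float64 literal without rounding.
// Returns false when the page can be skipped.
fn page_may_match(stats: &PageStats, literal: f64) -> bool {
    stats.null_count != stats.row_count && f64::from(stats.max) > literal
}
```

For a filter like float_col > 10.0, a page whose max is 9.5 would be skipped and a page whose max is 10.5 would be kept. Because the widening happens on the statistic rather than on the literal, no precision is lost on either side of the comparison.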
Describe alternatives you've considered
No response
Additional context
No response