Skip to content

enable dynamic filtering for file hash partitioned data#20217

Draft
gene-bordegaray wants to merge 2 commits intoapache:mainfrom
gene-bordegaray:gene.bordegaray/2026/02/dyn_filter_with_file_partitioning
Draft

enable dynamic filtering for file hash partitioned data#20217
gene-bordegaray wants to merge 2 commits intoapache:mainfrom
gene-bordegaray:gene.bordegaray/2026/02/dyn_filter_with_file_partitioning

Conversation

@gene-bordegaray
Copy link
Contributor

@gene-bordegaray gene-bordegaray commented Feb 7, 2026

Which issue does this PR close?

Rationale for this change

Dynamic filter were disabled on partitioned hash joins when preserve_file_partitions was true due to #20176 .

Refer to the issue: #20195 for more detail.

What changes are included in this PR?

The approach takes the following steps:

  1. Detection logic: When building a partitioned hash join, check whether both join inputs have a RepartitionExec with Hash partitioning in their plan tree.
  2. Global OR dynamic filter: When in Partitioned mode without RepartitionExec on both sides, we cannot guarantee that hash(key) % N routing will correctly map rows to partitions. In this case, use a "global OR" approach: combine all per-partition filters with OR into a single expression. This is safe for any partitioning scheme and still effective row-group pruning.
(
    filter_0 OR
    filter_1 OR
    ...
)
  1. Case routing preserved: When both join inputs have RepartitionExec(Hash), the data is truly hash-distributed and the original CASE routing (hash(key) % N) is used for optimal per-partition filtering.

Note: The global OR approach is not tied to the preserve_file_partitions config flag. It applies any time there's no RepartitionExec between the DataSourceExec and the join.

Alternative Approach - Partition Index Routing

An alternative approach would be to introduce a PartitionIndex physical expression that routes probe-side rows to the correct partition's filter based on a partition mapping (partition column value -> partition index). This would look like:

  CASE partition_index(partition_col)
    WHEN 0 THEN filter_0
    WHEN 1 THEN filter_1
    ...
  END

Why I didn't chose:

  1. No row-group pruning advantage: For statistics-based row-group pruning the PruningPredicate uses the OR branches independently. This means something like filter_0 OR filter_1 OR filter_2 achieves identical row-group pruning as the partition indexed CASE expression. They both evaluate each filter against the row-group statistics, meaning no perf beenfit.
  2. Complexity:
    • Exposing partition mapping from FileScanConfig implementations
    • New PartitionIndex physical expression
    • Plumbing throughout the dynamic filter system

Why we may want to consider:

  1. Row-level cost is not bad: global OR has O(PartitionCount) evaluation cost per row. For typical partition counts I believe this is fine since the bug wins and most rows are eliminated by the row-group pruning.
  2. This would force awkward code in the FileScanConfig or public API change (I believe)

Are these changes tested?

  • Unit: datafusion/physical-plan/src/joins/hash_join/exec.rs‎ datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
  • Integration: datafusion/core/tests/physical_optimizer/filter_pushdown.rs‎
  • Sql Logic: [datafusion/sqllogictest/test_files/preserve_file_partitioning.slt‎

Are there any user-facing changes?

No

@github-actions github-actions bot added core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) physical-plan Changes to the physical-plan crate labels Feb 7, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate physical-plan Changes to the physical-plan crate sqllogictest SQL Logic Tests (.slt)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

support dynamic filtering on partitioned data from file source

1 participant