Skip to content

Conversation

@2010YOUY01
Copy link
Contributor

Which issue does this PR close?

  • Closes #.

Rationale for this change

Background for dynamic filter: https://datafusion.apache.org/blog/2025/09/10/dynamic-filters/

The following queries can be used for quick global insights:

-- Q1
select min(l_shipdate) from lineitem;
-- Q2
select min(l_shipdate) from lineitem where l_returnflag = 'R';

Now Q1 can get executed very efficiently by directly check the file metadata if possible:

> explain select min(l_shipdate) from lineitem;
+---------------+-------------------------------+
| plan_type     | plan                          |
+---------------+-------------------------------+
| physical_plan | ┌───────────────────────────┐ |
|               | │       ProjectionExec      │ |
|               | │    --------------------   │ |
|               | │ min(lineitem.l_shipdate): │ |
|               | │         1992-01-02        │ |
|               | └─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐ |
|               | │     PlaceholderRowExec    │ |
|               | └───────────────────────────┘ |
|               |                               |
+---------------+-------------------------------+
1 row(s) fetched.
Elapsed 0.007 seconds.

However for Q2 now it's still doing the whole scan, and it's possible to use dynamic filters to speed them up.

Benchmarking Q2

Setup

  1. Generate tpch-sf100 parquet file with tpchgen-cli -s 100 --format=parquet (https://github.com/clflushopt/tpchgen-rs/tree/main/tpchgen-cli)
  2. In datafusion-cli, run
CREATE EXTERNAL TABLE lineitem
STORED AS PARQUET
LOCATION '/Users/yongting/data/tpch_sf100/lineitem.parquet';

select min(l_shipdate) from lineitem where l_returnflag = 'R';

Result

Main: 0.55s
PR: 0.09s

Rationale

/// # Overview
///
/// For queries like
///   -- `example_table(type TEXT, val INT)`
///   SELECT min(val)
///   FROM example_table
///   WHERE type='A';
///
/// And `example_table`'s physical representation is a partitioned parquet file with
/// column statistics
/// - part-0.parquet: val {min=0, max=100}
/// - part-1.parquet: val {min=100, max=200}
/// - ...
/// - part-100.parquet: val {min=10000, max=10100}
///
/// After scanning the 1st file, we know we only have to read files if their minimal
/// value on `val` column is less than 0, the minimal `val` value in the 1st file.
///
/// We can skip scanning the remaining file by implementing dynamic filter, the
/// intuition is we keep a shared data structure for current min in both `AggregateExec
/// and `DataSourceExec`, and let it update during execution, so the scanner can
/// know during execution if it's possible to skip scanning certain files. See
/// physical optimizer rule `FilterPushdown` for details.
///
/// # Implementation
/// ## Enable Condition
/// - No grouping (no `GROUP BY` clause in the sql, only a single global group to aggregate)
/// - The aggregate expression must be `min`/`max`, and evaluate directly on columns.
///   Note multiple aggregate expressions that satisfy this requirement are allowed,
///   and a dynamic filter will be constructed combining all applicable expr's
///   states. See more in the following example with dynamic filter on multiple columns.
/// ## Filter Construction
/// The filter is kept in the `DataSourceExec`, and it will gets update during execution,
/// the reader will interpret it as "the upstream only needs rows that such filter
/// predicate is evaluated to true", and certain scanner implementation like `parquet`
/// can evalaute column statistics on those dynamic filters, to decide if they can
/// prune a whole range.
///
/// ### Examples
/// - Expr: `min(a)`, Dynmaic Filter: `a < a_cur_min`
/// - Expr: `min(a), max(a), min(b)`, Dynamic Filter: `(a < a_cur_min) OR (a > a_cur_max) OR (b < b_cur_min)`

What changes are included in this PR?

This PR has not finish yet, the above demo is working, but I plan to do cleanups and tests afterwards, now I would like to get some early feedbacks on the PoC.

The goal is is to let aggregate expressions MIN/MAX with only column reference as argument (e.g. min(col1)) support dynamic filter, the above implementation rationale has explained it further.

The implementation includes:

  1. Added AggrDynFilter struct, and it would be shared across different partition streams to store the current bounds for dynamic filter update.
  2. init_dynamic_filter is responsible checking the conditions for whether to enable dynamic filter in the current aggregate execution plan, and finally build the AggrDynFilter inside the operator.
  3. During aggregation execution, after evaluating each batch, the current bound is refreshed in the dynamic filter, enabling the scanner to skip prunable units using the latest runtime bounds. (now it's updating every batch, perhaps we can let them update every k batches to avoid overheads?)
  4. Updated gather_filters_for_pushdown and handle_child_pushdown_result API in AggregateExec to enable self dynamic filter generation and pushdown.

TODOs:

Add a configuration to turn it on and off
Comprehensive tests
Cleanup and more comments

Are these changes tested?

Not yet

Are there any user-facing changes?

No

@github-actions github-actions bot added the physical-plan Changes to the physical-plan crate label Nov 10, 2025
@2010YOUY01 2010YOUY01 changed the title POC: Support dynamic filter min/max aggregates POC: Support dynamic filter in MIN/MAX aggregates Nov 10, 2025
@alamb
Copy link
Contributor

alamb commented Nov 10, 2025

This is a pretty clever idea

@2010YOUY01
Copy link
Contributor Author

Moved to a review-ready version #18644

@2010YOUY01 2010YOUY01 closed this Nov 12, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

physical-plan Changes to the physical-plan crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants