optimizer: Support dynamic filter in MIN/MAX aggregates
#18644
})?;
// First get current partition's bound, then update the shared bound among
// all partitions.
let current_bound = acc.evaluate()?;
- let current_bound = acc.evaluate()?;
+ let current_bound = acc.evaluate()?;
+ if current_bound.is_null() {
+     continue;
+ }
?!
because it will affect the scalar_min() below
Good point! Additionally, maybe we should only update the shared bound if the local bound has tightened, to reduce lock contention.
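The two suggestions in this thread (skip null bounds, and only touch the shared bound when the local bound has tightened) could be sketched roughly as follows. This is a standalone illustration, not the PR's code: `SharedMinBound`, `PartitionState`, and `publish` are hypothetical names, and `Option<i64>` stands in for a nullable scalar value.

```rust
use std::sync::Mutex;

/// Hypothetical shared state: the smallest MIN bound published by any partition.
/// `None` means no partition has produced a non-null bound yet.
pub struct SharedMinBound {
    inner: Mutex<Option<i64>>,
}

impl SharedMinBound {
    pub fn new() -> Self {
        Self { inner: Mutex::new(None) }
    }

    /// Read the current shared bound.
    pub fn get(&self) -> Option<i64> {
        *self.inner.lock().unwrap()
    }

    /// Lower the shared bound to `candidate` if that is strictly smaller.
    /// Returns true when the shared bound changed.
    pub fn tighten(&self, candidate: i64) -> bool {
        let mut guard = self.inner.lock().unwrap();
        match *guard {
            Some(current) if current <= candidate => false,
            _ => {
                *guard = Some(candidate);
                true
            }
        }
    }
}

/// Hypothetical per-partition state: the last local bound this partition published.
#[derive(Default)]
pub struct PartitionState {
    pub last_local: Option<i64>,
}

/// Publish one partition's local bound. Returns true if the shared bound tightened.
pub fn publish(shared: &SharedMinBound, state: &mut PartitionState, local: Option<i64>) -> bool {
    // A null local bound carries no information, so skip it entirely.
    let local = match local {
        Some(v) => v,
        None => return false,
    };
    // If the local bound did not tighten since the last publish,
    // return without taking the lock.
    if let Some(prev) = state.last_local {
        if prev <= local {
            return false;
        }
    }
    state.last_local = Some(local);
    shared.tighten(local)
}
```

The lock is only taken when a partition's own bound improves, which is the contention reduction proposed above; whether that matters in practice would need measuring.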
/// During filter pushdown optimization, if a child node can accept this filter,
/// it remains `Some(..)` to enable dynamic filtering during aggregate execution;
/// otherwise, it is cleared to `None`.
Note that currently "does the child node accept the filter" is a bit murky: even if it says No, it can still retain a reference, e.g. for statistics pruning.
It seems to me we may need to expand the pushdown response from Yes/No to Exact/Inexact/Unsupported.
Or maybe we should check the Arc reference counts 😛? If no one else has a reference... no point in updating?
I agree. Also, what are the current semantics? Does Yes map to either Exact or Inexact?
Precisely: Yes can mean Exact or Inexact but doesn’t differentiate between them
adriangb
left a comment
This is super cool @2010YOUY01 ! I hadn't even thought of this use case. Truly amazing.
I left a small comment for now. Overall the change looks good but requires more in depth review. I'll try over the next couple days but am on vacation so it may take a week 🙏🏻
Thanks! Enjoy your vacation. 😄
Which issue does this PR close?
Rationale for this change
Background for dynamic filter: https://datafusion.apache.org/blog/2025/09/10/dynamic-filters/
The following queries can be used for quick global insights:
Q1 can already be executed very efficiently by directly checking the file metadata where possible:
However, Q2 still does a whole scan, and dynamic filters can be used to speed it up.
Benchmarking Q2
Setup
tpchgen-cli -s 100 --format=parquet (https://github.com/clflushopt/tpchgen-rs/tree/main/tpchgen-cli)
Result
Main: 0.55s
PR: 0.09s
Aggregate Dynamic Filter Pushdown Overview
For queries like
And
example_table's physical representation is a partitioned parquet file with column statistics

After scanning the 1st file, we know we only have to read files if their minimal value on the val column is less than 0, the minimal val value in the 1st file.

We can skip scanning the remaining files by implementing a dynamic filter. The intuition is that we keep a shared data structure for the current min in both AggregateExec and DataSourceExec, and let it update during execution, so the scanner can know during execution whether it's possible to skip scanning certain files. See physical optimizer rule FilterPushdown for details.

Implementation
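To make the overview above concrete, here is a minimal single-threaded sketch of the file-skipping intuition. It is not how AggregateExec and DataSourceExec are wired together: the shared structure is collapsed into a local variable, and `FileMeta` and `min_with_file_skipping` are hypothetical names used only for illustration.

```rust
/// Hypothetical per-file metadata: the minimum of `val` taken from column
/// statistics, plus the rows that a real scan would have to read.
pub struct FileMeta {
    pub min_val: i64,
    pub rows: Vec<i64>,
}

/// Compute min(val) over all files, skipping any file whose statistics show
/// it cannot lower the current minimum.
/// Returns the result and the number of files actually scanned.
pub fn min_with_file_skipping(files: &[FileMeta]) -> (Option<i64>, usize) {
    let mut cur_min: Option<i64> = None;
    let mut scanned = 0;
    for file in files {
        // Dynamic filter `val < cur_min`: if the file's smallest value is not
        // below the current minimum, no row in it can pass the filter.
        if let Some(m) = cur_min {
            if file.min_val >= m {
                continue;
            }
        }
        scanned += 1;
        for &v in &file.rows {
            cur_min = Some(match cur_min {
                Some(m) => m.min(v),
                None => v,
            });
        }
    }
    (cur_min, scanned)
}
```

With three files whose minimal val values are 0, 50, and 100, only the first file is scanned, which matches the example described above.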
Enable Condition
- No GROUP BY clause in the SQL (only a single global group to aggregate)
- The aggregate expressions are min/max, and evaluate directly on columns.

Note that multiple aggregate expressions that satisfy this requirement are allowed, and a dynamic filter will be constructed combining all applicable expressions' states. See the example below with a dynamic filter on multiple columns.
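The enable condition could be checked along these lines. The types here (`AggFunc`, `Arg`, `AggExpr`) and the function `dynamic_filter_enabled` are hypothetical stand-ins, not DataFusion's own, and the sketch assumes the conservative reading that every aggregate in the plan must qualify.

```rust
/// Hypothetical aggregate function kinds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AggFunc {
    Min,
    Max,
    Sum,
    Count,
}

/// Hypothetical aggregate argument: a plain column reference or anything else.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Arg {
    Column(String),
    Expression(String),
}

/// Hypothetical aggregate expression, e.g. min(col1).
pub struct AggExpr {
    pub func: AggFunc,
    pub arg: Arg,
}

/// Returns true when the aggregate qualifies for a dynamic filter:
/// no GROUP BY, and (assumption) every aggregate is min/max over a plain column.
pub fn dynamic_filter_enabled(group_by: &[String], aggs: &[AggExpr]) -> bool {
    group_by.is_empty()
        && !aggs.is_empty()
        && aggs.iter().all(|a| {
            matches!(a.func, AggFunc::Min | AggFunc::Max) && matches!(a.arg, Arg::Column(_))
        })
}
```

Under these assumptions, a plan computing min(a), max(a), min(b) with no grouping qualifies, while adding a GROUP BY or an aggregate such as sum(a) or min(a + 1) disables the filter.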
Filter Construction
The filter is kept in the DataSourceExec and is updated during execution. The reader interprets it as "the upstream only needs rows for which this filter predicate evaluates to true", and certain scanner implementations, such as parquet, can evaluate column statistics against these dynamic filters to decide whether they can prune a whole range.
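As a rough illustration of how a scanner might evaluate column statistics against such a filter, the sketch below treats the dynamic filter as an OR of simple bounds and prunes a range only when no disjunct can be true for any row in it. `ColStats`, `Bound`, and `can_prune` are hypothetical names; the real parquet pruning logic is more general.

```rust
/// Hypothetical min/max statistics for one column of a file or row group.
pub struct ColStats {
    pub min: i64,
    pub max: i64,
}

/// One disjunct of the dynamic filter; `col` indexes into the statistics slice.
pub enum Bound {
    /// `col < cur_min`, contributed by a min(col) aggregate.
    LessThan { col: usize, cur_min: i64 },
    /// `col > cur_max`, contributed by a max(col) aggregate.
    GreaterThan { col: usize, cur_max: i64 },
}

/// Returns true if the range can be skipped: every disjunct of the OR filter
/// is provably false for all rows, judging by the statistics alone.
pub fn can_prune(stats: &[ColStats], filter: &[Bound]) -> bool {
    // No bounds yet means the filter is still trivially true: keep everything.
    if filter.is_empty() {
        return false;
    }
    filter.iter().all(|bound| match bound {
        // `col < cur_min` cannot hold if even the smallest value is >= cur_min.
        Bound::LessThan { col, cur_min } => stats[*col].min >= *cur_min,
        // `col > cur_max` cannot hold if even the largest value is <= cur_max.
        Bound::GreaterThan { col, cur_max } => stats[*col].max <= *cur_max,
    })
}
```

Because the disjuncts are combined with OR, a single bound that might still match is enough to keep the range.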
Examples
- min(a), Dynamic Filter: a < a_cur_min
- min(a), max(a), min(b), Dynamic Filter: (a < a_cur_min) OR (a > a_cur_max) OR (b < b_cur_min)

What changes are included in this PR?
The goal is to let MIN/MAX aggregate expressions with only a column reference as argument (e.g. min(col1)) support dynamic filters; the implementation rationale above explains this further.

The implementation includes:
- AggrDynFilter struct, which is shared across different partition streams to store the current bounds for dynamic filter updates.
- init_dynamic_filter, which is responsible for checking the conditions for whether to enable the dynamic filter in the current aggregate execution plan, and finally building the AggrDynFilter inside the operator.
- gather_filters_for_pushdown and handle_child_pushdown_result API in AggregateExec to enable self dynamic filter generation and pushdown.

TODO (in this PR)
Are these changes tested?
Yes, optimizer unit tests and end-to-end tests
Are there any user-facing changes?
No