-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Add dynamic filter (bounds) pushdown to HashJoinExec #16445
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
I tink we should also consider a heuristic for not evaluating the filter if it's not useful. Also I think doing only the lookup is preferable above also computing / checking the bounds, I think the latter might create more overhead. |
|
Sorry, misclicked a button. |
My thought was that for some cases the bounds checks are going to be quite effective at pruning and they should always be cheap to compute and cheap to apply. I'm surprised you say that they might create a lot of overhead? |
Maybe I should articulate it a bit more.
I think it also makes sense to also thing about a heuristic we want to use to use this pushdown only when we think it might be useful - e.g. the left side is much smaller than the right side, or we know (based on column statistics) it will filter out rows. |
I'm surprised that the hash table lookup, even if O(1), has such a small constant factor that its ~ a couple of binary comparisons. That said a reason to still do both is stats and filter caching: simple filters like |
Datafusion is generally not great at these things: we often don't have enough stats / info to make decisions like this. |
It's hard to say generally, but a hashtable lookup which fits into cache on a |
I guess only benchmarks can tell. But I still think the scalar bounds are worth keeping for stats pruning reasons. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have any metrics to record how much data is filtered by dynamic join filter?
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_hashjoin_dynamic_filter_pushdown() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add some tests for multiple joins? Such as
Join (t1.a = t2.b)
/ \
t1 Join(t2.c = t3.d)
/ \
t3 t2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Such test can check
- dynamic filters are pushed down to right scan node
- dynamic filters aren't missed during pushdown
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added a test that I think matches your suggestion
49d1636 to
04efcc1
Compare
|
Guess it just doesn't make a difference then. Thanks! |
|
Here it is in action: COPY (
with data as (
select unnest(generate_series(1, 99999999)) as id
)
select
id,
(id / 100000) as partition_id
from data
) TO 'test/t1/' STORED AS parquet PARTITIONED BY (partition_id);
CREATE EXTERNAL TABLE t1 (
id int
)
STORED AS PARQUET
PARTITIONED BY (partition_id int)
LOCATION 'test/t1/';
COPY (
with data as (
select unnest(generate_series(1, 100)) as id
)
select
id,
(id / 100000) as partition_id
from data
) TO 'test/t2/' STORED AS parquet PARTITIONED BY (partition_id);
CREATE EXTERNAL TABLE t2 (
id int
)
STORED AS PARQUET
PARTITIONED BY (partition_id int)
LOCATION 'test/t2/';
SET datafusion.optimizer.enable_dynamic_filter_pushdown = false;
explain analyze
SELECT count(*)
FROM t1
JOIN t2 USING (id);
SET datafusion.optimizer.enable_dynamic_filter_pushdown = true;
explain analyze
SELECT count(*)
FROM t1
JOIN t2 USING (id);
And that's without filter pushdown enabled, only benefiting from stats pruning. Explain plans |
|
Personally I would like to move forward with this and then make another PR to push down a reference to the entire hash table. |
|
Thanks @adriangb -- I will put this on my list of PRs to review more carefully in the next few days |
| // Create build side with limited values | ||
| let build_batches = vec![record_batch!( | ||
| ("a", Utf8, ["aa", "ab"]), | ||
| ("b", Utf8, ["ba", "bb"]), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may add some Utf8View fields testing cases, because our default mapping already changing to Utf8View.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
I'll have a look tomorrow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tests also need to update, others lgtm, thank you @adriangb
|
Fixed @xudong963, thanks so much for your review! I will leave this up until early next week to see if we get any more feedback. |
|
I plan on merging this in a couple hours if there are no objections |
|
Would anyone here like to review #17153 which enables this for left semi joins / |
|
@nuno-faria has reported a bug related to this PR: |
I am not likely to have time, but if I do I will take a loo |
(cherry picked from commit ff77b70)
* Enable physical filter pushdown for hash joins (apache#16954) (cherry picked from commit b10f453) * Add ExecutionPlan::reset_state (apache#17028) * Add ExecutionPlan::reset_state Co-authored-by: Robert Ream <robert@stably.io> * Update datafusion/sqllogictest/test_files/cte.slt * Add reference * fmt * add to upgrade guide * add explain plan, implement in more plans * fmt * only explain --------- Co-authored-by: Robert Ream <robert@stably.io> * Add dynamic filter (bounds) pushdown to HashJoinExec (apache#16445) (cherry picked from commit ff77b70) * Push dynamic pushdown through CooperativeExec and ProjectionExec (apache#17238) (cherry picked from commit 4bc0696) * Fix dynamic filter pushdown in HashJoinExec (apache#17201) (cherry picked from commit 1d4d74b) * Fix HashJoinExec sideways information passing for partitioned queries (apache#17197) (cherry picked from commit 64bc58d) * disallow pushdown of volatile functions (apache#16861) * dissallow pushdown of volatile PhysicalExprs * fix * add FilteredVec helper to handle filter / remap pattern (#34) * checkpoint: Address PR feedback in https://github.com/apach... * add FilteredVec to consolidate handling of filter / remap pattern * lint * Add slt test for pushing volatile predicates down (#35) --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> (cherry picked from commit 94e8548) * fix bounds accumulator reset in HashJoinExec dynamic filter pushdown (apache#17371) --------- Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Co-authored-by: Robert Ream <robert@stably.io> Co-authored-by: Jack Kleeman <jackkleeman@gmail.com> Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Penultimate part of #7955.
This implements pushdown of self filters for HashJoinExec, similar to what DuckDB does.
We plan to follow up with pushdown of a reference to the hash table itself which will benefit joins where there is no order to the join condition (e.g. UUIDs).