pushdown support for predicates in ON clause of joins #2647
\n TableScan: person projection=None\
\n Filter: #orders.order_id > Int64(1)\
\n TableScan: orders projection=None";
\n TableScan: orders projection=None";
It seems that it can't push the filter down to the TableScan 👀
You're right, it can't anymore. All predicate movement is now handled by the `filter_push_down` optimizer, so the planner no longer needs to decide whether to push filters down; it just creates the plan node as is.
So to be clear, when run normally the pushdown will still happen, but they are no longer pushed down by the planner, rather they are pushed down at a later stage.
👍
I plan to review this later today or tomorrow if no one else has a chance to do so before
I looked at this PR carefully -- really nice work @korowa
I have some suggestions that might make the code easier to follow / less brittle to other changes in the future, but i think what you have here is well tested and looks correct to me.
I think the assumption that the `filter_expr` is the last expression that comes back from `Expr::expressions()` is fine, but it could potentially be confusing to future readers (I imagine myself in some time looking at this code). Maybe some additional documentation could help here.
In any event, thank you so much 🏅
@@ -178,13 +177,35 @@ fn lr_is_preserved(plan: &LogicalPlan) -> (bool, bool) {
}
}

// For a given JOIN logical plan, determine whether each side of the join is preserved
// in terms of join filtering.
// Predicates from join filter can only be pushed to preserved join side.
👍
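The "preserved side" rule in the comment above can be sketched as follows. This is a minimal illustration based on standard SQL outer-join semantics, not the PR's actual code: `JoinKind` and `on_filter_preserved` are hypothetical names, and semi/anti joins are left out.

```rust
/// Simplified stand-in for a join type (hypothetical, for illustration only).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinKind {
    Inner,
    Left,
    Right,
    Full,
}

/// Returns `(left_preserved, right_preserved)` for predicates taken from the
/// ON clause: a side is "preserved" here when an ON predicate that references
/// only that side can be turned into a filter on that input without changing
/// the join result.
pub fn on_filter_preserved(kind: JoinKind) -> (bool, bool) {
    match kind {
        // Rows that fail the ON condition are dropped anyway, so either side works.
        JoinKind::Inner => (true, true),
        // Every left row must reach the output, so only the right input may be filtered.
        JoinKind::Left => (false, true),
        // Mirror image of the left join.
        JoinKind::Right => (true, false),
        // Both sides must keep all of their rows.
        JoinKind::Full => (false, false),
    }
}
```

Note that this is the opposite of the rule for filters placed above a join (a WHERE clause), where a predicate can only move to the side whose rows are never null-padded.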
// Determine which predicates in state can be pushed down to a given side of a join.
// To determine this, we need to know the schema of the relevant join side and whether
// or not the side's rows are preserved when joining. If the side is not preserved, we
// do not push down anything. Otherwise we can push down predicates where all of the
// relevant columns are contained on the relevant join side's schema.
fn get_pushable_join_predicates<'a>(
state: &'a State,
filters: &'a [(Expr, HashSet<Column>)],
I think it would help to document what `filters` here represents, either in a doc string ("pairs of Exprs and a set of columns that represent ...") or maybe as a struct?
struct PushFilters {
    /// expr represents an On clause predicate
    expr: Expr,
    /// columns represents columns that appear in `expr`
    cols: HashSet<Column>
}
I've added a `Predicate` type (we already have a `Predicates` type for a similar kind of thing, so uniform naming might be better) and replaced all tuples of an expression and its columns in signatures/attributes with it. I hope it looks better.
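The selection rule discussed in this thread (push nothing to a non-preserved side, otherwise push predicates whose columns all belong to that side) can be sketched in isolation. This is only an illustration under simplifying assumptions: expressions and columns are plain strings rather than DataFusion's `Expr` and `Column`, and the `Predicate` alias and `pushable_predicates` function here are hypothetical stand-ins, not the definitions from the PR.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in: an expression (as text) paired with the set of
/// column names that appear in it.
pub type Predicate = (String, HashSet<String>);

/// Selects the predicates that may be pushed to one side of a join.
///
/// `side_columns` is the set of columns in that side's schema and `preserved`
/// says whether the side is preserved for the kind of filter being pushed.
pub fn pushable_predicates<'a>(
    filters: &'a [Predicate],
    side_columns: &HashSet<String>,
    preserved: bool,
) -> Vec<&'a Predicate> {
    // Nothing can be pushed to a side that is not preserved.
    if !preserved {
        return Vec::new();
    }
    // Keep only predicates whose columns all come from this side.
    filters
        .iter()
        .filter(|(_, cols)| cols.is_subset(side_columns))
        .collect()
}
```

Carrying the column set next to each expression means the columns are collected once and can be checked against both join sides without walking the expression tree again.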
let to_left = get_pushable_join_predicates(&state, left.schema(), left_preserved);
let to_right = get_pushable_join_predicates(&state, right.schema(), right_preserved);

let to_left =
I think this code is very nicely written and easy to follow 👍
let right = optimize(right, right_state)?;

// create a new Join with the new `left` and `right`
let expr = plan.expressions();
let expr = if !on_filter.is_empty() && on_to_keep.is_empty() {
I don't really follow how we know that the last element here in `expr` is the `on` expression -- doesn't that implicitly depend on the order of expressions returned from `Expr::expressions()`?
I wonder if we can make it more explicit somehow
I guess that's the downside of representing a complex (or complex enough) structure, or parts of it, as a simple list/dict/tuple/etc. However, after looking at how `LogicalPlan::Repartition` and `LogicalPlan::Aggregate` are translated into expressions, I assumed that we can rely on the element order of the expressions vector in this case.
Regarding your comment above -- I've added an explanation of what's going on with the last element and what the expected `expressions()` behaviour is. I hope it will be helpful for the next person who touches this part of the code.
Thank you -- I think the comment helps a lot.
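One way to make the positional convention from this thread explicit is to convert the flat expression list into a small struct at the boundary. The sketch below is hypothetical and is not what the PR does (the PR keeps the vector and documents the ordering). It assumes the layout described above, join-key expressions first and the ON filter, if any, as the last element, and uses strings in place of real expressions.

```rust
/// Hypothetical explicit view of a join's expressions.
#[derive(Debug, PartialEq)]
pub struct JoinExprs {
    /// Join-key expressions, in their original order.
    pub keys: Vec<String>,
    /// The ON filter, if the join has one.
    pub filter: Option<String>,
}

impl JoinExprs {
    /// Builds the explicit view from a positional vector in which the ON
    /// filter, when present, is assumed to be the last element.
    pub fn from_positional(mut exprs: Vec<String>, has_filter: bool) -> Self {
        let filter = if has_filter { exprs.pop() } else { None };
        JoinExprs { keys: exprs, filter }
    }

    /// Converts back to the positional layout, appending the filter last.
    pub fn into_positional(self) -> Vec<String> {
        let mut exprs = self.keys;
        exprs.extend(self.filter);
        exprs
    }
}
```

With this shape, dropping an ON filter that was pushed down entirely becomes `parts.filter = None` instead of truncating the vector by position.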
@@ -1351,7 +1427,7 @@ mod tests {
 let right = LogicalPlanBuilder::from(right_table_scan)
 .project(vec![col("a"), col("b"), col("c")])?
 .build()?;
-let filter = col("test.a")
+let filter = col("test.c")
Why is this changed? Because `test.a` is a join predicate as well?
Exactly. I've decided not to mix these cases and moved the test for filtering on one of the join keys to `join_filter_on_common`.
);

let expected = "\
Inner Join: #test.a = #test2.a\
I recommend using different columns here -- like `test.a = test2.b` -- so that you can validate that the correct column was pushed to each side
Done
.iter()
.flat_map(|(l, r)| vec![Expr::Column(l.clone()), Expr::Column(r.clone())])
.chain(
This is a good catch
.map(|filter| join.filter(filter))
.unwrap_or(Ok(join))?
.build()
} else if join_type == JoinType::Inner && !normalized_filters.is_empty() {
❤️
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
…a/arrow-datafusion into join_on_predicate_pushdown
I retriggered failing CI tests -- they seemed to be unrelated
Looks great -- thank you again @korowa
Thanks again @korowa
Which issue does this PR close?
Closes #2619.
Rationale for this change
Pushdown support for predicates in the `JOIN ON` clause and simplification of join planning.
What changes are included in this PR?
Join planning now just puts the join filter into the plan node without splitting out predicates that can be pushed to the inputs.
The `filter_push_down` rule checks for pushable predicates in the join filter expression and replans them as filters on the corresponding inputs.
Are there any user-facing changes?
No
Does this PR break compatibility with Ballista?
No