Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pushdown support for predicates in ON clause of joins #2647

Merged
merged 5 commits into from
Jun 2, 2022

Conversation

korowa
Copy link
Contributor

@korowa korowa commented May 29, 2022

Which issue does this PR close?

Closes #2619.

Rationale for this change

Pushdown support for predicates in JOIN ON clause and simplification of join planning.

What changes are included in this PR?

Join planning now just puts join filter into plan node without splitting it into pushable to inputs predicates.

flter_push_down rule checks for pushable predicates in join filter expression and replans them as filters for corresponding inputs.

Are there any user-facing changes?

No

Does this PR break compatibility with Ballista?

No

@github-actions github-actions bot added core Core DataFusion crate datafusion Changes in the datafusion crate logical-expr Logical plan and expressions optimizer Optimizer rules sql SQL Planner labels May 29, 2022
@korowa korowa force-pushed the join_on_predicate_pushdown branch from 7af4483 to 7e2432f Compare May 29, 2022 16:06
@korowa korowa force-pushed the join_on_predicate_pushdown branch from 7e2432f to cbe65ed Compare May 29, 2022 18:34
\n TableScan: person projection=None\
\n Filter: #orders.order_id > Int64(1)\
\n TableScan: orders projection=None";
\n TableScan: orders projection=None";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems that it can't pushdown filter to TableScan👀

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, it can not anymore - now all predicate movements are on filter_push_down optimizer, and there is no need for planner to define should it push some filters down or not - it just creates plan node as it is.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So to be clear, when run normally the pushdown will still happen, but they are no longer pushed down by the planner, rather they are pushed down at a later stage.

👍

@alamb
Copy link
Contributor

alamb commented May 31, 2022

I plan to review this later today or tomorrow if no one else has a chance to do so before

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked at this PR carefully -- really nice work @korowa

I have some suggestions that might make the code easier to follow / less brittle to other changes in the future, but i think what you have here is well tested and looks correct to me.

I think the assumptions that the filter_expr is the last expression that comes back from Expr::expressions() is fine but could potentially be confusing to future readers (I imagine myself in some time looking at this code). Maybe some additional documentation could help here

In any event, thank you so much 🏅

@@ -178,13 +177,35 @@ fn lr_is_preserved(plan: &LogicalPlan) -> (bool, bool) {
}
}

// For a given JOIN logical plan, determine whether each side of the join is preserved
// in terms on join filtering.
// Predicates from join filter can only be pushed to preserved join side.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

// Determine which predicates in state can be pushed down to a given side of a join.
// To determine this, we need to know the schema of the relevant join side and whether
// or not the side's rows are preserved when joining. If the side is not preserved, we
// do not push down anything. Otherwise we can push down predicates where all of the
// relevant columns are contained on the relevant join side's schema.
fn get_pushable_join_predicates<'a>(
state: &'a State,
filters: &'a [(Expr, HashSet<Column>)],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would help to document what the filters here represents, either in a doc string ("pairs of Exprs and a set of columns that represent ...") or maybe as a struct?

struct PushFilters {
  /// expr represents an On clause predicate
  expr: Expr,
   /// columns represents columns that appear in `expr`
   cols: HashSet<Column>
}

Copy link
Contributor Author

@korowa korowa Jun 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added type Predicate (we already have type Predicates for similar kind of thing, so uniform naming might be better) and replaced all tuples of expression and its columns in signatures/attributes with it - hope it looks better.

let to_left = get_pushable_join_predicates(&state, left.schema(), left_preserved);
let to_right = get_pushable_join_predicates(&state, right.schema(), right_preserved);

let to_left =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this code is very nicely written and easy to follow 👍

let right = optimize(right, right_state)?;

// create a new Join with the new `left` and `right`
let expr = plan.expressions();
let expr = if !on_filter.is_empty() && on_to_keep.is_empty() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really follow how we know the last element here in expr are the on expression -- doesn't that implicitly depend on the order of expressions returned from Expr::expressions()?

I wonder if we can make it more explicit somehow

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's the downside of approach when we represent complex (or complex enough) structure (or some parts of it) with simple list/dict/tuple/etc, however after looking at how LogicalPlan::Repartition and LogicalPlan::Aggregate translated into expressions I supposed that we can rely on expressions vector element order in this case.

Regarding your comment above -- I've added explanation of what's going on with last element and what's expected expressions() behaviour -- hope it can be helpful for next person who access this part of code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you -- I think the comment helps a lot.

@@ -1351,7 +1427,7 @@ mod tests {
let right = LogicalPlanBuilder::from(right_table_scan)
.project(vec![col("a"), col("b"), col("c")])?
.build()?;
let filter = col("test.a")
let filter = col("test.c")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this changed? Because test.a is a join predicate as well?

Copy link
Contributor Author

@korowa korowa Jun 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly, I've decided not to mix these cases and moved test for filtering on one of join keys to join_filter_on_common

);

let expected = "\
Inner Join: #test.a = #test2.a\
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend using different columns here -- like test.a = test2.b so that you can validate that correct column was pushed to each side

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

.iter()
.flat_map(|(l, r)| vec![Expr::Column(l.clone()), Expr::Column(r.clone())])
.chain(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good catch

datafusion/expr/src/utils.rs Show resolved Hide resolved
.map(|filter| join.filter(filter))
.unwrap_or(Ok(join))?
.build()
} else if join_type == JoinType::Inner && !normalized_filters.is_empty() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

\n TableScan: person projection=None\
\n Filter: #orders.order_id > Int64(1)\
\n TableScan: orders projection=None";
\n TableScan: orders projection=None";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So to be clear, when run normally the pushdown will still happen, but they are no longer pushed down by the planner, rather they are pushed down at a later stage.

👍

@alamb
Copy link
Contributor

alamb commented Jun 2, 2022

I retriggered failing CI tests -- they seemed to be unrelated

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great -- thank you again @korowa

let right = optimize(right, right_state)?;

// create a new Join with the new `left` and `right`
let expr = plan.expressions();
let expr = if !on_filter.is_empty() && on_to_keep.is_empty() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you -- I think the comment helps a lot.

@alamb alamb merged commit f547262 into apache:master Jun 2, 2022
@alamb
Copy link
Contributor

alamb commented Jun 2, 2022

Thanks again @korowa

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate datafusion Changes in the datafusion crate logical-expr Logical plan and expressions optimizer Optimizer rules sql SQL Planner
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Move JOIN ON predicates push down logic from planner to optimizer
3 participants