
@adriangb
Contributor

Like #16642, this was triggered by attempting to make progress on HashJoinExec.

In particular, I realized that a fundamental assumption of the existing code, that the optimizer knows how to combine results from children, is flawed. Fixing this requires changes to the public APIs.

I've taken the opportunity to do some rethinking and redesigning. While there is a bit more complexity in some places, I think others are a lot simpler.

@github-actions github-actions bot added the optimizer (Optimizer rules), core (Core DataFusion crate), datasource (Changes to the datasource crate), and physical-plan (Changes to the physical-plan crate) labels Jul 10, 2025
@adriangb
Contributor Author

Note that I added a HashJoinExec implementation to motivate this PR but removed it in 5940cca because it lacks the nuance necessary for a complete, correct implementation (it doesn't take into account the join type, etc.).

I asked Claude to analyze the logical filter pushdown on joins and it reported the following:

  Join Types and Filter Push Down Rules

  For WHERE clause filters (lr_is_preserved):
  - Inner joins: Filters can be pushed to both sides
  - Left joins: Filters can only be pushed to the left side
  - Right joins: Filters can only be pushed to the right side
  - Full joins: Filters cannot be pushed to either side
  - Semi/Anti joins: Filters can be pushed to the preserved side only

  For ON clause filters (on_lr_is_preserved):
  - Inner joins: Filters can be pushed to both sides
  - Left joins: Filters can only be pushed to the right side
  - Right joins: Filters can only be pushed to the left side
  - Full joins: Filters cannot be pushed to either side
  - Semi/Anti joins: Different rules apply based on join variant
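As a rough illustration of the two rule sets listed above, here is a minimal Rust sketch. `JoinKind`, `where_filter_sides`, and `on_filter_sides` are hypothetical names made up for this example, not DataFusion APIs, and semi/anti joins are left out on purpose because the summary above does not spell out their ON-clause rules.

```rust
/// Hypothetical, simplified join type. Semi/anti joins are omitted on purpose.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinKind {
    Inner,
    Left,
    Right,
    Full,
}

/// For a WHERE-clause filter: (may push to left input, may push to right input).
pub fn where_filter_sides(kind: JoinKind) -> (bool, bool) {
    match kind {
        JoinKind::Inner => (true, true),
        JoinKind::Left => (true, false),
        JoinKind::Right => (false, true),
        JoinKind::Full => (false, false),
    }
}

/// For an ON-clause filter: (may push to left input, may push to right input).
/// Note that the outer-join cases are mirrored relative to the WHERE rules.
pub fn on_filter_sides(kind: JoinKind) -> (bool, bool) {
    match kind {
        JoinKind::Inner => (true, true),
        JoinKind::Left => (false, true),
        JoinKind::Right => (true, false),
        JoinKind::Full => (false, false),
    }
}
```

Only `JoinKind::Inner` yields `(true, true)` under both rule sets.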

  Filter Restrictions

  The can_evaluate_as_join_condition function at line 255 shows that filters can only be converted to join conditions if they are built entirely from allowed expression types:

  Allowed expressions:
  - Column references
  - Literals
  - Placeholders
  - Scalar variables
  - Binary expressions
  - LIKE/SimilarTo predicates
  - NOT expressions
  - IS NULL/IS NOT NULL
  - CASE expressions
  - Cast expressions
  - Try cast expressions
  - Scalar functions

  Disallowed expressions:
  - Subqueries (EXISTS, IN subquery, scalar subquery)
  - Outer reference columns
  - UNNEST expressions

  Additionally, for non-inner joins, inferred predicates must strictly filter out NULLs to be pushed down to avoid incorrect results.
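The allowed/disallowed lists above amount to a recursive walk over the expression tree that rejects the filter as soon as any disallowed node appears. The sketch below shows that shape under heavy simplification: the `Expr` enum and `can_be_join_condition` are hypothetical stand-ins invented for this example (only a few of the allowed kinds are modeled), not DataFusion's actual `Expr` or `can_evaluate_as_join_condition`.

```rust
/// Hypothetical, heavily simplified expression tree (not DataFusion's `Expr`).
#[derive(Debug, Clone)]
pub enum Expr {
    // A few of the allowed kinds from the list above.
    Column(String),
    Literal(i64),
    Binary(Box<Expr>, &'static str, Box<Expr>),
    Not(Box<Expr>),
    IsNull(Box<Expr>),
    ScalarFunction(String, Vec<Expr>),
    // The disallowed kinds from the list above.
    Subquery,
    OuterReferenceColumn(String),
    Unnest(Box<Expr>),
}

/// Returns true only if no disallowed node appears anywhere in the tree.
pub fn can_be_join_condition(expr: &Expr) -> bool {
    match expr {
        Expr::Column(_) | Expr::Literal(_) => true,
        Expr::Binary(left, _, right) => {
            can_be_join_condition(left) && can_be_join_condition(right)
        }
        Expr::Not(inner) | Expr::IsNull(inner) => can_be_join_condition(inner),
        Expr::ScalarFunction(_, args) => args.iter().all(can_be_join_condition),
        Expr::Subquery | Expr::OuterReferenceColumn(_) | Expr::Unnest(_) => false,
    }
}
```

The NULL-filtering requirement for inferred predicates on non-inner joins is a separate check and is not modeled here.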

@adriangb adriangb force-pushed the hash-join-allow-filters branch from 167e740 to 3ae2e4e July 10, 2025 01:12
Contributor Author

@adriangb adriangb left a comment


leaving some notes

Comment on lines -115 to -122
/// The combined result of pushing down each parent filter into each child.
/// For example, given the filters `[a, b]` and children `[1, 2, 3]` the matrix of responses:
///
// | filter | child 1     | child 2   | child 3   | result      |
// |--------|-------------|-----------|-----------|-------------|
// | a      | Supported   | Supported | Supported | Supported   |
// | b      | Unsupported | Supported | Supported | Unsupported |
///
Contributor Author

This is the assumption that was fundamentally flawed
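To make the removed behavior concrete, the sketch below reproduces the `result` column of the matrix above using a fixed "every child must support it" rule. `Support`, `combine_row`, and `result_column` are hypothetical names for this example, not the removed DataFusion code. The point of the PR is that a single rule like this, applied by the optimizer, cannot be right for every node, so the combining decision moves to the node itself.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Support {
    Supported,
    Unsupported,
}

/// The old, optimizer-side rule: a parent filter counts as `Supported`
/// only if every child reported `Supported` for it.
pub fn combine_row(per_child: &[Support]) -> Support {
    if per_child.iter().all(|s| *s == Support::Supported) {
        Support::Supported
    } else {
        Support::Unsupported
    }
}

/// Compute the `result` column for a whole matrix (one row per parent filter).
pub fn result_column(matrix: &[Vec<Support>]) -> Vec<Support> {
    matrix.iter().map(|row| combine_row(row)).collect()
}
```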

Comment on lines 212 to 217
/// The parent filters that were pushed down as received by the current node when [`ExecutionPlan::gather_filters_for_pushdown`](crate::ExecutionPlan::gather_filters_for_pushdown) was called.
/// Note that this may *not* be the same as the filters that were passed to the children as the current node may have modified them
/// (e.g. by reassigning column indices) when it returned them from [`ExecutionPlan::gather_filters_for_pushdown`](crate::ExecutionPlan::gather_filters_for_pushdown) in a [`FilterDescription`].
/// Attached to each filter is a [`PredicateSupportDiscriminant`] *per child* that indicates whether the filter was supported or unsupported by each child.
/// To get combined results see [`ChildFitlerPushdownResult::any`] and [`ChildFitlerPushdownResult::all`].
pub parent_filters: Vec<ChildFitlerPushdownResult>,
Contributor Author

I think part of the reason I made that assumption in the original design was that I didn't want a Vec<Vec<PredicateSupport>>, in part because the inner vectors could technically have different lengths or different filters. This new structure makes that much more sound.
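A minimal sketch of the shape described here: each parent filter is stored once, next to one result per child, so rows cannot drift out of sync the way a nested vector could. `PerChildResult` and `Discriminant` are hypothetical names for this example, the filter is a plain `String` instead of a physical expression, and treating "no children" as unsupported is an assumption of the sketch rather than something stated in this thread.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Discriminant {
    Supported,
    Unsupported,
}

/// One parent filter together with one pushdown result per child.
#[derive(Debug, Clone)]
pub struct PerChildResult {
    pub filter: String,
    pub child_results: Vec<Discriminant>,
}

impl PerChildResult {
    fn from_bool(supported: bool) -> Discriminant {
        if supported {
            Discriminant::Supported
        } else {
            Discriminant::Unsupported
        }
    }

    /// `Supported` only if there is at least one child and every child supports the filter.
    pub fn all(&self) -> Discriminant {
        Self::from_bool(
            !self.child_results.is_empty()
                && self.child_results.iter().all(|r| *r == Discriminant::Supported),
        )
    }

    /// `Supported` if at least one child supports the filter.
    pub fn any(&self) -> Discriminant {
        Self::from_bool(self.child_results.iter().any(|r| *r == Discriminant::Supported))
    }
}
```

With this layout the node, not the optimizer, picks `all` or `any` depending on its own semantics.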

- Ok(FilterPushdownPropagation::transparent(
-     child_pushdown_result,
- ))
+ Ok(FilterPushdownPropagation::all(child_pushdown_result))
Contributor Author

Note that `all` is much more explicit and clear than `transparent`. We could even name it something more verbose?

Contributor

Maybe if_all?

Comment on lines 557 to 560
filters: vec![
PredicateSupportDiscriminant::Supported;
child_pushdown_result.parent_filters.len()
],
Contributor Author

I think this is now nicer to make 😄

- Ok(FilterPushdownPropagation::transparent(
-     child_pushdown_result,
- ))
+ Ok(FilterPushdownPropagation::all(child_pushdown_result))
Contributor Author

For nodes with 1 child, `all` and `any` are equivalent, but I think it's safer to pick `all` by default.

Comment on lines -437 to -444
/// Support state of each predicate for the children of the node.
/// These predicates are coming from the parent node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ParentPredicateStates {
NoChildren,
Unsupported,
Supported,
}
Contributor Author

I was able to get rid of this ugly helper enum

@kosiew
Contributor

kosiew commented Jul 10, 2025

quick change request:

ChildFitlerPushdownResult -> ChildFilterPushdownResult

Contributor

@alamb alamb left a comment


I reviewed this code carefully and it looks good to me. Thank you for pushing this along @adriangb

Some thoughts (nothing required)

Documentation:

I made a PR with some documentation suggestions for your consideration: pydantic#32

Consolidate some structures?

I feel like there is a maze of many small structures that all are very similar but not quite the same. I was wondering if it would be possible to try and unify some of them

Specifically, FilterPushdownPropagation and ChildPushdownResult feel very similar, as do ChildPushdownResult and ChildFilterDescription.

Naming

Something else that occurs to me with fresh eyes is that we could change the naming to make things easier to follow: instead of "support", maybe we could use the term "Pushdown".

So instead of PredicateSupportDiscriminant

/// Used by [`FilterPushdownPropagation`] and [`PredicateSupport`] to
/// communicate the result of attempting to push down a filter into a child
/// node.
///
/// This is the same as [`PredicateSupport`], but without the wrapped expression.
#[derive(Debug, Clone, Copy)]
pub enum PredicateSupportDiscriminant {
    /// The predicate was successfully pushed down into the child node.
    Supported,
    /// The predicate could not be pushed down into the child node.
    Unsupported,
}

maybe something like

enum PushedDown {
  Yes,
  No
}

And then instead of PredicateSupport

#[derive(Debug, Clone)]
pub enum PredicateSupport {
    /// The predicate was successfully pushed down into the child node.
    Supported(Arc<dyn PhysicalExpr>),
    /// The predicate could not be pushed down into the child node.
    Unsupported(Arc<dyn PhysicalExpr>),
}

It would be

#[derive(Debug, Clone)]
pub struct PushedDownPredicate {
    pub push_down: PushedDown,
    pub predicate: Arc<dyn PhysicalExpr>,
}
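For reference, here is a compilable sketch of the naming suggested above, with `PushedDownPredicate` modeled as a struct since it carries named fields. The `PhysicalExpr` trait and the `Column` type below are stand-ins defined only so the example builds on its own, and the constructor and accessor names are assumptions for illustration, not necessarily what was merged.

```rust
use std::fmt::Debug;
use std::sync::Arc;

/// Stand-in for DataFusion's `PhysicalExpr` trait so the sketch compiles alone.
pub trait PhysicalExpr: Debug {}

/// Hypothetical expression type used only in this example.
#[derive(Debug)]
pub struct Column(pub &'static str);
impl PhysicalExpr for Column {}

/// Whether a filter was pushed down into a child.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PushedDown {
    Yes,
    No,
}

/// A predicate together with the outcome of trying to push it down.
#[derive(Debug, Clone)]
pub struct PushedDownPredicate {
    pub push_down: PushedDown,
    pub predicate: Arc<dyn PhysicalExpr>,
}

impl PushedDownPredicate {
    /// Mark `predicate` as pushed down.
    pub fn supported(predicate: Arc<dyn PhysicalExpr>) -> Self {
        Self { push_down: PushedDown::Yes, predicate }
    }

    /// Mark `predicate` as not pushed down.
    pub fn unsupported(predicate: Arc<dyn PhysicalExpr>) -> Self {
        Self { push_down: PushedDown::No, predicate }
    }

    /// The bare outcome, without the wrapped expression.
    pub fn discriminant(&self) -> PushedDown {
        self.push_down
    }
}
```

Because the outcome is a plain field, getting the discriminant is a copy rather than a `match` over two enum variants that wrap the same payload.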

let unsupported_self_filters = child_pushdown_result.self_filters[0]
let unsupported_parent_filters =
child_pushdown_result.parent_filters.iter().filter_map(|f| {
matches!(f.all(), PredicateSupportDiscriminant::Unsupported)
Contributor

As a minor readability thing, I would find something like this easier to read:

Suggested change
- matches!(f.all(), PredicateSupportDiscriminant::Unsupported)
+ f.all().is_unsupported()

Something similar to is_some() and is_none() for Option

Maybe I'll try and make a PR to see what that looks like

Contributor Author

Unless there's some magic I'm not thinking about, wouldn't that require returning something from all() that can have an is_unsupported() method? I'm wary of adding more tiny structs.

Contributor Author

We could do it via an extension trait but not sure that reduces the cognitive burden
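One possible shape for this, sketched under the assumption (taken from the `matches!` call quoted above) that `all()` returns the `PredicateSupportDiscriminant` enum itself: the helpers can then be inherent methods on that enum, with no extra struct. An extension trait would carry the same method bodies but would have to be imported at every call site. The `unsupported_filters` helper and its tuple input are hypothetical and exist only to show the call site.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PredicateSupportDiscriminant {
    Supported,
    Unsupported,
}

impl PredicateSupportDiscriminant {
    /// Analogous to `Option::is_some`.
    pub fn is_supported(self) -> bool {
        matches!(self, Self::Supported)
    }

    /// Analogous to `Option::is_none`.
    pub fn is_unsupported(self) -> bool {
        matches!(self, Self::Unsupported)
    }
}

/// Hypothetical call site: keep only the names of filters that were not pushed down.
pub fn unsupported_filters<'a>(
    filters: &'a [(&'a str, PredicateSupportDiscriminant)],
) -> Vec<&'a str> {
    filters
        .iter()
        .filter_map(|(name, support)| support.is_unsupported().then_some(*name))
        .collect()
}
```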


}

/// Convert the [`PredicateSupport`] into a [`PredicateSupportDiscriminant`].
pub fn discriminant(&self) -> PredicateSupportDiscriminant {
Contributor

Maybe PredicateSupport would be easier to manage if it were a struct with a discriminant field rather than an enum.

@adriangb
Contributor Author

Thank you for the review @alamb

I incorporated your suggestion for restructuring the enum / struct into a struct with a discriminant field. It is much better.

I also renamed `all` to `if_all` and did some more docs tweaking.

@github-actions github-actions bot added the sqllogictest label (SQL Logic Tests, .slt) Jul 10, 2025
@adriangb
Contributor Author

> Note that I added a HashJoinExec implementation to motivate this PR but removed it in 5940cca because it lacks the nuance necessary for a complete, correct implementation (it doesn't take into account the join type, etc.).

I spoke with @alamb and we agreed that we'd start with only inner joins (which are always safe) and if we ever want to support anything fancier we'll deal with the complexity then

@alamb alamb merged commit 4bc66c8 into apache:main Jul 11, 2025
27 checks passed
@alamb
Contributor

alamb commented Jul 11, 2025

Thanks again @adriangb

@alamb alamb added the performance label (Make DataFusion faster) Jul 18, 2025
