- 
                Notifications
    You must be signed in to change notification settings 
- Fork 1.7k
Refactor filter pushdown APIs to enable joins to pass through filters #16732
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| Note that I added a HashJoinExec implementation to motivate this PR but remove it in 5940cca because it lacks nuance necessary for a final complete correct implementation (it doesn't take into account the join type, etc.). I asked Claude to analyze the logical filter pushdown on joins and it reported the following:  | 
167e740    to
    3ae2e4e      
    Compare
  
    There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
leaving some notes
| /// The combined result of pushing down each parent filter into each child. | ||
| /// For example, given the fitlers `[a, b]` and children `[1, 2, 3]` the matrix of responses: | ||
| /// | ||
| // | filter | child 1 | child 2 | child 3 | result | | ||
| // |--------|-------------|-----------|-----------|-------------| | ||
| // | a | Supported | Supported | Supported | Supported | | ||
| // | b | Unsupported | Supported | Supported | Unsupported | | ||
| /// | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the assumption that was fundamentally flawed
| /// The parent filters that were pushed down as received by the current node when [`ExecutionPlan::gather_filters_for_pushdown`](crate::ExecutionPlan::handle_child_pushdown_result) was called. | ||
| /// Note that this may *not* be the same as the filters that were passed to the children as the current node may have modified them | ||
| /// (e.g. by reassigning column indices) when it returned them from [`ExecutionPlan::gather_filters_for_pushdown`](crate::ExecutionPlan::handle_child_pushdown_result) in a [`FilterDescription`]. | ||
| /// Attached to each filter is a [`PredicateSupportDiscriminant`] *per child* that indicates whether the filter was supported or unsupported by each child. | ||
| /// To get combined results see [`ChildFitlerPushdownResult::any`] and [`ChildFitlerPushdownResult::all`]. | ||
| pub parent_filters: Vec<ChildFitlerPushdownResult>, | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think part of the reason I made that assumption in the original design is was that I didn't want a Vec<Vec<PredicateSupport>>, in part because you could technically have different lengths or different filters. This new structure makes that much more sound.
| Ok(FilterPushdownPropagation::transparent( | ||
| child_pushdown_result, | ||
| )) | ||
| Ok(FilterPushdownPropagation::all(child_pushdown_result)) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that all is much more explicit and clear than transparent. We could even name it something more verbose?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe if_all?
| filters: vec![ | ||
| PredicateSupportDiscriminant::Supported; | ||
| child_pushdown_result.parent_filters.len() | ||
| ], | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is now nicer to make 😄
| Ok(FilterPushdownPropagation::transparent( | ||
| child_pushdown_result, | ||
| )) | ||
| Ok(FilterPushdownPropagation::all(child_pushdown_result)) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For nodes with 1 child all and any are equivalent, but I think it's safer to pick all by default.
| /// Support state of each predicate for the children of the node. | ||
| /// These predicates are coming from the parent node. | ||
| #[derive(Debug, Clone, Copy, PartialEq, Eq)] | ||
| enum ParentPredicateStates { | ||
| NoChildren, | ||
| Unsupported, | ||
| Supported, | ||
| } | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was able to get rid of this ugly helper enum
| quick change request: ChildFitlerPushdownResult -> ChildFilterPushdownResult | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I reviewed this code carefully and it looks good to me. Thank you for pushing this along @adriangb
Some thoughts (nothing required)
Documentation:
I made a PR with some documentation suggestions for your consideration: pydantic#32
Consolidate some structures?
I feel like there is a maze of many small structures that all are very similar but not quite the same. I was wondering if it would be possible to try and unify some of them
Specifically, FilterPushdownPropagation and ChildPushdownResult feel very similar and ChildPushdownResult and ChildFilterDescription
Naming
Something else that occurs to me with some fresh eyes is maybe we could change the naming to make things easier to follow
Something else that might help could be naming -- instead of "support" maybe we could use the term "Pushdown"
So instead of PredicateSupportDiscriminant
/// Used by [`FilterPushdownPropagation`] and [`PredicateSupport`] to
/// communicate the result of attempting to push down a filter into a child
/// node.
///
/// This is the same as [`PredicateSupport`], but without the wrapped expression.
#[derive(Debug, Clone, Copy)]
pub enum PredicateSupportDiscriminant {
    /// The predicate was successfully pushed down into the child node.
    Supported,
    /// The predicate could not be pushed down into the child node.
    Unsupported,
}maybe something like
enum PushedDown {
  Yes,
  No
}And the instead of PredicateSupport
#[derive(Debug, Clone)]
pub enum PredicateSupport {
    /// The predicate was successfully pushed down into the child node.
    Supported(Arc<dyn PhysicalExpr>),
    /// The predicate could not be pushed down into the child node.
    Unsupported(Arc<dyn PhysicalExpr>),
}It would be
#[derive(Debug, Clone)]
pub enum PushedDownPredicate {
   push_down: PushedDown,
   predicate: Arc<dyn PhysicalExpr>
}| let unsupported_self_filters = child_pushdown_result.self_filters[0] | ||
| let unsupported_parent_filters = | ||
| child_pushdown_result.parent_filters.iter().filter_map(|f| { | ||
| matches!(f.all(), PredicateSupportDiscriminant::Unsupported) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a minor readability thing, I would find something like this easier to read:
| matches!(f.all(), PredicateSupportDiscriminant::Unsupported) | |
| f.all().is_unsupported() | 
Something similar to is_some() and is_none() for Option
Maybe I'll try and make a PR to see what that looks like
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unless there's some magic I'm not thinking about wouldn't that require returning something from all() that can have a is_unsupported() method? I'm weary of adding more tiny structs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could do it via an extension trait but not sure that reduces the cognitive burden
| Ok(FilterPushdownPropagation::transparent( | ||
| child_pushdown_result, | ||
| )) | ||
| Ok(FilterPushdownPropagation::all(child_pushdown_result)) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe if_all?
| } | ||
|  | ||
| /// Convert the [`PredicateSupport`] into a [`PredicateSupportDiscriminant`]. | ||
| pub fn discriminant(&self) -> PredicateSupportDiscriminant { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe PredicateSupport would be easier to manage if it was a struct with a discrimnant field rather than an enum
| Thank you for the review @alamb I incorporated your suggestion for restructuring the enum / struct into a struct with a discriminant field. It is much better. I also renamed  | 
| 
 I spoke with @alamb and we agreed that we'd start with only inner joins (which are always safe) and if we ever want to support anything fancier we'll deal with the complexity then | 
| Thanks again @adriangb | 
Like #16642 this was triggered by attempting to make progress on HashJoinExec.
In particular I realized a fundamental assumption of the existing code, that the optimizer knows how to combine results from children, is flawed and wrong. This requires changes to the public APIs.
I've taken the opportunity to do some thinking and re-designing and while there is a bit more complexity in some places I think others are a lot simpler.