Add the missing equivalence info for filter pushdown #16686
| - FilterExec: b@1 = bar | ||
| - CoalesceBatchesExec: target_batch_size=100 | ||
| - AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt] | ||
| - AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt], ordering_mode=PartiallySorted([0]) |
This is a side effect of the fix, but I think it's okay, as it preserves the ordering_mode instead of defaulting it to Linear.
I agree -- this looks good to me (the filter pushdown now lets the gby know that the data is sorted on `a`, which makes sense as `a` now has exactly one value (foo)).
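To illustrate the reasoning in this thread, here is a minimal sketch of why a group-by column that is pinned to a single value can be reported as already sorted. It is a toy model, not DataFusion's implementation: `OrderingMode`, `ordering_mode`, and the string-based column names are hypothetical stand-ins, and the rule used (constant columns count as ordered, then the input ordering is followed until it leaves the group-by set) is a simplifying assumption.

```rust
use std::collections::HashSet;

/// Toy stand-in for an aggregate's input ordering mode (hypothetical type).
#[derive(Debug, PartialEq)]
enum OrderingMode {
    Linear,
    PartiallySorted(Vec<usize>),
    Sorted,
}

/// Decide which group-by columns can be treated as already ordered.
/// Assumption: a column with a single known value is trivially ordered.
fn ordering_mode<'a>(
    group_by: &[&'a str],
    input_ordering: &[&'a str],
    constants: &HashSet<&'a str>,
) -> OrderingMode {
    // Constant group-by columns are ordered no matter how the rows arrive.
    let mut ordered: Vec<usize> = group_by
        .iter()
        .enumerate()
        .filter(|(_, c)| constants.contains(*c))
        .map(|(i, _)| i)
        .collect();

    // Follow the input ordering (skipping constants) while it stays
    // inside the group-by columns.
    for col in input_ordering.iter().filter(|c| !constants.contains(*c)) {
        match group_by.iter().position(|g| g == col) {
            Some(i) => ordered.push(i),
            None => break,
        }
    }

    if ordered.is_empty() {
        OrderingMode::Linear
    } else if ordered.len() == group_by.len() {
        OrderingMode::Sorted
    } else {
        OrderingMode::PartiallySorted(ordered)
    }
}
```

In this model, grouping by `[a, b]` with no input ordering yields `Linear` when nothing is known about `a`, and `PartiallySorted([0])` once `a` is known to be constant, which mirrors the plan change discussed above.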
Thank you @liamzwbao. @xudong963 or @adriangb, do you have time to review this PR?
| Self::compute_properties(Arc::clone(&new_node.data_source)); | ||
| // Add the missing filters' equivalence info when filters pushdown is applied | ||
| let filter = conjunction(res.filters.collect_supported()); | ||
| new_node = new_node.add_filter_equivalence_info(filter)?; |
What happens if the node already had equivalence info / old filters?
Practically speaking this shouldn't happen, but it is technically possible for a data source to have some filters passed in directly (e.g. in a TableProvider) and some filters passed in during pushdown.
I think this is correct because it's adding the new filters and when we built the new node we did a conjunction.
I think this will add the eq info from filter pushdown into the existing eq info group from DataSource.
Maybe it's better if we have a test case for it, but I'm not sure how to create a DataSource with eq info here. Are there any examples we can refer to?
| fn add_filter_equivalence_info( | ||
| mut self, | ||
| filter: Arc<dyn PhysicalExpr>, | ||
| ) -> Result<Self> { | ||
| let (equal_pairs, _) = collect_columns_from_predicate(&filter); | ||
| for (lhs, rhs) in equal_pairs { | ||
| self.cache | ||
| .eq_properties | ||
| .add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))? | ||
| } | ||
| Ok(self) | ||
| } |
I'm not familiar with this part and would appreciate it if someone else took a look and confirmed it is correct.
Basically, it's enough, lgtm
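For readers following this thread, a minimal sketch of the idea behind `add_filter_equivalence_info`: take the `lhs = rhs` pairs out of the pushed-down conjunction and merge them into whatever equivalence classes the node already has. Everything here is a toy model under stated assumptions; `Expr`, `Predicate`, `EqClasses`, and this `collect_equal_pairs` are hypothetical and are not DataFusion's `PhysicalExpr`, `EquivalenceProperties`, or `collect_columns_from_predicate`.

```rust
use std::collections::BTreeSet;

/// Toy expression type (hypothetical; not DataFusion's `PhysicalExpr`).
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Expr {
    Column(String),
    Literal(i64),
}

/// Toy predicate; a conjunction is modelled as a slice of these.
#[derive(Clone, Debug)]
enum Predicate {
    Eq(Expr, Expr),
    NotEq(Expr, Expr),
}

/// Keep only the `lhs = rhs` pairs from a flattened conjunction.
fn collect_equal_pairs(conjuncts: &[Predicate]) -> Vec<(Expr, Expr)> {
    conjuncts
        .iter()
        .filter_map(|p| match p {
            Predicate::Eq(l, r) => Some((l.clone(), r.clone())),
            Predicate::NotEq(_, _) => None,
        })
        .collect()
}

/// Toy equivalence classes: each set holds expressions known to be equal.
#[derive(Default, Debug)]
struct EqClasses {
    classes: Vec<BTreeSet<Expr>>,
}

impl EqClasses {
    /// Record `lhs = rhs`, merging any existing classes that mention either side.
    fn add_equal_conditions(&mut self, lhs: Expr, rhs: Expr) {
        let mut merged: BTreeSet<Expr> = BTreeSet::from([lhs, rhs]);
        let mut kept = Vec::new();
        for class in std::mem::take(&mut self.classes) {
            if class.is_disjoint(&merged) {
                kept.push(class);
            } else {
                merged.extend(class);
            }
        }
        kept.push(merged);
        self.classes = kept;
    }

    /// True if both expressions are identical or share a class.
    fn are_equal(&self, a: &Expr, b: &Expr) -> bool {
        a == b || self.classes.iter().any(|c| c.contains(a) && c.contains(b))
    }
}
```

In this model, a node that starts with an existing class such as `{a, c}` and then receives the pushed-down pairs `b = 2` and `c = b` ends up with one class containing all four expressions. That matches the behaviour described earlier in the review, where new filter equalities are added into the existing equivalence groups rather than replacing them.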
| query T | ||
| select a from t_pushdown where b = 2 ORDER BY b; | ||
| ---- | ||
| bar | ||
|
|
||
| query TT | ||
| EXPLAIN select a from t_pushdown where b = 2 ORDER BY b; | ||
| ---- | ||
| logical_plan | ||
| 01)Projection: t_pushdown.a | ||
| 02)--Sort: t_pushdown.b ASC NULLS LAST | ||
| 03)----Filter: t_pushdown.b = Int32(2) | ||
| 04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b = Int32(2)] | ||
| physical_plan | ||
| 01)CoalescePartitionsExec | ||
| 02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 = 2, pruning_predicate=b_null_count@2 != row_count@3 AND b_min@0 <= 2 AND 2 <= b_max@1, required_guarantees=[b in (2)] | ||
|
|
@liamzwbao could you help me understand what I should be looking for in this output / what proves that this fix works / what would have been different before this fix?
The 1st commit reproduces the issue, and the 2nd commit reflects the fix.
Basically, the SortExec which matches the filter equivalence gets removed after the fix.
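A rough sketch of why the sort becomes removable, assuming a simplified rule: columns pinned to one value by a filter (here `b = 2`) can be dropped from the sort requirement, and the sort is redundant if what remains is already covered by the input's ordering. The function `sort_is_redundant` and its string-based columns are hypothetical and are not the optimizer's actual code.

```rust
use std::collections::HashSet;

/// Return true if a required ordering is already satisfied once
/// constant columns (from filter equivalences) are ignored.
/// Toy model: orderings are lists of column names, all ascending.
fn sort_is_redundant<'a>(
    required: &[&'a str],
    input_ordering: &[&'a str],
    constants: &HashSet<&'a str>,
) -> bool {
    // A constant column cannot affect row order, so drop it on both sides.
    let remaining: Vec<&str> = required
        .iter()
        .copied()
        .filter(|c| !constants.contains(c))
        .collect();
    let existing: Vec<&str> = input_ordering
        .iter()
        .copied()
        .filter(|c| !constants.contains(c))
        .collect();
    // The requirement holds if it is a prefix of what the input provides.
    existing.starts_with(&remaining)
}
```

With `ORDER BY b` over an unordered scan, this returns `false` when nothing is known about `b` and `true` once `b` is recorded as constant, which is the difference between the two commits' plans.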
I'll verify the PR in our fork tomorrow, thanks @liamzwbao
xudong963
left a comment
Thank you @liamzwbao , it works on my side.
alamb
left a comment
Looks good to me -- thank you @liamzwbao @xudong963 and @adriangb
Which issue does this PR close?
Rationale for this change
Add the missing equivalence info so that the optimizer can pick it up when pruning SortExec.
What changes are included in this PR?
Add the equivalence info based on the filters
Are these changes tested?
Added in parquet_filter_pushdown.slt
Are there any user-facing changes?
Yes, this fixes the unexpected behavior.