@liamzwbao
Contributor

Which issue does this PR close?

Rationale for this change

Add the missing equivalence info so that the optimizer can pick it up when pruning SortExec.

What changes are included in this PR?

Add the equivalence info based on the filters

Are these changes tested?

Added in parquet_filter_pushdown.slt

Are there any user-facing changes?

Yes, this fixes the unexpected behavior.

@github-actions github-actions bot added sqllogictest SQL Logic Tests (.slt) datasource Changes to the datasource crate physical-plan Changes to the physical-plan crate labels Jul 4, 2025
@liamzwbao liamzwbao force-pushed the issue-16563-pushdown-filter branch from 0b90c24 to 62193e9 Compare July 4, 2025 20:19
@github-actions github-actions bot added the core Core DataFusion crate label Jul 4, 2025
- FilterExec: b@1 = bar
- CoalesceBatchesExec: target_batch_size=100
- AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt]
- AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt], ordering_mode=PartiallySorted([0])
Contributor Author

This is a side effect of the fix, but I think it's okay, as it preserves the ordering_mode instead of defaulting to Linear.

Contributor

I agree -- this looks good to me. The filter pushdown now lets the gby know that the data is sorted on a, which makes sense as a now has exactly one value (foo).
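
To illustrate the reasoning above, here is a minimal sketch, not DataFusion's implementation. It assumes a toy OrderMode enum and a hypothetical order_mode helper, with columns identified by name. A group-by column that a filter pins to a single value is trivially ordered, so only the remaining columns keep the mode from being fully sorted.

```rust
use std::collections::HashSet;

/// Toy stand-in for an aggregate's input ordering mode.
/// Hypothetical type for illustration, not the DataFusion one.
#[derive(Debug, PartialEq)]
enum OrderMode {
    Linear,
    PartiallySorted(Vec<usize>),
    Sorted,
}

/// Decide the ordering mode from the group-by columns and the set of
/// columns known to hold a single value (for example `a = 'foo'`).
/// Simplification: only constant columns are treated as ordered.
fn order_mode(group_by: &[&str], constant_cols: &HashSet<&str>) -> OrderMode {
    // Positions of group-by columns that are pinned to one value.
    let ordered: Vec<usize> = group_by
        .iter()
        .enumerate()
        .filter(|(_, col)| constant_cols.contains(*col))
        .map(|(i, _)| i)
        .collect();

    if ordered.is_empty() {
        OrderMode::Linear
    } else if ordered.len() == group_by.len() {
        OrderMode::Sorted
    } else {
        OrderMode::PartiallySorted(ordered)
    }
}
```

With group-by columns [a, b] and a known to be constant, this sketch reports PartiallySorted([0]), matching the shape of the plan change shown in the diff.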

@liamzwbao liamzwbao force-pushed the issue-16563-pushdown-filter branch from 62193e9 to 4732c0f Compare July 4, 2025 21:00
@alamb
Contributor

alamb commented Jul 5, 2025

Thank you @liamzwbao

@xudong963 or @adriangb do you have time to review this PR?

Self::compute_properties(Arc::clone(&new_node.data_source));
// Add the missing filters' equivalence info when filters pushdown is applied
let filter = conjunction(res.filters.collect_supported());
new_node = new_node.add_filter_equivalence_info(filter)?;
Contributor

What happens if the node already had equivalence info / old filters?

Practically speaking this shouldn't happen, but it is technically possible for a data source to have some filters passed in directly (e.g. in a TableProvider) and some filters passed in during pushdown.

I think this is correct because it's adding the new filters and when we built the new node we did a conjunction.

Contributor Author

I think this will add the eq info from filter pushdown into the existing eq info group from DataSource.

Maybe it's better if we have a test case for it, but I'm not sure how to create a DataSource with eq info here. Are there any examples we can refer to?

Comment on lines +382 to +393
fn add_filter_equivalence_info(
    mut self,
    filter: Arc<dyn PhysicalExpr>,
) -> Result<Self> {
    let (equal_pairs, _) = collect_columns_from_predicate(&filter);
    for (lhs, rhs) in equal_pairs {
        self.cache
            .eq_properties
            .add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
    }
    Ok(self)
}
Contributor

I'm not familiar with this part and would appreciate it if someone else took a look and confirmed it is correct.

Member

Basically, it's enough, lgtm
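
For readers unfamiliar with this part, the idea behind add_filter_equivalence_info can be sketched with the standard library only. Everything below is a hypothetical stand-in (Pred, equal_pairs, EqGroups), not DataFusion's types: plain strings play the role of physical expressions, and a small union-find plays the role of the equivalence properties. It also shows why adding pairs from a pushed-down filter merges into groups the source already had, rather than replacing them.

```rust
use std::collections::HashMap;

/// Toy predicate; strings stand in for columns or literals (hypothetical).
enum Pred {
    Eq(String, String),
    Gt(String, i64),
    And(Box<Pred>, Box<Pred>),
}

/// Walk a conjunction and collect every `lhs = rhs` pair,
/// ignoring predicates that are not equalities.
fn equal_pairs(pred: &Pred, out: &mut Vec<(String, String)>) {
    match pred {
        Pred::Eq(l, r) => out.push((l.clone(), r.clone())),
        Pred::And(l, r) => {
            equal_pairs(l, out);
            equal_pairs(r, out);
        }
        Pred::Gt(..) => {}
    }
}

/// Minimal equivalence groups: each entry points at a parent,
/// and an entry without a parent is the representative of its group.
#[derive(Default)]
struct EqGroups {
    parent: HashMap<String, String>,
}

impl EqGroups {
    /// Follow parent links until the representative is reached.
    fn find(&self, x: &str) -> String {
        let mut cur = x.to_string();
        while let Some(p) = self.parent.get(&cur) {
            cur = p.clone();
        }
        cur
    }

    /// Merge the groups of `l` and `r`; existing groups are kept and extended.
    fn add_equal_conditions(&mut self, l: &str, r: &str) {
        let (rl, rr) = (self.find(l), self.find(r));
        if rl != rr {
            self.parent.insert(rl, rr);
        }
    }

    fn same_group(&self, a: &str, b: &str) -> bool {
        self.find(a) == self.find(b)
    }
}
```

In this sketch, if the source already knows x = a and the pushed-down filter contributes a = b, then x and b end up in the same group, which is the merging behavior discussed in the thread above.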

Comment on lines +352 to +368
query T
select a from t_pushdown where b = 2 ORDER BY b;
----
bar

query TT
EXPLAIN select a from t_pushdown where b = 2 ORDER BY b;
----
logical_plan
01)Projection: t_pushdown.a
02)--Sort: t_pushdown.b ASC NULLS LAST
03)----Filter: t_pushdown.b = Int32(2)
04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b = Int32(2)]
physical_plan
01)CoalescePartitionsExec
02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 = 2, pruning_predicate=b_null_count@2 != row_count@3 AND b_min@0 <= 2 AND 2 <= b_max@1, required_guarantees=[b in (2)]

Contributor

@liamzwbao could you help me understand what I should be looking for in this output / what proves that this fix works / what would have been different before this fix?

Contributor Author

The 1st commit reproduces the issue, and the 2nd commit reflects the fix.

Basically, the SortExec which matches the filter equivalence gets removed after the fix.
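
As a rough sketch of why the sort can go away (a simplified model, not the optimizer's actual rule): a sort key that a filter pins to a single value cannot affect row order, so it can be dropped, and a sort with no remaining keys is redundant. The helper name prune_sort_keys and the use of column names as strings are assumptions made for illustration.

```rust
use std::collections::HashSet;

/// Drop sort keys that a pushed-down filter pins to a single value.
/// Returns the keys that still need sorting; an empty result means
/// the sort is redundant in this simplified model.
fn prune_sort_keys(sort_keys: &[&str], constant_cols: &HashSet<&str>) -> Vec<String> {
    sort_keys
        .iter()
        .filter(|k| !constant_cols.contains(*k))
        .map(|k| k.to_string())
        .collect()
}
```

For the test query above, `where b = 2 ORDER BY b` makes b constant, so the only sort key is pruned and no SortExec is needed in the physical plan.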

@xudong963
Member

I'll verify the PR in our fork tomorrow, thanks @liamzwbao

Member

@xudong963 xudong963 left a comment

Thank you @liamzwbao, it works on my side.

Contributor

@alamb alamb left a comment

Looks good to me -- thank you @liamzwbao @xudong963 and @adriangb

@alamb alamb merged commit 8596812 into apache:main Jul 8, 2025
27 checks passed
@liamzwbao liamzwbao deleted the issue-16563-pushdown-filter branch July 8, 2025 22:24
jonathanc-n added a commit to jonathanc-n/datafusion that referenced this pull request Jul 9, 2025
xudong963 pushed a commit that referenced this pull request Jul 9, 2025
* fix: Fix CI failing due to #16686

* fix

* add original comment

* change

Labels

core Core DataFusion crate datasource Changes to the datasource crate physical-plan Changes to the physical-plan crate sqllogictest SQL Logic Tests (.slt)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Bug: the new filter pushdown optimizer rule in physical layer will miss the equivalence info in filter

4 participants