36 changes: 36 additions & 0 deletions datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
@@ -35,6 +35,7 @@ use datafusion_physical_expr_common::sort_expr::{
LexOrdering, LexRequirement, OrderingRequirements, PhysicalSortExpr,
PhysicalSortRequirement,
};
use datafusion_physical_plan::execution_plan::CardinalityEffect;
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::joins::utils::{
calculate_join_output_ordering, ColumnIndex,
@@ -190,6 +191,7 @@ fn pushdown_sorts_helper(
} else if let Some(adjusted) = pushdown_requirement_to_children(
&sort_push_down.plan,
parent_requirement.clone(),
parent_fetch,
)? {
// For operators that can take a sort pushdown, continue with updated
// requirements:
@@ -216,7 +218,41 @@
fn pushdown_requirement_to_children(
plan: &Arc<dyn ExecutionPlan>,
parent_required: OrderingRequirements,
parent_fetch: Option<usize>,
) -> Result<Option<Vec<Option<OrderingRequirements>>>> {
// If there is a limit (fetch) on the parent plan, we cannot push it down through operators that change the cardinality.
// E.g. if LIMIT 2 is applied below a FilterExec that filters out half of the rows, we may end up with 1 row instead of 2.
// If the LIMIT is applied after the FilterExec and the FilterExec returns more than 2 rows, we correctly end up with 2 rows.
if parent_fetch.is_some() && !plan.supports_limit_pushdown() {
return Ok(None);
}
// Note: we still need to check the cardinality effect of the plan here, because
// limit pushdown is not always safe even if the plan supports it. Here's an example:
//
// UnionExec advertises `supports_limit_pushdown() == true` because it can
// forward a LIMIT k to each of its children, i.e. apply "LIMIT k" separately
// on each branch before merging them together.
//
// However, UnionExec's `cardinality_effect() == GreaterEqual` (it sums up
// all child row counts), so pushing a global TopK/LIMIT through it would
// break the semantics of "take the first k rows of the combined result".
//
// For example, with two branches A and B and k = 3:
// - Global LIMIT: take the first 3 rows from (A ∪ B) after merging.
// - Pushed down: take 3 from A, 3 from B, then merge -> up to 6 rows!
//
// That's why we still block on cardinality: even though UnionExec can
// push a LIMIT to its children, its GreaterEqual effect means it cannot
// preserve the global TopK semantics.
if parent_fetch.is_some() {
match plan.cardinality_effect() {
CardinalityEffect::Equal => {
// Safe: only operators that keep the row count unchanged (e.g. CoalesceBatchesExec, ProjectionExec) pass
}
_ => return Ok(None),
}
}

let maintains_input_order = plan.maintains_input_order();
if is_window(plan) {
let mut required_input_ordering = plan.required_input_ordering();
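For reference, the gate added above can be read as a single predicate. The sketch below is hypothetical (the helper name and signature are not part of this PR); it merely restates the two checks in `pushdown_requirement_to_children`, using the `supports_limit_pushdown` and `cardinality_effect` methods the diff relies on.

use std::sync::Arc;

use datafusion_physical_plan::execution_plan::CardinalityEffect;
use datafusion_physical_plan::ExecutionPlan;

// Hypothetical helper (not in the PR): may a sort requirement that carries a
// fetch (TopK / LIMIT) be pushed through `plan`?
fn fetch_pushdown_is_safe(
    plan: &Arc<dyn ExecutionPlan>,
    parent_fetch: Option<usize>,
) -> bool {
    // Without a fetch, only the ordering requirement matters.
    if parent_fetch.is_none() {
        return true;
    }
    // With a fetch, the operator must both accept a limit from its parent and
    // keep the row count unchanged; otherwise "the first k rows of the
    // combined result" can change (e.g. FilterExec, UnionExec, joins).
    plan.supports_limit_pushdown()
        && matches!(plan.cardinality_effect(), CardinalityEffect::Equal)
}

If either check fails, `pushdown_requirement_to_children` returns `Ok(None)` and the SortExec (TopK) stays above the operator, which is what the updated test plans below show.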
68 changes: 68 additions & 0 deletions datafusion/sqllogictest/test_files/joins.slt
@@ -4790,3 +4790,71 @@ DROP TABLE compound_field_table_t;

statement ok
DROP TABLE compound_field_table_u;


statement ok
CREATE TABLE t1 (k INT, v INT);

statement ok
CREATE TABLE t2 (k INT, v INT);

statement ok
INSERT INTO t1
SELECT value AS k, value AS v
FROM range(1, 10001) AS t(value);

statement ok
INSERT INTO t2 VALUES (1, 1);

## The TopK (Sort with fetch) should not be pushed down to the hash join
query TT
explain
SELECT *
FROM t1
LEFT ANTI JOIN t2 ON t1.k = t2.k
ORDER BY t1.k
LIMIT 2;
----
logical_plan
01)Sort: t1.k ASC NULLS LAST, fetch=2
02)--LeftAnti Join: t1.k = t2.k
03)----TableScan: t1 projection=[k, v]
04)----TableScan: t2 projection=[k]
physical_plan
01)SortExec: TopK(fetch=2), expr=[k@0 ASC NULLS LAST], preserve_partitioning=[false]
02)--CoalesceBatchesExec: target_batch_size=3
03)----HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k@0, k@0)]
04)------DataSourceExec: partitions=1, partition_sizes=[1]
05)------DataSourceExec: partitions=1, partition_sizes=[3334]


query II
SELECT *
FROM t1
LEFT ANTI JOIN t2 ON t1.k = t2.k
ORDER BY t1.k
LIMIT 2;
----
2 2
3 3


## Test left anti join without a limit; we should still support pushing the sort down to the left side
query TT
explain
SELECT *
FROM t1
LEFT ANTI JOIN t2 ON t1.k = t2.k
ORDER BY t1.k;
----
logical_plan
01)Sort: t1.k ASC NULLS LAST
02)--LeftAnti Join: t1.k = t2.k
03)----TableScan: t1 projection=[k, v]
04)----TableScan: t2 projection=[k]
physical_plan
01)CoalesceBatchesExec: target_batch_size=3
02)--HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k@0, k@0)]
03)----DataSourceExec: partitions=1, partition_sizes=[1]
04)----SortExec: expr=[k@0 ASC NULLS LAST], preserve_partitioning=[false]
05)------DataSourceExec: partitions=1, partition_sizes=[3334]
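As a plain-Rust illustration (ordinary vectors, not DataFusion operators, and t1 truncated to four rows) of why the fetch has to stay above the anti join in the plans above:

fn main() {
    // Mirrors the tables above: t1.k = 1, 2, 3, 4 (truncated); t2.k = 1.
    let t1 = vec![1, 2, 3, 4];
    let t2 = vec![1];

    // Correct: anti-join first (drop keys that appear in t2), then fetch 2.
    let correct: Vec<i32> = t1
        .iter()
        .copied()
        .filter(|k| !t2.contains(k))
        .take(2)
        .collect();
    assert_eq!(correct, vec![2, 3]); // same k values as the query result above

    // Incorrect: fetch 2 first, then anti-join -> only one row survives.
    let wrong: Vec<i32> = t1
        .iter()
        .copied()
        .take(2)
        .filter(|k| !t2.contains(k))
        .collect();
    assert_eq!(wrong, vec![2]);
}

This is the same argument the new comment in sort_pushdown.rs makes for FilterExec: once an operator can drop rows, applying the fetch below it can return fewer (or different) rows than the global LIMIT.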
11 changes: 5 additions & 6 deletions datafusion/sqllogictest/test_files/limit.slt
@@ -663,15 +663,14 @@ logical_plan
physical_plan
01)GlobalLimitExec: skip=4, fetch=10
02)--SortPreservingMergeExec: [c@0 DESC], fetch=14
03)----UnionExec
04)------SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true]
03)----SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true]
Contributor Author: This should be right: we union first, then apply the sort with the limit.
04)------UnionExec
05)--------ProjectionExec: expr=[CAST(c@0 AS Int64) as c]
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], file_type=csv, has_header=true
08)------SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true]
09)--------ProjectionExec: expr=[CAST(d@0 AS Int64) as c]
10)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
11)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[d], file_type=csv, has_header=true
08)--------ProjectionExec: expr=[CAST(d@0 AS Int64) as c]
09)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
10)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[d], file_type=csv, has_header=true

# Applying LIMIT & OFFSET to subquery.
query III
9 changes: 4 additions & 5 deletions datafusion/sqllogictest/test_files/order.slt
@@ -1258,13 +1258,12 @@ logical_plan
08)--------TableScan: ordered_table projection=[a0, b, c, d]
physical_plan
01)SortPreservingMergeExec: [d@4 ASC NULLS LAST, c@1 ASC NULLS LAST, a@2 ASC NULLS LAST, a0@3 ASC NULLS LAST, b@0 ASC NULLS LAST], fetch=2
02)--UnionExec
03)----SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST, c@1 ASC NULLS LAST, a@2 ASC NULLS LAST, b@0 ASC NULLS LAST], preserve_partitioning=[false]
02)--SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST, c@1 ASC NULLS LAST, a@2 ASC NULLS LAST, a0@3 ASC NULLS LAST, b@0 ASC NULLS LAST], preserve_partitioning=[true]
Contributor Author: Same as above.
03)----UnionExec
04)------ProjectionExec: expr=[b@1 as b, c@2 as c, a@0 as a, NULL as a0, d@3 as d]
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[c@2 ASC NULLS LAST], file_type=csv, has_header=true
06)----SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST, c@1 ASC NULLS LAST, a0@3 ASC NULLS LAST, b@0 ASC NULLS LAST], preserve_partitioning=[false]
07)------ProjectionExec: expr=[b@1 as b, c@2 as c, NULL as a, a0@0 as a0, d@3 as d]
08)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, b, c, d], output_ordering=[c@2 ASC NULLS LAST], file_type=csv, has_header=true
06)------ProjectionExec: expr=[b@1 as b, c@2 as c, NULL as a, a0@0 as a0, d@3 as d]
07)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, b, c, d], output_ordering=[c@2 ASC NULLS LAST], file_type=csv, has_header=true

# Test: run the query from above
query IIIII
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/topk.slt
@@ -53,7 +53,7 @@ query I
select * from (select * from topk limit 8) order by x limit 3;
----
0
Contributor Author: This is right, because we select * from topk limit 8 but without an ORDER BY; the original case should push the sort limit down to it.
1
2
2


11 changes: 5 additions & 6 deletions datafusion/sqllogictest/test_files/union.slt
@@ -413,15 +413,14 @@ logical_plan
06)------TableScan: aggregate_test_100 projection=[c1, c3]
physical_plan
01)SortPreservingMergeExec: [c9@1 DESC], fetch=5
02)--UnionExec
03)----SortExec: TopK(fetch=5), expr=[c9@1 DESC], preserve_partitioning=[true]
02)--SortExec: TopK(fetch=5), expr=[c9@1 DESC], preserve_partitioning=[true]
Contributor Author: Same as above.
03)----UnionExec
04)------ProjectionExec: expr=[c1@0 as c1, CAST(c9@1 AS Decimal128(20, 0)) as c9]
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], file_type=csv, has_header=true
07)----SortExec: TopK(fetch=5), expr=[c9@1 DESC], preserve_partitioning=[true]
08)------ProjectionExec: expr=[c1@0 as c1, CAST(c3@1 AS Decimal128(20, 0)) as c9]
09)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
10)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], file_type=csv, has_header=true
07)------ProjectionExec: expr=[c1@0 as c1, CAST(c3@1 AS Decimal128(20, 0)) as c9]
08)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], file_type=csv, has_header=true

query TR
SELECT c1, c9 FROM aggregate_test_100 UNION ALL SELECT c1, c3 FROM aggregate_test_100 ORDER BY c9 DESC LIMIT 5