Conversation

@zhuqi-lucas (Contributor) commented Jul 1, 2025

Which issue does this PR close?

Rationale for this change

We should not push down TopK (sort with limit/fetch) through a HashJoin anti join.

What changes are included in this PR?

Disable TopK pushdown through HashJoin for anti joins.

Are these changes tested?

Yes, slt tests added.

Are there any user-facing changes?

No

@github-actions github-actions bot added optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt) labels Jul 1, 2025
@nuno-faria (Contributor)

Thanks @zhuqi-lucas, it returns the correct result now.

@zhuqi-lucas (Contributor, Author)

Thank you @nuno-faria for checking!

Comment on lines 673 to 678
```rust
match plan.join_type() {
    JoinType::LeftAnti | JoinType::RightAnti => {
        return Ok(None);
    }
    _ => {}
}
```
@adriangb (Contributor)

Shouldn't these implement maintains_input_order and hit the check a couple lines below?

@zhuqi-lucas (Contributor, Author)

Thank you @adriangb for the review:

Good point. At first glance it looks like RightAnti would simply fall out at the `!maintains_input_order()[1]` check below, but in reality we need to short-circuit before that.

The `maintains_input_order` method merely tells us "if you streamed all probe-side matches you'd get them in input order," which is true of a RightAnti join (it does process probe rows in sequence). But an anti join also drops rows, and that dropping can interleave arbitrarily with the sort key, so a pre-sort plus anti join does not guarantee globally sorted output.

In other words:

`maintains_input_order()[1] == true` means "if you output every matching probe row, you'd preserve order."

Anti joins, however, filter out some rows, so you cannot rely on that to still produce a correctly sorted subset.

That's why we explicitly bail out on both LeftAnti and RightAnti before ever calling `maintains_input_order`: we need to prevent any sort pushdown for anti joins, even though their raw probe phase might look ordered.

And since `maintains_input_order` is used elsewhere for true order-preserving cases, we keep it unchanged here.

@adriangb (Contributor) commented Jul 2, 2025

Okay interesting. Thank you for the in-depth explanation.

Could you show, with a simple table representing a batch, how the anti join filtering out rows causes ordering to be lost? I imagine something like this flowing into the anti join:

```
a b c
1 1 2
2 3 5
2 4 1
```

For the query `SELECT a, b, c FROM t1 WHERE c NOT IN (SELECT n FROM t2) ORDER BY t1.a, t1.b` I expect the anti join to be created and for it to remove some rows. Let's say `SELECT n FROM t2` returns just 5; then the output would be:

```
a b c
1 1 2
2 4 1
```

Which is still ordered. Are you saying it might be

```
a b c
2 4 1
1 1 2
```

instead?

@zhuqi-lucas (Contributor, Author) commented Jul 2, 2025

Thank you @adriangb, you are exactly right. Correct me if I am wrong.

Why Anti-Joins Are Different

An anti join (LEFT ANTI or RIGHT ANTI) filters out all rows that do have matches, emitting only the non-matching ones. Although `maintains_input_order()` still returns true (because if you emitted every probe row in match order, they'd be ordered), the act of dropping rows breaks that guarantee:

Per-batch filtering: in a batched execution, each batch is sorted, then some rows are independently discarded by the anti-join filter.

No final merge: when you push the sort below the anti join, you end up just concatenating each sorted batch's output. A small key in a later batch can easily slip behind a larger key from an earlier batch, violating the global ORDER BY.

That is why we must short-circuit and refuse to push the sort beneath any anti join: without a final merge or a full sort after filtering, the concatenation of filtered, sorted batches cannot preserve the overall ordering guarantee.

Why Anti-Joins Break Global Ordering: A Batch Example

Suppose we split the probe-side input into two batches, each sorted by `a`, and then simply concatenate their outputs after filtering. Here's how the global order can be lost:

```
-- Batch 1 input (sorted by a)
a | b | c
--+---+---
1 | x | …
3 | x | …

-- Batch 2 input (sorted by a)
a | b | c
--+---+---
2 | y | …
4 | y | …

-- Assume the anti-join filter keeps all rows (no actual filtering, for simplicity).
-- So each batch output remains:
--   Batch 1 → [ (1, x, …), (3, x, …) ]
--   Batch 2 → [ (2, y, …), (4, y, …) ]

-- Naively concatenating Batch 1 then Batch 2:
1 | x | …
3 | x | …   ← (3 > 2) but appears before the row with a=2
2 | y | …
4 | y | …

-- Final sequence: [1, 3, 2, 4], which is *not* globally sorted by `a`.

-- In contrast, a true global merge of two sorted streams:
--   merge([1,3], [2,4]) → [1,2,3,4]
-- would preserve order.
```

1. Sorting each batch in isolation only guarantees local order.
2. An anti join may filter out rows, but even without filtering, concatenating independently sorted batches breaks global ordering.
3. Therefore, we must prevent pushing a sort below an anti join unless we perform a final merge or full sort after filtering.
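To make the concatenation-vs-merge point concrete, here is a minimal self-contained Rust sketch, with plain integer vectors standing in for record batches (this is illustrative, not DataFusion code):

```rust
fn main() {
    // Two probe-side batches, each locally sorted by the key `a`.
    let batch1 = vec![1, 3];
    let batch2 = vec![2, 4];

    // Naive concatenation of per-batch outputs preserves only local order.
    let concatenated: Vec<i32> = batch1.iter().chain(batch2.iter()).copied().collect();
    assert_eq!(concatenated, vec![1, 3, 2, 4]); // not globally sorted

    // Merging the sorted streams restores global order
    // (a full sort stands in for a streaming k-way merge here).
    let mut merged = concatenated.clone();
    merged.sort();
    assert_eq!(merged, vec![1, 2, 3, 4]);
}
```

This is exactly the gap a `SortPreservingMergeExec` closes above the batches; plain concatenation does not.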

@adriangb (Contributor)

It makes sense that in that example no global order is guaranteed in the output; there was no sorting being done before the anti join! I think if you pushed a SortExec you'd get the right order.

Having thought about it a bit, I do think this is a real bug that needs fixing, but I'm not sure it's about sorting; it's about limits: if you push the TopK below the anti join, the limit is applied before the anti join filters out rows, which means you'll end up getting fewer, or the wrong, rows out. For example, if the batch into the TopK(2) is

```
a b c
2 3 5
1 1 2
2 4 1
```

It will output

```
a b c
1 1 2
2 3 5
```

But then if the anti join filters out c = 5 we get:

```
a b c
1 1 2
```

Which is a single row despite the LIMIT 2.

If instead we apply the anti join first we get:

```
a b c
1 1 2
2 4 1
```

Which is correct.
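The order-of-operations bug above can be reproduced with plain Rust vectors. The `topk` and `anti_join` helpers below are hypothetical stand-ins for the real operators, with the limit set to 2 and the anti join modeled as dropping rows whose `c` equals 5:

```rust
// Row = (a, b, c)
type Row = (i32, i32, i32);

// Hypothetical TopK: sort by (a, b) and keep the first `k` rows.
fn topk(mut rows: Vec<Row>, k: usize) -> Vec<Row> {
    rows.sort_by_key(|&(a, b, _)| (a, b));
    rows.truncate(k);
    rows
}

// Model the anti join as "drop rows whose c matches the build side" (here c == 5).
fn anti_join(rows: Vec<Row>) -> Vec<Row> {
    rows.into_iter().filter(|&(_, _, c)| c != 5).collect()
}

fn main() {
    let input: Vec<Row> = vec![(2, 3, 5), (1, 1, 2), (2, 4, 1)];

    // Wrong: TopK(2) pushed below the anti join -> only 1 row survives.
    let pushed = anti_join(topk(input.clone(), 2));
    assert_eq!(pushed, vec![(1, 1, 2)]);

    // Right: anti join first, then TopK(2) -> both expected rows.
    let correct = topk(anti_join(input), 2);
    assert_eq!(correct, vec![(1, 1, 2), (2, 4, 1)]);
}
```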

@zhuqi-lucas (Contributor, Author) commented Jul 2, 2025

So it seems we need to push down the SortExec without its fetch, so we can avoid the TopK cases?

I need to investigate it further. Thank you @adriangb!

@adriangb (Contributor)

Yeah, my guess is that you can push down a sort below any operator that preserves order, but you can't push down a limit below any operator that changes the cardinality of the data. I'm surprised we can push TopK down through a HashJoin.

@zhuqi-lucas (Contributor, Author)

Thank you @adriangb for confirming; addressed it in the latest PR.

Now we will not push down TopK for anti joins.

But for a normal sort without fetch, we should support pushdown; also added the related slt tests.

@adriangb (Contributor) commented Jul 2, 2025

amazing!

> you can't push down a limit below any operator that changes the cardinality of the data

One question here: can we push down operators that have a limit (LIMIT itself, but also TopK) past other operators that change cardinality, e.g. HashJoinExec? I feel like maybe what we should do is call `ExecutionPlan::cardinality_effect` on the plan we are pushing down and the plan we are pushing through, and only allow the pushdown if one of them is `CardinalityEffect::Equal` (I think that'd be correct). In the case of HashJoinExec + TopK you'd have Unknown and LowerEqual, so you can't push down. HashJoinExec + a non-TopK SortExec would have Unknown and Equal, so you can push down. This would generalize to handle pushing through a FilterExec, etc.
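As a sketch of the rule being proposed, the enum below is a simplified, hypothetical re-modeling for illustration; the real `CardinalityEffect` lives in DataFusion's physical plan crate and this is not its actual definition:

```rust
// Simplified stand-in for DataFusion's `CardinalityEffect` (hypothetical
// re-modeling for illustration only).
#[allow(dead_code)]
enum CardinalityEffect {
    Equal,        // output rows == input rows (e.g. ProjectionExec, SortExec without fetch)
    LowerEqual,   // output rows <= input rows (e.g. TopK, FilterExec, LIMIT)
    GreaterEqual, // output rows >= input rows (e.g. UnionExec)
    Unknown,      // e.g. HashJoinExec
}

// A limit-carrying operator may only be pushed through an operator that
// provably preserves cardinality.
fn can_push_limit_through(through: CardinalityEffect) -> bool {
    matches!(through, CardinalityEffect::Equal)
}

fn main() {
    // TopK (LowerEqual) cannot be pushed through HashJoinExec (Unknown).
    assert!(!can_push_limit_through(CardinalityEffect::Unknown));
    // Nor through UnionExec (GreaterEqual) or FilterExec (LowerEqual).
    assert!(!can_push_limit_through(CardinalityEffect::GreaterEqual));
    assert!(!can_push_limit_through(CardinalityEffect::LowerEqual));
    // A limit can be pushed through a pass-through operator like ProjectionExec (Equal).
    assert!(can_push_limit_through(CardinalityEffect::Equal));
}
```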

@zhuqi-lucas (Contributor, Author) commented Jul 2, 2025

> I feel like maybe what we should do is call ExecutionPlan::cardinality_effect on the plan we are pushing down and the plan we are pushing through and only allow the pushdown if one of them is CardinalityEffect::Equal.

Great point!

Tried to address it in the latest PR. It changes some slt files, but that should be right: UnionExec also should not pass the TopK through, I believe.

```
02)--SortPreservingMergeExec: [c@0 DESC], fetch=14
03)----UnionExec
04)------SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true]
03)----SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true]
```
@zhuqi-lucas (Contributor, Author)

This should be right: we should union first, then sort with the limit.

```
query I
select * from (select * from topk limit 8) order by x limit 3;
----
0
```
@zhuqi-lucas (Contributor, Author)

This is right, because `select * from topk limit 8` has a limit but no ordering, and the original plan pushed the sort limit down into it.

```
01)SortPreservingMergeExec: [d@4 ASC NULLS LAST, c@1 ASC NULLS LAST, a@2 ASC NULLS LAST, a0@3 ASC NULLS LAST, b@0 ASC NULLS LAST], fetch=2
02)--UnionExec
03)----SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST, c@1 ASC NULLS LAST, a@2 ASC NULLS LAST, b@0 ASC NULLS LAST], preserve_partitioning=[false]
02)--SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST, c@1 ASC NULLS LAST, a@2 ASC NULLS LAST, a0@3 ASC NULLS LAST, b@0 ASC NULLS LAST], preserve_partitioning=[true]
```
@zhuqi-lucas (Contributor, Author)

Same as above.

```
01)SortPreservingMergeExec: [c9@1 DESC], fetch=5
02)--UnionExec
03)----SortExec: TopK(fetch=5), expr=[c9@1 DESC], preserve_partitioning=[true]
02)--SortExec: TopK(fetch=5), expr=[c9@1 DESC], preserve_partitioning=[true]
```
@zhuqi-lucas (Contributor, Author)

Same as above.

Comment on lines 223 to 238
```rust
// Only attempt to push down TopK when there is an upstream LIMIT
if parent_fetch.is_some() {
    // 1) Never push a new TopK below an operator that already has its own fetch
    if plan.fetch().is_some() {
        return Ok(None);
    }
    // 2) Only allow pushdown through operators that do not increase row count
    //    (equal cardinality). Any other operator (including joins, filter,
    //    sort-with-limit, or UDTFs that may expand rows) must stop the pushdown.
    let effect = plan.cardinality_effect();
    if !matches!(effect, CardinalityEffect::Equal) {
        return Ok(None);
    }
    // At this point, only single-input, non-expanding operators
    // such as ProjectionExec or CoalesceBatchesExec are allowed to receive the TopK.
}
```
@adriangb (Contributor)

Suggested change (replacing the block above):

```rust
// If there is a limit on the parent plan we cannot push it down through operators that change the cardinality.
// E.g. consider if LIMIT 2 is applied below a FilterExec that filters out 1/2 of the rows: we'll end up with 1 row instead of 2.
// If the LIMIT is applied after the FilterExec and the FilterExec returns > 2 rows, we'll end up with 2 rows (correct).
if parent_fetch.is_some() && !plan.supports_limit_pushdown() {
    return Ok(None);
}
```

wdyt?

@zhuqi-lucas (Contributor, Author)

Thank you @adriangb, that's a more concise and accurate solution! Merged your suggestion now.

@zhuqi-lucas (Contributor, Author)

After testing, this is not enough. For example, UnionExec supports limit pushdown, but the TopK should not be pushed down through UnionExec.

```rust
// If there is a limit on the parent plan we cannot push it down through operators that change the cardinality.
// E.g. consider if LIMIT 2 is applied below a FilterExec that filters out 1/2 of the rows: we'll end up with 1 row instead of 2.
// If the LIMIT is applied after the FilterExec and the FilterExec returns > 2 rows, we'll end up with 2 rows (correct).
if parent_fetch.is_some() {
    if !plan.supports_limit_pushdown() {
        return Ok(None);
    }

    // Note: we still need to check the cardinality effect of the plan here, because the
    // limit pushdown is not always safe, even if the plan supports it. Here's an example:
    //
    // UnionExec advertises `supports_limit_pushdown() == true` because it can
    // forward a LIMIT k to each of its children, i.e. apply "LIMIT k" separately
    // on each branch before merging them together.
    //
    // However, UnionExec's `cardinality_effect() == GreaterEqual` (it sums up
    // all child row counts), so pushing a global TopK/LIMIT through it would
    // break the semantics of "take the first k rows of the combined result."
    //
    // For example, with two branches A and B and k = 3:
    //   - Global LIMIT: take the first 3 rows from (A ∪ B) after merging.
    //   - Pushed down: take 3 from A, 3 from B, then merge → up to 6 rows!
    //
    // That's why we still block on cardinality: even though UnionExec can
    // push a LIMIT to its children, its GreaterEqual effect means it cannot
    // preserve the global TopK semantics.
    match plan.cardinality_effect() {
        CardinalityEffect::Equal => {
            // safe: only pass-through operators (e.g. CoalesceBatchesExec, ProjectionExec) qualify
        }
        _ => return Ok(None),
    }
}
```

Changed the code to the above, @adriangb. Thank you!
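The UnionExec counter-example in the code comment (take k from each branch vs. take k from the merged result) can be checked with plain vectors; this is an illustrative sketch, not DataFusion code:

```rust
fn main() {
    let branch_a = vec![1, 2, 3, 4];
    let branch_b = vec![1, 2, 3, 4];
    let k = 3;

    // Pushed down: LIMIT k applied to each branch, then the results are merged.
    let pushed: Vec<i32> = branch_a.iter().take(k)
        .chain(branch_b.iter().take(k))
        .copied()
        .collect();
    assert_eq!(pushed.len(), 6); // up to 2 * k rows come out, not k

    // Global LIMIT: merge the branches first, then take the first k rows.
    let mut merged: Vec<i32> = branch_a.iter().chain(branch_b.iter()).copied().collect();
    merged.sort();
    merged.truncate(k);
    assert_eq!(merged, vec![1, 1, 2]); // exactly k rows of the combined result
}
```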

zhuqi-lucas and others added 3 commits July 3, 2025 10:45
…n.rs

Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
@adriangb (Contributor) left a comment:

left a minor nit to reduce indentation but otherwise approved, great job!

zhuqi-lucas and others added 3 commits July 3, 2025 23:39
…n.rs

Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
@zhuqi-lucas (Contributor, Author)

> left a minor nit to reduce indentation but otherwise approved, great job!

Thank you @adriangb for patient review, addressed it in latest PR.

@adriangb (Contributor) commented Jul 3, 2025

> left a minor nit to reduce indentation but otherwise approved, great job!

> Thank you @adriangb for patient review, addressed it in latest PR.

Btw, I think you mean latest commit.

@adriangb adriangb merged commit 06e5bbe into apache:main Jul 3, 2025
27 checks passed
@nuno-faria (Contributor)

Thanks @zhuqi-lucas and @adriangb.
