
TopK Sort incorrectly pushed down past Join #16638

@nuno-faria


Describe the bug

The SortExec physical operator (in TopK mode) is incorrectly pushed down past a LEFT ANTI JOIN, so the query returns incorrect results.
Disabling the EnforceSorting physical optimizer rule produces a correct plan.

To Reproduce

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Two in-memory tables with the same schema.
    ctx.sql("create table t1 (k int, v int);")
        .await?
        .collect()
        .await?;
    ctx.sql("create table t2 (k int, v int);")
        .await?
        .collect()
        .await?;

    // t1 holds k = v = 1..=10000; t2 holds a single row with k = 1.
    ctx.sql("insert into t1 select i as k, i as v from generate_series(1, 10000) t(i);")
        .await?
        .collect()
        .await?;
    ctx.sql("insert into t2 values (1, 1);")
        .await?
        .collect()
        .await?;

    // The anti join removes k = 1, so the expected top 2 rows are k = 2 and k = 3.
    let df = ctx
        .sql("select * from t1 left anti join t2 on t1.k = t2.k order by t1.k limit 2;")
        .await?;
    // Print the physical plan first, then the query result.
    df.clone().explain(false, false)?.show().await?;
    df.show().await?;

    Ok(())
}
```

Which returns:

```
+---------------+----------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                   |
+---------------+----------------------------------------------------------------------------------------+
| logical_plan  | Sort: t1.k ASC NULLS LAST, fetch=2                                                     |
|               |   LeftAnti Join: t1.k = t2.k                                                           |
|               |     TableScan: t1 projection=[k, v]                                                    |
|               |     TableScan: t2 projection=[k]                                                       |
| physical_plan | SortPreservingMergeExec: [k@0 ASC NULLS LAST], fetch=2                                 |
|               |   CoalesceBatchesExec: target_batch_size=8192, fetch=2                                 |
|               |     HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k@0, k@0)]               |
|               |       DataSourceExec: partitions=1, partition_sizes=[1]                                |
|               |       SortExec: TopK(fetch=2), expr=[k@0 ASC NULLS LAST], preserve_partitioning=[true] |
|               |         RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1          |
|               |           DataSourceExec: partitions=1, partition_sizes=[2]                            |
|               |                                                                                        |
+---------------+----------------------------------------------------------------------------------------+
+------+------+
| k    | v    |
+------+------+
| 2    | 2    |
| 8193 | 8193 |
+------+------+
```

After disabling the EnforceSorting rule:

```
+---------------+-----------------------------------------------------------------------------------+
| plan_type     | plan                                                                              |
+---------------+-----------------------------------------------------------------------------------+
| logical_plan  | Sort: t1.k ASC NULLS LAST, fetch=2                                                |
|               |   LeftAnti Join: t1.k = t2.k                                                      |
|               |     TableScan: t1 projection=[k, v]                                               |
|               |     TableScan: t2 projection=[k]                                                  |
| physical_plan | SortExec: TopK(fetch=2), expr=[k@0 ASC NULLS LAST], preserve_partitioning=[false] |
|               |   CoalescePartitionsExec                                                          |
|               |     CoalesceBatchesExec: target_batch_size=8192                                   |
|               |       HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k@0, k@0)]        |
|               |         DataSourceExec: partitions=1, partition_sizes=[1]                         |
|               |         RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1     |
|               |           DataSourceExec: partitions=1, partition_sizes=[2]                       |
|               |                                                                                   |
+---------------+-----------------------------------------------------------------------------------+
+---+---+
| k | v |
+---+---+
| 2 | 2 |
| 3 | 3 |
+---+---+
```

Expected behavior

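The sort should not be pushed down past the join in this case. The anti join can remove rows that the per-partition TopK has already selected (here k = 1, which matches the row in t2), so the TopK must be applied after the join, not before it.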
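The following std-only Rust sketch models the two plan shapes instead of calling DataFusion, to show why the pushdown changes the answer. It assumes (from partition_sizes=[2] and the default batch size of 8192) that t1 arrives as two batches, k = 1..=8192 and k = 8193..=10000, and that the round-robin repartition places each batch in its own partition; the anti join is modeled as a filter against the keys of t2. The helper names top_k, anti_join, topk_before_join and join_before_topk are hypothetical.

```rust
use std::collections::HashSet;

/// Hypothetical model of SortExec TopK: keep the `fetch` smallest keys.
fn top_k(mut keys: Vec<i32>, fetch: usize) -> Vec<i32> {
    keys.sort_unstable();
    keys.truncate(fetch);
    keys
}

/// Hypothetical model of the anti join: drop every key present in `right`.
fn anti_join(keys: Vec<i32>, right: &HashSet<i32>) -> Vec<i32> {
    keys.into_iter().filter(|k| !right.contains(k)).collect()
}

/// Buggy shape: TopK per partition, then anti join, then merge with fetch.
fn topk_before_join(partitions: &[Vec<i32>], t2: &HashSet<i32>, fetch: usize) -> Vec<i32> {
    let mut merged: Vec<i32> = partitions
        .iter()
        .flat_map(|p| anti_join(top_k(p.clone(), fetch), t2))
        .collect();
    merged.sort_unstable();
    merged.truncate(fetch);
    merged
}

/// Correct shape: anti join every partition first, then one TopK over all rows.
fn join_before_topk(partitions: &[Vec<i32>], t2: &HashSet<i32>, fetch: usize) -> Vec<i32> {
    let all: Vec<i32> = partitions
        .iter()
        .flat_map(|p| anti_join(p.clone(), t2))
        .collect();
    top_k(all, fetch)
}

fn main() {
    // Assumed layout: two input batches, each landing in its own partition.
    let partitions = vec![
        (1..=8192).collect::<Vec<i32>>(),
        (8193..=10000).collect::<Vec<i32>>(),
    ];
    let mut t2 = HashSet::new();
    t2.insert(1);

    println!("{:?}", topk_before_join(&partitions, &t2, 2)); // prints [2, 8193]
    println!("{:?}", join_before_topk(&partitions, &t2, 2)); // prints [2, 3]
}
```

Under these assumptions the first shape reproduces the incorrect output (2 and 8193): the partition holding k = 1 keeps only k = 2 after the join, and the other partition's smallest key fills the second slot. The second shape yields the expected 2 and 3.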

Additional context

Labels: bug