Closed
Labels: bug (Something isn't working)
Description
Describe the bug
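The SortExec physical operator (a TopK with fetch=2 in the plan below) is incorrectly pushed down past a LEFT ANTI JOIN, which returns incorrect results. Because the anti join removes rows (here, the row with k = 1), applying the limit before the join can discard rows that belong in the final result.
Disabling the EnforceSorting physical optimizer rule produces a correct plan.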
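The ordering problem can be seen without DataFusion at all. The sketch below is a plain-Rust model, not DataFusion code: anti_join and top_k are hypothetical helpers standing in for the anti hash join and for SortExec with a fetch, and the data is shrunk to ten rows. Taking the top 2 after the anti join gives [2, 3]; taking them before it lets the join discard k = 1 and leaves only [2].

```rust
use std::collections::HashSet;

// Hypothetical stand-in for an anti join: keep rows of `left` whose key
// does not appear in `right`.
fn anti_join(left: &[i32], right: &HashSet<i32>) -> Vec<i32> {
    left.iter().copied().filter(|k| !right.contains(k)).collect()
}

// Hypothetical stand-in for SortExec with a fetch (TopK): sort ascending,
// then keep the first `fetch` rows.
fn top_k(rows: &[i32], fetch: usize) -> Vec<i32> {
    let mut sorted = rows.to_vec();
    sorted.sort();
    sorted.truncate(fetch);
    sorted
}

fn main() {
    // Ten-row version of t1 (k = 1..=10) and t2 containing only k = 1.
    let t1: Vec<i32> = (1..=10).collect();
    let t2: HashSet<i32> = [1].into_iter().collect();

    // Correct order: anti join first, then TopK.
    let correct = top_k(&anti_join(&t1, &t2), 2);
    // Pushed-down order: TopK first, then the anti join drops k = 1.
    let pushed_down = top_k(&anti_join(&top_k(&t1, 2), &t2), 2);

    println!("correct     = {:?}", correct); // correct     = [2, 3]
    println!("pushed_down = {:?}", pushed_down); // pushed_down = [2]
}
```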
To Reproduce
```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Two tables with the same schema.
    ctx.sql("create table t1 (k int, v int);")
        .await?
        .collect()
        .await?;
    ctx.sql("create table t2 (k int, v int);")
        .await?
        .collect()
        .await?;

    // t1 holds k = v = 1..=10000; t2 holds a single row with k = 1.
    ctx.sql("insert into t1 select i as k, i as v from generate_series(1, 10000) t(i);")
        .await?
        .collect()
        .await?;
    ctx.sql("insert into t2 values (1, 1);")
        .await?
        .collect()
        .await?;

    // Only k = 1 is excluded by the anti join, so the result should be k = 2 and k = 3.
    let df = ctx
        .sql("select * from t1 left anti join t2 on t1.k = t2.k order by t1.k limit 2;")
        .await?;

    // Print the logical and physical plans, then the query result.
    df.clone().explain(false, false)?.show().await?;
    df.show().await?;
    Ok(())
}
```
Which returns:
```text
+---------------+----------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                   |
+---------------+----------------------------------------------------------------------------------------+
| logical_plan  | Sort: t1.k ASC NULLS LAST, fetch=2                                                     |
|               |   LeftAnti Join: t1.k = t2.k                                                           |
|               |     TableScan: t1 projection=[k, v]                                                    |
|               |     TableScan: t2 projection=[k]                                                       |
| physical_plan | SortPreservingMergeExec: [k@0 ASC NULLS LAST], fetch=2                                 |
|               |   CoalesceBatchesExec: target_batch_size=8192, fetch=2                                 |
|               |     HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k@0, k@0)]               |
|               |       DataSourceExec: partitions=1, partition_sizes=[1]                                |
|               |       SortExec: TopK(fetch=2), expr=[k@0 ASC NULLS LAST], preserve_partitioning=[true] |
|               |         RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1          |
|               |           DataSourceExec: partitions=1, partition_sizes=[2]                            |
|               |                                                                                        |
+---------------+----------------------------------------------------------------------------------------+
+------+------+
| k    | v    |
+------+------+
| 2    | 2    |
| 8193 | 8193 |
+------+------+
```
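The two rows above can be traced through the pushed-down plan. The following plain-Rust simulation is a sketch under stated assumptions, not DataFusion's executor: it assumes t1's two input batches (partition_sizes=[2]) hold k = 1..=8192 and k = 8193..=10000, that RoundRobinBatch(12) places them in different partitions, and it models SortPreservingMergeExec as concatenate, sort, truncate. simulate_pushed_down_plan is a hypothetical name. Each partition keeps only its own top 2 before the anti join runs, so the first partition is left with just k = 2 and the second partition contributes 8193, even though k = 3 should come next.

```rust
use std::collections::HashSet;

// Hypothetical simulation of the pushed-down physical plan from this issue.
fn simulate_pushed_down_plan() -> Vec<i32> {
    // Assumption: t1's two batches hold k = 1..=8192 and k = 8193..=10000.
    let batches: Vec<Vec<i32>> = vec![(1..=8192).collect(), (8193..=10000).collect()];
    let t2: HashSet<i32> = [1].into_iter().collect();
    let num_partitions = 12;
    let fetch = 2;

    // RepartitionExec RoundRobinBatch(12): assume batch i lands in partition
    // i % 12, so the two batches end up in different partitions.
    let mut partitions: Vec<Vec<i32>> = vec![Vec::new(); num_partitions];
    for (i, batch) in batches.into_iter().enumerate() {
        partitions[i % num_partitions].extend(batch);
    }

    // Per partition: SortExec TopK(fetch=2) runs first, then the RightAnti
    // hash join removes keys found in t2. CoalesceBatchesExec (fetch=2) is
    // omitted because each partition already holds at most 2 rows.
    let per_partition: Vec<Vec<i32>> = partitions
        .into_iter()
        .map(|mut p| {
            p.sort();
            p.truncate(fetch);
            p.retain(|k| !t2.contains(k));
            p
        })
        .collect();

    // SortPreservingMergeExec fetch=2, modeled as concatenate, sort, truncate.
    let mut merged: Vec<i32> = per_partition.into_iter().flatten().collect();
    merged.sort();
    merged.truncate(fetch);
    merged
}

fn main() {
    println!("{:?}", simulate_pushed_down_plan()); // [2, 8193]
}
```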
After disabling the EnforceSorting rule:
```text
+---------------+-----------------------------------------------------------------------------------+
| plan_type     | plan                                                                              |
+---------------+-----------------------------------------------------------------------------------+
| logical_plan  | Sort: t1.k ASC NULLS LAST, fetch=2                                                |
|               |   LeftAnti Join: t1.k = t2.k                                                      |
|               |     TableScan: t1 projection=[k, v]                                               |
|               |     TableScan: t2 projection=[k]                                                  |
| physical_plan | SortExec: TopK(fetch=2), expr=[k@0 ASC NULLS LAST], preserve_partitioning=[false] |
|               |   CoalescePartitionsExec                                                          |
|               |     CoalesceBatchesExec: target_batch_size=8192                                   |
|               |       HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k@0, k@0)]        |
|               |         DataSourceExec: partitions=1, partition_sizes=[1]                         |
|               |         RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1     |
|               |           DataSourceExec: partitions=1, partition_sizes=[2]                       |
|               |                                                                                   |
+---------------+-----------------------------------------------------------------------------------+
+---+---+
| k | v |
+---+---+
| 2 | 2 |
| 3 | 3 |
+---+---+
```
Expected behavior
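The sort should not be pushed down in this case. The query should return k = 2 and k = 3, matching the plan produced with EnforceSorting disabled.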
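One way to express the expected behavior is a guard that only lets a fetch move below a join when the join cannot drop rows from that input. The sketch below is hypothetical: JoinKind, Side, and fetch_can_pass are illustrative names, not DataFusion's API, and the guard assumes the join emits rows in the order of the input the fetch is pushed into. Under this rule, pushing TopK into the probe (right) side of the RightAnti join in this issue's plan is rejected.

```rust
// Hypothetical types for this sketch; not DataFusion's JoinType.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JoinKind {
    Inner,
    Left,
    Right,
    Full,
    LeftSemi,
    LeftAnti,
    RightSemi,
    RightAnti,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Side {
    Left,
    Right,
}

// Hypothetical guard: may a fetch (limit) be applied to the `side` input
// before the join? Assumes the join emits rows in that input's order.
fn fetch_can_pass(join: JoinKind, side: Side) -> bool {
    match (join, side) {
        // Outer joins emit every row of their preserved side at least once, so
        // the first `fetch` input rows still yield at least `fetch` output rows.
        (JoinKind::Left, Side::Left) | (JoinKind::Right, Side::Right) => true,
        // Inner, semi and anti joins can drop input rows; full joins and the
        // non-preserved side of an outer join can change which rows appear.
        _ => false,
    }
}

fn main() {
    // The plan in this issue pushes TopK into the probe side of a RightAnti join.
    println!("{}", fetch_can_pass(JoinKind::RightAnti, Side::Right)); // false
    println!("{}", fetch_can_pass(JoinKind::Left, Side::Left)); // true
}
```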
Additional context
- Tested from main (7cdac33)