Skip to content

Commit 23f00cf

Browse files
xudong963avantgardnerio
authored andcommitted
Fix spm + limit (apache#14569) v46
1 parent e3c58e5 commit 23f00cf

File tree

2 files changed

+44
-2
lines changed

2 files changed

+44
-2
lines changed

datafusion/core/src/physical_optimizer/enforce_sorting.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,13 @@ fn ensure_sorting(
374374
{
375375
// This `SortPreservingMergeExec` is unnecessary, input already has a
376376
// single partition.
377-
let child_node = requirements.children.swap_remove(0);
377+
// single partition and no fetch is required.
378+
let mut child_node = requirements.children.swap_remove(0);
379+
if let Some(fetch) = plan.fetch() {
380+
// Add the limit exec if the spm has a fetch
381+
child_node.plan =
382+
Arc::new(LocalLimitExec::new(Arc::clone(&child_node.plan), fetch));
383+
}
378384
return Ok(Transformed::yes(child_node));
379385
}
380386

@@ -660,6 +666,7 @@ mod tests {
660666
sort_preserving_merge_exec, spr_repartition_exec, union_exec,
661667
RequirementsTestExec,
662668
};
669+
use crate::physical_optimizer::utils::sort_preserving_merge_exec_with_fetch;
663670
use crate::physical_plan::{displayable, get_plan_string, Partitioning};
664671
use crate::prelude::{SessionConfig, SessionContext};
665672
use crate::test::{csv_exec_ordered, csv_exec_sorted, stream_exec_ordered};
@@ -1399,6 +1406,30 @@ mod tests {
13991406
Ok(())
14001407
}
14011408

1409+
#[tokio::test]
1410+
async fn test_remove_unnecessary_spm2() -> Result<()> {
1411+
let schema = create_test_schema()?;
1412+
let source = memory_exec(&schema);
1413+
let input = sort_preserving_merge_exec_with_fetch(
1414+
vec![sort_expr("non_nullable_col", &schema)],
1415+
source,
1416+
100,
1417+
);
1418+
1419+
let expected_input = [
1420+
"SortPreservingMergeExec: [non_nullable_col@1 ASC], fetch=100",
1421+
" MemoryExec: partitions=1, partition_sizes=[0]",
1422+
];
1423+
let expected_optimized = [
1424+
"LocalLimitExec: fetch=100",
1425+
" SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]",
1426+
" MemoryExec: partitions=1, partition_sizes=[0]",
1427+
];
1428+
assert_optimized!(expected_input, expected_optimized, input, true);
1429+
1430+
Ok(())
1431+
}
1432+
14021433
#[tokio::test]
14031434
async fn test_union_inputs_different_sorted2() -> Result<()> {
14041435
let schema = create_test_schema()?;

datafusion/core/src/physical_optimizer/utils.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ use crate::physical_plan::union::UnionExec;
2727
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
2828
use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
2929

30-
use datafusion_physical_expr::{LexRequirement, PhysicalSortRequirement};
30+
use datafusion_physical_expr::{
31+
LexRequirement, PhysicalSortExpr, PhysicalSortRequirement,
32+
};
3133
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
3234
use datafusion_physical_plan::tree_node::PlanContext;
3335

@@ -52,6 +54,15 @@ pub fn add_sort_above<T: Clone + Default>(
5254
PlanContext::new(Arc::new(new_sort), T::default(), vec![node])
5355
}
5456

57+
pub fn sort_preserving_merge_exec_with_fetch(
58+
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
59+
input: Arc<dyn ExecutionPlan>,
60+
fetch: usize,
61+
) -> Arc<dyn ExecutionPlan> {
62+
let sort_exprs = sort_exprs.into_iter().collect();
63+
Arc::new(SortPreservingMergeExec::new(sort_exprs, input).with_fetch(Some(fetch)))
64+
}
65+
5566
/// This utility function adds a `SortExec` above an operator according to the
5667
/// given ordering requirements while preserving the original partitioning. If
5768
/// requirement is already satisfied no `SortExec` is added.

0 commit comments

Comments
 (0)