Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,11 @@ impl PhysicalOptimizerRule for EnforceSorting {
)
})
.data()?;

// Execute a top-down traversal to exploit sort push-down opportunities
// missed by the bottom-up traversal:
let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan);
assign_initial_requirements(&mut sort_pushdown);
let adjusted = pushdown_sorts(sort_pushdown)?;

adjusted
.plan
.transform_up(|plan| Ok(Transformed::yes(replace_with_partial_sort(plan)?)))
Expand Down Expand Up @@ -403,7 +401,10 @@ fn analyze_immediate_sort_removal(
{
// Replace the sort with a sort-preserving merge:
let expr = LexOrdering::new(sort_exec.expr().to_vec());
Arc::new(SortPreservingMergeExec::new(expr, Arc::clone(sort_input))) as _
Arc::new(
SortPreservingMergeExec::new(expr, Arc::clone(sort_input))
.with_fetch(sort_exec.fetch()),
) as _
} else {
// Remove the sort:
node.children = node.children.swap_remove(0).children;
Expand Down Expand Up @@ -626,11 +627,12 @@ fn remove_corresponding_sort_from_sub_plan(
// If there is existing ordering, to preserve ordering use
// `SortPreservingMergeExec` instead of a `CoalescePartitionsExec`.
let plan = Arc::clone(&node.plan);
let fetch = plan.fetch();
let plan = if let Some(ordering) = plan.output_ordering() {
Arc::new(SortPreservingMergeExec::new(
LexOrdering::new(ordering.to_vec()),
plan,
)) as _
Arc::new(
SortPreservingMergeExec::new(LexOrdering::new(ordering.to_vec()), plan)
.with_fetch(fetch),
) as _
} else {
Arc::new(CoalescePartitionsExec::new(plan)) as _
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,20 @@ fn plan_with_order_preserving_variants(
// Flag indicating that it is desirable to replace `CoalescePartitionsExec`s
// with `SortPreservingMergeExec`s:
is_spm_better: bool,
fetch: Option<usize>,
) -> Result<OrderPreservationContext> {
sort_input.children = sort_input
.children
.into_iter()
.map(|node| {
// Update descendants in the given tree if there is a connection:
if node.data {
plan_with_order_preserving_variants(node, is_spr_better, is_spm_better)
plan_with_order_preserving_variants(
node,
is_spr_better,
is_spm_better,
fetch,
)
} else {
Ok(node)
}
Expand All @@ -133,7 +139,8 @@ fn plan_with_order_preserving_variants(
if let Some(ordering) = child.output_ordering() {
// When the input of a `CoalescePartitionsExec` has an ordering,
// replace it with a `SortPreservingMergeExec` if appropriate:
let spm = SortPreservingMergeExec::new(ordering.clone(), Arc::clone(child));
let spm = SortPreservingMergeExec::new(ordering.clone(), Arc::clone(child))
.with_fetch(fetch);
sort_input.plan = Arc::new(spm) as _;
sort_input.children[0].data = true;
return Ok(sort_input);
Expand Down Expand Up @@ -252,6 +259,7 @@ pub(crate) fn replace_with_order_preserving_variants(
requirements.children.swap_remove(0),
is_spr_better || use_order_preserving_variant,
is_spm_better || use_order_preserving_variant,
requirements.plan.fetch(),
)?;

// If the alternate plan makes this sort unnecessary, accept the alternate:
Expand Down
10 changes: 7 additions & 3 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ pub fn assign_initial_requirements(node: &mut SortPushDown) {
for (child, requirement) in node.children.iter_mut().zip(reqs) {
child.data = ParentRequirements {
ordering_requirement: requirement,
fetch: None,
// If the parent has a fetch value, assign it to the children
// Or use the fetch value of the child.
fetch: child.plan.fetch(),
};
}
}
Expand Down Expand Up @@ -95,6 +97,7 @@ fn pushdown_sorts_helper(
.ordering_satisfy_requirement(&parent_reqs);

if is_sort(plan) {
let sort_fetch = plan.fetch();
let required_ordering = plan
.output_ordering()
.cloned()
Expand All @@ -103,7 +106,8 @@ fn pushdown_sorts_helper(
if !satisfy_parent {
// Make sure this `SortExec` satisfies parent requirements:
let sort_reqs = requirements.data.ordering_requirement.unwrap_or_default();
let fetch = requirements.data.fetch;
// It's possible current plan (`SortExec`) has a fetch value.
let fetch = requirements.data.fetch.or(sort_fetch);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to take min() instead of or()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't find a case that both requirement fetch and sort fetch are Some and requirement fetch is large than sort fetch

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if there is not a case from the sql API, a plan can be constructed like:

Limit: 10
--Sort: fetch:5

IIUC, that will set sort fetch as 10, but that will be suboptimal. Do you think there is something wrong in taking their min?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR looks good BTW. I don't want to block merging.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can address as a follow on PR

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks all, I opened a follow up PR: #14221

requirements = requirements.children.swap_remove(0);
requirements = add_sort_above(requirements, sort_reqs, fetch);
};
Expand All @@ -113,7 +117,7 @@ fn pushdown_sorts_helper(
if let Some(adjusted) =
pushdown_requirement_to_children(&child.plan, &required_ordering)?
{
let fetch = child.plan.fetch();
let fetch = sort_fetch.or_else(|| child.plan.fetch());
for (grand_child, order) in child.children.iter_mut().zip(adjusted) {
grand_child.data = ParentRequirements {
ordering_requirement: order,
Expand Down
7 changes: 6 additions & 1 deletion datafusion/sqllogictest/test_files/topk.slt
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@ select * from topk order by x desc limit 3;
8
5


query I
select * from (select * from topk limit 8) order by x limit 3;
----
0
1
2


statement ok
Expand Down
Loading