Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 7 additions & 54 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3115,19 +3115,18 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_reparti
) -> Result<()> {
assert_snapshot!(
union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(false).await?,
@r#"
@r"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reviewed this plan carefully and it looks good to me

AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted
SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
CoalescePartitionsExec
AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
UnionExec
DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
SortPreservingMergeExec: [id@0 ASC NULLS LAST]
AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted
UnionExec
DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great push down

DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
"#);
");
Ok(())
}

#[ignore] // See https://github.com/apache/datafusion/issues/18380
#[tokio::test]
// Test with `repartition_sorts` enabled to preserve pre-sorted partitions and avoid resorting
async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true(
Expand All @@ -3144,52 +3143,6 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_reparti
DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
"#);

// 💥 Doesn't pass, and generates this plan:
//
// AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted
// SortPreservingMergeExec: [id@0 ASC NULLS LAST]
// SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true]
// AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
// UnionExec
// DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
// DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
//
//
// === Excerpt from the verbose explain ===
//
// +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// | plan_type | plan |
// +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// | initial_physical_plan | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted |
// | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted |
// | | UnionExec |
// | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet |
// | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] |
// | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet |
// ...
// | physical_plan after EnforceDistribution | OutputRequirementExec: order_by=[], dist_by=Unspecified |
// | | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted |
// | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] |
// | | CoalescePartitionsExec |
// | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted |
// | | UnionExec |
// | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet |
// | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] |
// | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet |
// | | |
// | physical_plan after CombinePartialFinalAggregate | SAME TEXT AS ABOVE
// | | |
// | physical_plan after EnforceSorting | OutputRequirementExec: order_by=[], dist_by=Unspecified |
// | | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted |
// | | SortPreservingMergeExec: [id@0 ASC NULLS LAST] |
// | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true] |
// | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] |
// | | UnionExec |
// | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet |
// | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet |
// ...
// +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Ok(())
}

Expand Down
10 changes: 1 addition & 9 deletions datafusion/core/tests/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,21 +664,13 @@ async fn test_union_inputs_different_sorted7() -> Result<()> {
// Union has unnecessarily fine ordering below it. We should be able to replace them with absolutely necessary ordering.
let test = EnforceSortingTest::new(physical_plan).with_repartition_sorts(true);
assert_snapshot!(test.run(), @r"
Input Plan:
Input / Optimized Plan:
SortPreservingMergeExec: [nullable_col@0 ASC]
UnionExec
SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[false]
DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[false]
DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet

Optimized Plan:
SortPreservingMergeExec: [nullable_col@0 ASC]
UnionExec
SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]
DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]
DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
Copy link
Contributor

@NGA-TRAN NGA-TRAN Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why this is no longer needed? Because the input and output are now the same?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I was confused at first but this is what it means.

");
// Union preserves the inputs ordering, and we should not change any of the SortExecs under UnionExec

Expand Down
7 changes: 2 additions & 5 deletions datafusion/physical-optimizer/src/enforce_sorting/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use crate::enforce_sorting::sort_pushdown::{
use crate::output_requirements::OutputRequirementExec;
use crate::utils::{
add_sort_above, add_sort_above_with_check, is_coalesce_partitions, is_limit,
is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window,
is_repartition, is_sort, is_sort_preserving_merge, is_window,
};
use crate::PhysicalOptimizerRule;

Expand Down Expand Up @@ -516,10 +516,7 @@ pub fn ensure_sorting(
);
child = update_sort_ctx_children_data(child, true)?;
}
} else if physical_ordering.is_none()
|| !plan.maintains_input_order()[idx]
|| is_union(plan)
{
} else if physical_ordering.is_none() || !plan.maintains_input_order()[idx] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow. This is simple.
I wonder why this is_union is here in the first place 🤔

All the tests pass and @rgehan's reproducer now also pass means this is likely the right work

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I was very confused as well when stepping through the logic. The only thing I could think of is if it was thought that UnionExec was never able to maintain input order but it is implemented for the operator.

Interesting though since this is quoted from the documentation online about the operator: "UnionExec combines multiple inputs with the same schema by concatenating the partitions. It does not mix or copy data within or across partitions. Thus if the input partitions are sorted, the output partitions of the union are also sorted."

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice analysis

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow. This is simple. I wonder why this is_union is here in the first place 🤔

I guess when implementing optimizer rules, it’s more common to be conservative than comprehensive if not sure, to avoid bugs.

// We have a `SortExec` whose effect may be neutralized by another
// order-imposing operator, remove this sort:
child = update_child_to_remove_unnecessary_sort(idx, child, plan)?;
Expand Down