Skip to content

Commit 1833093

Browse files
Removed incorrect union check in enforce_sorting and updated tests (#18661)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #18380. - Closes #9898. ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> There was an overly aggressive condition enforce_sorting rule was not handling UnionExec correctly. This conditions assumed that Unions did not maintain order causing SortExec nodes to be removed and then eventually added at a higher level, less efficiently. ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> I removed this condition that now has changed the logic to properly take into account UnionExec's ability to maintain input ordering. ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Yes, previously failing tests were ignored and now are unignored and passing. ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> No
1 parent dc6e7cf commit 1833093

File tree

3 files changed

+10
-68
lines changed

3 files changed

+10
-68
lines changed

datafusion/core/tests/dataframe/mod.rs

Lines changed: 7 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -3135,19 +3135,18 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_reparti
31353135
) -> Result<()> {
31363136
assert_snapshot!(
31373137
union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(false).await?,
3138-
@r#"
3138+
@r"
31393139
AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted
3140-
SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
3141-
CoalescePartitionsExec
3142-
AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
3143-
UnionExec
3144-
DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
3140+
SortPreservingMergeExec: [id@0 ASC NULLS LAST]
3141+
AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted
3142+
UnionExec
3143+
DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
3144+
SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
31453145
DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
3146-
"#);
3146+
");
31473147
Ok(())
31483148
}
31493149

3150-
#[ignore] // See https://github.com/apache/datafusion/issues/18380
31513150
#[tokio::test]
31523151
// Test with `repartition_sorts` enabled to preserve pre-sorted partitions and avoid resorting
31533152
async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true(
@@ -3164,52 +3163,6 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_reparti
31643163
DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
31653164
"#);
31663165

3167-
// 💥 Doesn't pass, and generates this plan:
3168-
//
3169-
// AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted
3170-
// SortPreservingMergeExec: [id@0 ASC NULLS LAST]
3171-
// SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true]
3172-
// AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
3173-
// UnionExec
3174-
// DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
3175-
// DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
3176-
//
3177-
//
3178-
// === Excerpt from the verbose explain ===
3179-
//
3180-
// +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
3181-
// | plan_type | plan |
3182-
// +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
3183-
// | initial_physical_plan | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted |
3184-
// | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted |
3185-
// | | UnionExec |
3186-
// | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet |
3187-
// | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] |
3188-
// | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet |
3189-
// ...
3190-
// | physical_plan after EnforceDistribution | OutputRequirementExec: order_by=[], dist_by=Unspecified |
3191-
// | | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted |
3192-
// | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] |
3193-
// | | CoalescePartitionsExec |
3194-
// | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted |
3195-
// | | UnionExec |
3196-
// | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet |
3197-
// | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] |
3198-
// | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet |
3199-
// | | |
3200-
// | physical_plan after CombinePartialFinalAggregate | SAME TEXT AS ABOVE
3201-
// | | |
3202-
// | physical_plan after EnforceSorting | OutputRequirementExec: order_by=[], dist_by=Unspecified |
3203-
// | | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted |
3204-
// | | SortPreservingMergeExec: [id@0 ASC NULLS LAST] |
3205-
// | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true] |
3206-
// | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] |
3207-
// | | UnionExec |
3208-
// | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet |
3209-
// | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet |
3210-
// ...
3211-
// +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
3212-
32133166
Ok(())
32143167
}
32153168

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -664,21 +664,13 @@ async fn test_union_inputs_different_sorted7() -> Result<()> {
664664
// Union has unnecessarily fine ordering below it. We should be able to replace them with absolutely necessary ordering.
665665
let test = EnforceSortingTest::new(physical_plan).with_repartition_sorts(true);
666666
assert_snapshot!(test.run(), @r"
667-
Input Plan:
667+
Input / Optimized Plan:
668668
SortPreservingMergeExec: [nullable_col@0 ASC]
669669
UnionExec
670670
SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[false]
671671
DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
672672
SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[false]
673673
DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
674-
675-
Optimized Plan:
676-
SortPreservingMergeExec: [nullable_col@0 ASC]
677-
UnionExec
678-
SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]
679-
DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
680-
SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]
681-
DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
682674
");
683675
// Union preserves the inputs ordering, and we should not change any of the SortExecs under UnionExec
684676

datafusion/physical-optimizer/src/enforce_sorting/mod.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ use crate::enforce_sorting::sort_pushdown::{
4949
use crate::output_requirements::OutputRequirementExec;
5050
use crate::utils::{
5151
add_sort_above, add_sort_above_with_check, is_coalesce_partitions, is_limit,
52-
is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window,
52+
is_repartition, is_sort, is_sort_preserving_merge, is_window,
5353
};
5454
use crate::PhysicalOptimizerRule;
5555

@@ -516,10 +516,7 @@ pub fn ensure_sorting(
516516
);
517517
child = update_sort_ctx_children_data(child, true)?;
518518
}
519-
} else if physical_ordering.is_none()
520-
|| !plan.maintains_input_order()[idx]
521-
|| is_union(plan)
522-
{
519+
} else if physical_ordering.is_none() || !plan.maintains_input_order()[idx] {
523520
// We have a `SortExec` whose effect may be neutralized by another
524521
// order-imposing operator, remove this sort:
525522
child = update_child_to_remove_unnecessary_sort(idx, child, plan)?;

0 commit comments

Comments
 (0)