Removed incorrect union check in enforce_sorting and updated tests #18661
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3115,19 +3115,18 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_reparti | |
| ) -> Result<()> { | ||
| assert_snapshot!( | ||
| union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(false).await?, | ||
| @r#" | ||
| @r" | ||
| AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | ||
| SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] | ||
| CoalescePartitionsExec | ||
| AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] | ||
| UnionExec | ||
| DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | ||
| SortPreservingMergeExec: [id@0 ASC NULLS LAST] | ||
| AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | ||
| UnionExec | ||
| DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | ||
| SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] | ||
|
Contributor
Great push down |
||
| DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | ||
| "#); | ||
| "); | ||
| Ok(()) | ||
| } | ||
|
|
||
| #[ignore] // See https://github.com/apache/datafusion/issues/18380 | ||
| #[tokio::test] | ||
| // Test with `repartition_sorts` enabled to preserve pre-sorted partitions and avoid resorting | ||
| async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true( | ||
|
|
@@ -3144,52 +3143,6 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_reparti | |
| DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | ||
| "#); | ||
|
|
||
| // 💥 Doesn't pass, and generates this plan: | ||
| // | ||
| // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | ||
| // SortPreservingMergeExec: [id@0 ASC NULLS LAST] | ||
| // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true] | ||
| // AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] | ||
| // UnionExec | ||
| // DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | ||
| // DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | ||
| // | ||
| // | ||
| // === Excerpt from the verbose explain === | ||
| // | ||
| // +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | ||
| // | plan_type | plan | | ||
| // +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | ||
| // | initial_physical_plan | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | | ||
| // | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | | ||
| // | | UnionExec | | ||
| // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | | ||
| // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] | | ||
| // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | | ||
| // ... | ||
| // | physical_plan after EnforceDistribution | OutputRequirementExec: order_by=[], dist_by=Unspecified | | ||
| // | | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | | ||
| // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] | | ||
| // | | CoalescePartitionsExec | | ||
| // | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | | ||
| // | | UnionExec | | ||
| // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | | ||
| // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] | | ||
| // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | | ||
| // | | | | ||
| // | physical_plan after CombinePartialFinalAggregate | SAME TEXT AS ABOVE | ||
| // | | | | ||
| // | physical_plan after EnforceSorting | OutputRequirementExec: order_by=[], dist_by=Unspecified | | ||
| // | | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | | ||
| // | | SortPreservingMergeExec: [id@0 ASC NULLS LAST] | | ||
| // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true] | | ||
| // | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] | | ||
| // | | UnionExec | | ||
| // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | | ||
| // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | | ||
| // ... | ||
| // +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -664,21 +664,13 @@ async fn test_union_inputs_different_sorted7() -> Result<()> { | |
| // Union has unnecessarily fine ordering below it. We should be able to replace them with absolutely necessary ordering. | ||
| let test = EnforceSortingTest::new(physical_plan).with_repartition_sorts(true); | ||
| assert_snapshot!(test.run(), @r" | ||
| Input Plan: | ||
| Input / Optimized Plan: | ||
| SortPreservingMergeExec: [nullable_col@0 ASC] | ||
| UnionExec | ||
| SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[false] | ||
| DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet | ||
| SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[false] | ||
| DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet | ||
|
|
||
| Optimized Plan: | ||
| SortPreservingMergeExec: [nullable_col@0 ASC] | ||
| UnionExec | ||
| SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false] | ||
| DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet | ||
| SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false] | ||
| DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet | ||
|
Contributor
Can you explain why this is no longer needed? Is it because the input and output plans are now the same?
Contributor
Author
Yes. I was confused at first, but that is what it means. |
||
| "); | ||
| // Union preserves the inputs ordering, and we should not change any of the SortExecs under UnionExec | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -49,7 +49,7 @@ use crate::enforce_sorting::sort_pushdown::{ | |
| use crate::output_requirements::OutputRequirementExec; | ||
| use crate::utils::{ | ||
| add_sort_above, add_sort_above_with_check, is_coalesce_partitions, is_limit, | ||
| is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window, | ||
| is_repartition, is_sort, is_sort_preserving_merge, is_window, | ||
| }; | ||
| use crate::PhysicalOptimizerRule; | ||
|
|
||
|
|
@@ -516,10 +516,7 @@ pub fn ensure_sorting( | |
| ); | ||
| child = update_sort_ctx_children_data(child, true)?; | ||
| } | ||
| } else if physical_ordering.is_none() | ||
| || !plan.maintains_input_order()[idx] | ||
| || is_union(plan) | ||
| { | ||
| } else if physical_ordering.is_none() || !plan.maintains_input_order()[idx] { | ||
|
Contributor
Wow, this is simple. All the tests pass and @rgehan's reproducer now passes too, which suggests this is likely the right fix.
Contributor
Author
Yes, I was very confused as well when stepping through the logic. The only explanation I could think of is that UnionExec was assumed never to maintain input order, even though that is implemented for the operator. It is interesting, though, since the online documentation for the operator says: "UnionExec combines multiple inputs with the same schema by concatenating the partitions. It does not mix or copy data within or across partitions. Thus if the input partitions are sorted, the output partitions of the union are also sorted."
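To make the quoted documentation concrete, here is a minimal sketch in plain Rust of why concatenating partitions keeps each one sorted. It does not use DataFusion at all: `Partition`, `union_partitions`, and `is_sorted_asc` are hypothetical names invented for this illustration, and a partition is modelled as a simple vector of ids.

```rust
/// Toy stand-in for a partition: a plain vector of ids (an assumption for this sketch).
type Partition = Vec<i32>;

/// Hypothetical model of a union: append the partitions of every input
/// one after another, without touching the rows inside any partition.
fn union_partitions(inputs: Vec<Vec<Partition>>) -> Vec<Partition> {
    inputs.into_iter().flatten().collect()
}

/// Returns true when the rows of a partition are in ascending order.
fn is_sorted_asc(partition: &[i32]) -> bool {
    partition.windows(2).all(|pair| pair[0] <= pair[1])
}

fn main() {
    // Two inputs, each with partitions that are already sorted.
    let left = vec![vec![1, 3, 5], vec![2, 4]];
    let right = vec![vec![0, 7, 9]];

    let output = union_partitions(vec![left, right]);

    // The output has one partition per input partition, and each is still sorted.
    assert_eq!(output.len(), 3);
    assert!(output.iter().all(|p| is_sorted_asc(p)));
    println!("{:?}", output);
}
```

Note that the guarantee is per partition: the output as a whole is not globally sorted, which is why the plans in this PR still place a `SortPreservingMergeExec` above the union when a single sorted stream is needed.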
Contributor
Very nice analysis
Contributor
I guess when implementing optimizer rules, it's more common to be conservative than comprehensive when unsure, to avoid bugs. |
||
| // We have a `SortExec` whose effect may be neutralized by another | ||
| // order-imposing operator, remove this sort: | ||
| child = update_child_to_remove_unnecessary_sort(idx, child, plan)?; | ||
|
|
||
I reviewed this plan carefully and it looks good to me.
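For readers following the one-line change in `ensure_sorting`, the sketch below models only the boolean condition shown in the diff, before and after the PR. It is not the DataFusion code: `ChildInfo` and both function names are hypothetical, and the three fields stand in for `physical_ordering.is_none()`, `plan.maintains_input_order()[idx]`, and `is_union(plan)`.

```rust
/// Hypothetical, simplified inputs to the branch condition for one child.
struct ChildInfo {
    /// Stand-in for `physical_ordering.is_some()`.
    has_physical_ordering: bool,
    /// Stand-in for `plan.maintains_input_order()[idx]`.
    parent_maintains_order: bool,
    /// Stand-in for `is_union(plan)`.
    parent_is_union: bool,
}

/// Condition before the PR: any union parent forces the sort-removal branch.
fn takes_removal_branch_before(c: &ChildInfo) -> bool {
    !c.has_physical_ordering || !c.parent_maintains_order || c.parent_is_union
}

/// Condition after the PR: the union check is gone.
fn takes_removal_branch_after(c: &ChildInfo) -> bool {
    !c.has_physical_ordering || !c.parent_maintains_order
}

fn main() {
    // A sorted child under a union that reports it maintains input order.
    let sorted_under_union = ChildInfo {
        has_physical_ordering: true,
        parent_maintains_order: true,
        parent_is_union: true,
    };

    // Before: the branch is taken, so the child's sort is a removal candidate.
    assert!(takes_removal_branch_before(&sorted_under_union));
    // After: the branch is skipped, so the sort under the union is left alone.
    assert!(!takes_removal_branch_after(&sorted_under_union));
    println!("union child keeps its sort after the change");
}
```

For non-union parents the two conditions agree, which fits the observation in the review that the existing tests keep passing.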