Skip to content

Commit 5f22722

Browse files
authored
chore: update Repartition DisplayAs to indicate maintained sort order (apache#18673)
Previously, it was not obvious reading the plan diagram when a Repartition operator maintained sortedness by virtue of having a single input partition even if preserve_sort order was false. This commit makes the implicit sortedness preservation explicit in the plan diagram. This commit does not change anything for the case when preserve sort order is false. ## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes apache#18594 ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Yes ## Are there any user-facing changes? No
1 parent c8d26ba commit 5f22722

24 files changed

+272
-268
lines changed

datafusion/core/tests/dataframe/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2869,7 +2869,7 @@ async fn test_count_wildcard_on_sort() -> Result<()> {
28692869

28702870
assert_snapshot!(
28712871
pretty_format_batches(&sql_results).unwrap(),
2872-
@r###"
2872+
@r"
28732873
+---------------+------------------------------------------------------------------------------------------------------------+
28742874
| plan_type | plan |
28752875
+---------------+------------------------------------------------------------------------------------------------------------+
@@ -2889,12 +2889,12 @@ async fn test_count_wildcard_on_sort() -> Result<()> {
28892889
| | DataSourceExec: partitions=1, partition_sizes=[1] |
28902890
| | |
28912891
+---------------+------------------------------------------------------------------------------------------------------------+
2892-
"###
2892+
"
28932893
);
28942894

28952895
assert_snapshot!(
28962896
pretty_format_batches(&df_results).unwrap(),
2897-
@r###"
2897+
@r"
28982898
+---------------+----------------------------------------------------------------------------+
28992899
| plan_type | plan |
29002900
+---------------+----------------------------------------------------------------------------+
@@ -2910,7 +2910,7 @@ async fn test_count_wildcard_on_sort() -> Result<()> {
29102910
| | DataSourceExec: partitions=1, partition_sizes=[1] |
29112911
| | |
29122912
+---------------+----------------------------------------------------------------------------+
2913-
"###
2913+
"
29142914
);
29152915
Ok(())
29162916
}

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 106 additions & 106 deletions
Large diffs are not rendered by default.

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1623,13 +1623,13 @@ async fn test_with_lost_ordering_unbounded() -> Result<()> {
16231623
SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
16241624
CoalescePartitionsExec
16251625
RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10
1626-
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
1626+
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
16271627
StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]
16281628
16291629
Optimized Plan:
16301630
SortPreservingMergeExec: [a@0 ASC]
16311631
RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC
1632-
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
1632+
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
16331633
StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]
16341634
");
16351635

@@ -1641,13 +1641,13 @@ async fn test_with_lost_ordering_unbounded() -> Result<()> {
16411641
SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
16421642
CoalescePartitionsExec
16431643
RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10
1644-
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
1644+
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
16451645
StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]
16461646
16471647
Optimized Plan:
16481648
SortPreservingMergeExec: [a@0 ASC]
16491649
RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC
1650-
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
1650+
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
16511651
StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]
16521652
");
16531653

@@ -1666,7 +1666,7 @@ async fn test_with_lost_ordering_bounded() -> Result<()> {
16661666
SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
16671667
CoalescePartitionsExec
16681668
RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10
1669-
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
1669+
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
16701670
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=csv, has_header=false
16711671
");
16721672

@@ -1678,14 +1678,14 @@ async fn test_with_lost_ordering_bounded() -> Result<()> {
16781678
SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
16791679
CoalescePartitionsExec
16801680
RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10
1681-
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
1681+
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
16821682
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=csv, has_header=false
16831683
16841684
Optimized Plan:
16851685
SortPreservingMergeExec: [a@0 ASC]
16861686
SortExec: expr=[a@0 ASC], preserve_partitioning=[true]
16871687
RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10
1688-
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
1688+
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
16891689
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=csv, has_header=false
16901690
");
16911691

@@ -1707,7 +1707,7 @@ async fn test_do_not_pushdown_through_spm() -> Result<()> {
17071707
Input / Optimized Plan:
17081708
SortExec: expr=[b@1 ASC], preserve_partitioning=[false]
17091709
SortPreservingMergeExec: [a@0 ASC, b@1 ASC]
1710-
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
1710+
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
17111711
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 ASC], file_type=csv, has_header=false
17121712
");
17131713

@@ -1736,13 +1736,13 @@ async fn test_pushdown_through_spm() -> Result<()> {
17361736
Input Plan:
17371737
SortExec: expr=[a@0 ASC, b@1 ASC, c@2 ASC], preserve_partitioning=[false]
17381738
SortPreservingMergeExec: [a@0 ASC, b@1 ASC]
1739-
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
1739+
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
17401740
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 ASC], file_type=csv, has_header=false
17411741
17421742
Optimized Plan:
17431743
SortPreservingMergeExec: [a@0 ASC, b@1 ASC]
17441744
SortExec: expr=[a@0 ASC, b@1 ASC, c@2 ASC], preserve_partitioning=[true]
1745-
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
1745+
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
17461746
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 ASC], file_type=csv, has_header=false
17471747
");
17481748
Ok(())
@@ -1766,7 +1766,7 @@ async fn test_window_multi_layer_requirement() -> Result<()> {
17661766
BoundedWindowAggExec: wdw=[count: Field { "count": Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
17671767
SortPreservingMergeExec: [a@0 ASC, b@1 ASC]
17681768
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC, b@1 ASC
1769-
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
1769+
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
17701770
SortExec: expr=[a@0 ASC, b@1 ASC], preserve_partitioning=[false]
17711771
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
17721772
@@ -1961,7 +1961,7 @@ async fn test_remove_unnecessary_sort2() -> Result<()> {
19611961
assert_snapshot!(test.run(), @r"
19621962
Input Plan:
19631963
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10
1964-
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
1964+
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
19651965
SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]
19661966
SortPreservingMergeExec: [nullable_col@0 ASC, non_nullable_col@1 ASC]
19671967
SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[false]
@@ -2008,7 +2008,7 @@ async fn test_remove_unnecessary_sort3() -> Result<()> {
20082008
AggregateExec: mode=Final, gby=[], aggr=[]
20092009
SortPreservingMergeExec: [nullable_col@0 ASC, non_nullable_col@1 ASC]
20102010
SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[true]
2011-
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
2011+
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
20122012
SortPreservingMergeExec: [non_nullable_col@1 ASC]
20132013
SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]
20142014
DataSourceExec: partitions=1, partition_sizes=[0]
@@ -2357,7 +2357,7 @@ async fn test_commutativity() -> Result<()> {
23572357

23582358
assert_snapshot!(displayable(orig_plan.as_ref()).indent(true), @r#"
23592359
SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]
2360-
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
2360+
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
23612361
BoundedWindowAggExec: wdw=[count: Field { "count": Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
23622362
DataSourceExec: partitions=1, partition_sizes=[0]
23632363
"#);

0 commit comments

Comments
 (0)