@@ -98,8 +98,6 @@ struct EnforceSortingTest {
9898 repartition_sorts : bool ,
9999 /// If true, asserts that the input and optimized plans are the same
100100 expect_no_change : bool ,
101- /// A message printed into the snapshot to describe the expected output
102- expected_description : Option < String > ,
103101}
104102
105103impl EnforceSortingTest {
@@ -108,7 +106,6 @@ impl EnforceSortingTest {
108106 plan,
109107 repartition_sorts : false ,
110108 expect_no_change : false ,
111- expected_description : None ,
112109 }
113110 }
114111
@@ -124,11 +121,6 @@ impl EnforceSortingTest {
124121 self
125122 }
126123
127- /// Add an expected output description
128- fn with_expected_description ( mut self , description : & str ) -> Self {
129- self . expected_description = Some ( description. to_string ( ) ) ;
130- self
131- }
132124
133125 /// Runs the enforce sorting test and returns a string with the input and
134126 /// optimized plan as strings for snapshot comparison using insta
@@ -202,25 +194,18 @@ impl EnforceSortingTest {
202194 . indent ( true )
203195 . to_string ( ) ;
204196
205- let expected_description =
206- if let Some ( desc) = self . expected_description . as_deref ( ) {
207- format ! ( "{desc}\n " )
208- } else {
209- "" . to_string ( )
210- } ;
211-
212197 if self . expect_no_change {
213198 assert_eq ! ( input_plan_string, optimized_plan_string,
214199 "Expected no change in the plan, but the optimized plan differs from the input plan"
215200 ) ;
216201
217202 return format ! (
218- "{expected_description} Input / Optimized Plan:\n {input_plan_string}" ,
203+ "Input / Optimized Plan:\n {input_plan_string}" ,
219204 ) ;
220205 }
221206
222207 format ! (
223- "Input Plan:\n {input_plan_string}\n {expected_description}Optimized Plan:\n {optimized_plan_string}" ,
208+ "Input Plan:\n {input_plan_string}\n Optimized Plan:\n {optimized_plan_string}" ,
224209 )
225210 }
226211}
@@ -274,8 +259,8 @@ async fn test_do_not_remove_sort_with_limit() -> Result<()> {
274259 let physical_plan = sort_preserving_merge_exec ( ordering, repartition) ;
275260
276261 let test = EnforceSortingTest :: new ( physical_plan)
277- . with_repartition_sorts ( true )
278- . with_expected_description ( "// We should keep the bottom `SortExec`." ) ;
262+ . with_repartition_sorts ( true ) ;
263+
279264 assert_snapshot ! ( test. run( ) , @r"
280265 Input Plan:
281266 SortPreservingMergeExec: [nullable_col@0 ASC, non_nullable_col@1 ASC]
@@ -286,7 +271,6 @@ async fn test_do_not_remove_sort_with_limit() -> Result<()> {
286271 SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[false]
287272 DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
288273
289- // We should keep the bottom `SortExec`.
290274 Optimized Plan:
291275 SortPreservingMergeExec: [nullable_col@0 ASC, non_nullable_col@1 ASC]
292276 SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[true]
@@ -297,7 +281,7 @@ async fn test_do_not_remove_sort_with_limit() -> Result<()> {
297281 SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[false]
298282 DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
299283 " ) ;
300-
284+ // We should keep the bottom `SortExec`.
301285 Ok ( ( ) )
302286}
303287
@@ -314,18 +298,17 @@ async fn test_union_inputs_sorted() -> Result<()> {
314298 // one input to the union is already sorted, one is not.
315299 let test = EnforceSortingTest :: new ( physical_plan)
316300 . with_repartition_sorts ( true )
317- . with_expected_description ( "// should not add a sort at the output of the union, input plan should not be changed" )
318301 . with_expect_no_change ( true ) ;
319302
320303 assert_snapshot ! ( test. run( ) , @r"
321- // should not add a sort at the output of the union, input plan should not be changed
322304 Input / Optimized Plan:
323305 SortPreservingMergeExec: [nullable_col@0 ASC]
324306 UnionExec
325307 DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC], file_type=parquet
326308 SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]
327309 DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
328310 " ) ;
311+ // should not add a sort at the output of the union, input plan should not be changed
329312
330313 Ok ( ( ) )
331314}
@@ -348,17 +331,16 @@ async fn test_union_inputs_different_sorted() -> Result<()> {
348331 // one input to the union is already sorted, one is not.
349332 let test = EnforceSortingTest :: new ( physical_plan)
350333 . with_repartition_sorts ( true )
351- . with_expected_description ( "// should not add a sort at the output of the union, input plan should not be changed" )
352334 . with_expect_no_change ( true ) ;
353335 assert_snapshot ! ( test. run( ) , @r"
354- // should not add a sort at the output of the union, input plan should not be changed
355336 Input / Optimized Plan:
356337 SortPreservingMergeExec: [nullable_col@0 ASC]
357338 UnionExec
358339 DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], file_type=parquet
359340 SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]
360341 DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
361342 " ) ;
343+ // should not add a sort at the output of the union, input plan should not be changed
362344
363345 Ok ( ( ) )
364346}
@@ -422,10 +404,7 @@ async fn test_union_inputs_different_sorted3() -> Result<()> {
422404 // First input to the union is not Sorted (SortExec is finer than required ordering by the SortPreservingMergeExec above).
423405 // Second input to the union is already Sorted (matches with the required ordering by the SortPreservingMergeExec above).
424406 // Third input to the union is not Sorted (SortExec is matches required ordering by the SortPreservingMergeExec above).
425- let test = EnforceSortingTest :: new ( physical_plan) . with_repartition_sorts ( true )
426- . with_expected_description (
427- "// should adjust sorting in the first input of the union such that it is not unnecessarily fine"
428- ) ;
407+ let test = EnforceSortingTest :: new ( physical_plan) . with_repartition_sorts ( true ) ;
429408
430409 assert_snapshot ! ( test. run( ) , @r"
431410 Input Plan:
@@ -437,7 +416,6 @@ async fn test_union_inputs_different_sorted3() -> Result<()> {
437416 SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]
438417 DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
439418
440- // should adjust sorting in the first input of the union such that it is not unnecessarily fine
441419 Optimized Plan:
442420 SortPreservingMergeExec: [nullable_col@0 ASC]
443421 UnionExec
@@ -447,6 +425,7 @@ async fn test_union_inputs_different_sorted3() -> Result<()> {
447425 SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]
448426 DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
449427 " ) ;
428+ // should adjust sorting in the first input of the union such that it is not unnecessarily fine
450429 Ok ( ( ) )
451430}
452431
@@ -572,11 +551,7 @@ async fn test_union_inputs_different_sorted6() -> Result<()> {
572551 // The final plan should be valid AND the ordering of the third child
573552 // shouldn't be finer than necessary.
574553 let test = EnforceSortingTest :: new ( physical_plan)
575- . with_repartition_sorts ( true )
576- . with_expected_description (
577- "// Should adjust the requirement in the third input of the union so\n \
578- // that it is not unnecessarily fine.",
579- ) ;
554+ . with_repartition_sorts ( true ) ;
580555 assert_snapshot ! ( test. run( ) , @r"
581556 Input Plan:
582557 SortPreservingMergeExec: [nullable_col@0 ASC]
@@ -588,8 +563,6 @@ async fn test_union_inputs_different_sorted6() -> Result<()> {
588563 RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
589564 DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
590565
591- // Should adjust the requirement in the third input of the union so
592- // that it is not unnecessarily fine.
593566 Optimized Plan:
594567 SortPreservingMergeExec: [nullable_col@0 ASC]
595568 UnionExec
@@ -600,6 +573,8 @@ async fn test_union_inputs_different_sorted6() -> Result<()> {
600573 RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
601574 DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
602575 " ) ;
576+ // Should adjust the requirement in the third input of the union so
577+ // that it is not unnecessarily fine.
603578
604579 Ok ( ( ) )
605580}
@@ -620,8 +595,7 @@ async fn test_union_inputs_different_sorted7() -> Result<()> {
620595 let physical_plan = sort_preserving_merge_exec ( ordering2, union) ;
621596
622597 // Union has unnecessarily fine ordering below it. We should be able to replace them with absolutely necessary ordering.
623- let test = EnforceSortingTest :: new ( physical_plan) . with_repartition_sorts ( true )
624- . with_expected_description ( "// Union preserves the inputs ordering and we should not change any of the SortExecs under UnionExec" ) ;
598+ let test = EnforceSortingTest :: new ( physical_plan) . with_repartition_sorts ( true ) ;
625599 assert_snapshot ! ( test. run( ) , @r"
626600 Input Plan:
627601 SortPreservingMergeExec: [nullable_col@0 ASC]
@@ -631,7 +605,6 @@ async fn test_union_inputs_different_sorted7() -> Result<()> {
631605 SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[false]
632606 DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
633607
634- // Union preserves the inputs ordering and we should not change any of the SortExecs under UnionExec
635608 Optimized Plan:
636609 SortPreservingMergeExec: [nullable_col@0 ASC]
637610 UnionExec
@@ -640,6 +613,7 @@ async fn test_union_inputs_different_sorted7() -> Result<()> {
640613 SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]
641614 DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
642615 " ) ;
616+ // Union preserves the inputs ordering, and we should not change any of the SortExecs under UnionExec
643617
644618 Ok ( ( ) )
645619}
@@ -679,11 +653,7 @@ async fn test_union_inputs_different_sorted8() -> Result<()> {
679653 // The `UnionExec` doesn't preserve any of the inputs ordering in the
680654 // example below.
681655 let test = EnforceSortingTest :: new ( physical_plan)
682- . with_repartition_sorts ( true )
683- . with_expected_description (
684- "// Since `UnionExec` doesn't preserve ordering in the plan above.\n \
685- // We shouldn't keep SortExecs in the plan.",
686- ) ;
656+ . with_repartition_sorts ( true ) ;
687657 assert_snapshot ! ( test. run( ) , @r"
688658 Input Plan:
689659 UnionExec
@@ -692,13 +662,13 @@ async fn test_union_inputs_different_sorted8() -> Result<()> {
692662 SortExec: expr=[nullable_col@0 DESC NULLS LAST, non_nullable_col@1 DESC NULLS LAST], preserve_partitioning=[false]
693663 DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
694664
695- // Since `UnionExec` doesn't preserve ordering in the plan above.
696- // We shouldn't keep SortExecs in the plan.
697665 Optimized Plan:
698666 UnionExec
699667 DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
700668 DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
701669 " ) ;
670+ // Since `UnionExec` doesn't preserve ordering in the plan above.
671+ // We shouldn't keep SortExecs in the plan.
702672
703673 Ok ( ( ) )
704674}
@@ -1480,18 +1450,15 @@ async fn test_sort_merge_join_complex_order_by() -> Result<()> {
14801450 let physical_plan = sort_preserving_merge_exec ( ordering, join. clone ( ) ) ;
14811451
14821452 let test = EnforceSortingTest :: new ( physical_plan)
1483- . with_repartition_sorts ( true )
1484- . with_expected_description (
1485- "// can not push down the sort requirements, need to add SortExec" ,
1486- ) ;
1453+ . with_repartition_sorts ( true ) ;
1454+
14871455 assert_snapshot ! ( test. run( ) , @r"
14881456 Input Plan:
14891457 SortPreservingMergeExec: [col_b@3 ASC, col_a@2 ASC]
14901458 SortMergeJoin: join_type=Inner, on=[(nullable_col@0, col_a@0)]
14911459 DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
14921460 DataSourceExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b], file_type=parquet
14931461
1494- // can not push down the sort requirements, need to add SortExec
14951462 Optimized Plan:
14961463 SortExec: expr=[col_b@3 ASC, nullable_col@0 ASC], preserve_partitioning=[false]
14971464 SortMergeJoin: join_type=Inner, on=[(nullable_col@0, col_a@0)]
@@ -1500,6 +1467,7 @@ async fn test_sort_merge_join_complex_order_by() -> Result<()> {
15001467 SortExec: expr=[col_a@0 ASC], preserve_partitioning=[false]
15011468 DataSourceExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b], file_type=parquet
15021469 " ) ;
1470+ // can not push down the sort requirements, need to add SortExec
15031471
15041472 // order by (nullable_col, col_b, col_a)
15051473 let ordering2 = [
@@ -1510,25 +1478,23 @@ async fn test_sort_merge_join_complex_order_by() -> Result<()> {
15101478 . into ( ) ;
15111479 let physical_plan = sort_preserving_merge_exec ( ordering2, join) ;
15121480 let test = EnforceSortingTest :: new ( physical_plan)
1513- . with_repartition_sorts ( true )
1514- . with_expected_description (
1515- "// Can push down the sort requirements since col_a = nullable_col" ,
1516- ) ;
1481+ . with_repartition_sorts ( true ) ;
1482+
15171483 assert_snapshot ! ( test. run( ) , @r"
15181484 Input Plan:
15191485 SortPreservingMergeExec: [nullable_col@0 ASC, col_b@3 ASC, col_a@2 ASC]
15201486 SortMergeJoin: join_type=Inner, on=[(nullable_col@0, col_a@0)]
15211487 DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
15221488 DataSourceExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b], file_type=parquet
15231489
1524- // Can push down the sort requirements since col_a = nullable_col
15251490 Optimized Plan:
15261491 SortMergeJoin: join_type=Inner, on=[(nullable_col@0, col_a@0)]
15271492 SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]
15281493 DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet
15291494 SortExec: expr=[col_a@0 ASC, col_b@1 ASC], preserve_partitioning=[false]
15301495 DataSourceExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b], file_type=parquet
15311496 " ) ;
1497+ // Can push down the sort requirements since col_a = nullable_col
15321498
15331499 Ok ( ( ) )
15341500}
@@ -2501,21 +2467,21 @@ async fn test_push_with_required_input_ordering_allowed() -> Result<()> {
25012467 ];
25022468 */
25032469 let test = EnforceSortingTest :: new ( plan. clone ( ) )
2504- . with_repartition_sorts ( true )
2505- . with_expected_description ( "// Should be able to push down" ) ;
2470+ . with_repartition_sorts ( true ) ;
2471+
25062472 assert_snapshot ! ( test. run( ) , @r"
25072473 Input Plan:
25082474 SortExec: expr=[a@0 ASC, b@1 ASC], preserve_partitioning=[false]
25092475 RequiredInputOrderingExec
25102476 SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
25112477 DataSourceExec: partitions=1, partition_sizes=[0]
25122478
2513- // Should be able to push down
25142479 Optimized Plan:
25152480 RequiredInputOrderingExec
25162481 SortExec: expr=[a@0 ASC, b@1 ASC], preserve_partitioning=[false]
25172482 DataSourceExec: partitions=1, partition_sizes=[0]
25182483 " ) ;
2484+ // Should be able to push down
25192485 Ok ( ( ) )
25202486}
25212487
@@ -3955,21 +3921,18 @@ fn test_removes_unused_orthogonal_sort() -> Result<()> {
39553921
39563922 // Test scenario/input has an orthogonal sort:
39573923 let test = EnforceSortingTest :: new ( output_sort)
3958- . with_repartition_sorts ( true )
3959- . with_expected_description (
3960- "// Test: should remove orthogonal sort, and the uppermost (unneeded) sort:" ,
3961- ) ;
3924+ . with_repartition_sorts ( true ) ;
39623925
39633926 assert_snapshot ! ( test. run( ) , @r"
39643927 Input Plan:
39653928 SortExec: expr=[b@1 ASC, c@2 ASC], preserve_partitioning=[false]
39663929 SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
39673930 StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]
39683931
3969- // Test: should remove orthogonal sort, and the uppermost (unneeded) sort:
39703932 Optimized Plan:
39713933 StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]
39723934 " ) ;
3935+ // Test: should remove orthogonal sort, and the uppermost (unneeded) sort:
39733936
39743937 Ok ( ( ) )
39753938}
@@ -3986,23 +3949,21 @@ fn test_keeps_used_orthogonal_sort() -> Result<()> {
39863949
39873950 // Test scenario/input has an orthogonal sort:
39883951 let test = EnforceSortingTest :: new ( output_sort)
3989- . with_repartition_sorts ( true )
3990- . with_expected_description (
3991- "// Test: should keep the orthogonal sort, since it modifies the output:" ,
3992- ) ;
3952+ . with_repartition_sorts ( true ) ;
39933953 assert_snapshot ! ( test. run( ) , @r"
39943954 Input Plan:
39953955 SortExec: expr=[b@1 ASC, c@2 ASC], preserve_partitioning=[false]
39963956 SortExec: TopK(fetch=3), expr=[a@0 ASC], preserve_partitioning=[false]
39973957 StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]
39983958
3999- // Test: should keep the orthogonal sort, since it modifies the output:
40003959 Optimized Plan:
40013960 SortExec: expr=[b@1 ASC, c@2 ASC], preserve_partitioning=[false]
40023961 SortExec: TopK(fetch=3), expr=[a@0 ASC], preserve_partitioning=[false]
40033962 StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]
40043963 " ) ;
40053964
3965+ // Test: should keep the orthogonal sort, since it modifies the output:
3966+
40063967 Ok ( ( ) )
40073968}
40083969
@@ -4022,8 +3983,7 @@ fn test_handles_multiple_orthogonal_sorts() -> Result<()> {
40223983 let output_sort = sort_exec ( input_ordering, orthogonal_sort_3) ; // final sort
40233984
40243985 // Test scenario/input has an orthogonal sort:
4025- let test = EnforceSortingTest :: new ( output_sort. clone ( ) ) . with_repartition_sorts ( true )
4026- . with_expected_description ( "// Test: should keep only the needed orthogonal sort, and remove the unneeded ones:" ) ;
3986+ let test = EnforceSortingTest :: new ( output_sort. clone ( ) ) . with_repartition_sorts ( true ) ;
40273987 assert_snapshot ! ( test. run( ) , @r"
40283988 Input Plan:
40293989 SortExec: expr=[b@1 ASC, c@2 ASC], preserve_partitioning=[false]
@@ -4033,12 +3993,13 @@ fn test_handles_multiple_orthogonal_sorts() -> Result<()> {
40333993 SortExec: expr=[c@2 ASC], preserve_partitioning=[false]
40343994 StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]
40353995
4036- // Test: should keep only the needed orthogonal sort, and remove the unneeded ones:
40373996 Optimized Plan:
40383997 SortExec: expr=[b@1 ASC, c@2 ASC], preserve_partitioning=[false]
40393998 SortExec: TopK(fetch=3), expr=[a@0 ASC], preserve_partitioning=[false]
40403999 StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]
40414000 " ) ;
4001+
4002+ // Test: should keep only the needed orthogonal sort, and remove the unneeded ones:
40424003 Ok ( ( ) )
40434004}
40444005
0 commit comments