Skip to content

Commit 9619f02

Browse files
Calculate ordering equivalence for expressions (rather than just columns) (#8281)
* Complex exprs requirement support (#215)
* Discover ordering of complex expressions in group by and window partition by
* Remove unnecessary tests
* Update comments
* Minor changes
* Better projection support complex expression support
* Fix failing test
* Simplifications
* Simplifications
* Add is end flag
* Simplifications
* Simplifications
* Simplifications
* Minor changes
* Minor changes
* Minor changes
* All tests pass
* Change implementation of find_longest_permutation
* Minor changes
* Minor changes
* Remove projection section
* Remove projection implementation
* Fix linter errors
* Remove projection sections
* Minor changes
* Add docstring comments
* Add comments
* Minor changes
* Minor changes
* Add comments
* simplifications
* Minor changes
* Review Part 1
* Add new tests
* Review Part 2
* Address review feedback
* Remove error message check in the test

---------

Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
1 parent 98f1bc0 commit 9619f02

File tree

6 files changed

+1155
-305
lines changed

6 files changed

+1155
-305
lines changed

datafusion/core/src/physical_optimizer/enforce_distribution.rs

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -3787,7 +3787,7 @@ pub(crate) mod tests {
37873787
fn repartition_transitively_past_sort_with_projection_and_filter() -> Result<()> {
37883788
let schema = schema();
37893789
let sort_key = vec![PhysicalSortExpr {
3790-
expr: col("c", &schema).unwrap(),
3790+
expr: col("a", &schema).unwrap(),
37913791
options: SortOptions::default(),
37923792
}];
37933793
let plan = sort_exec(
@@ -3804,9 +3804,9 @@ pub(crate) mod tests {
38043804
);
38053805

38063806
let expected = &[
3807-
"SortPreservingMergeExec: [c@2 ASC]",
3807+
"SortPreservingMergeExec: [a@0 ASC]",
38083808
// Expect repartition on the input to the sort (as it can benefit from additional parallelism)
3809-
"SortExec: expr=[c@2 ASC]",
3809+
"SortExec: expr=[a@0 ASC]",
38103810
"ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
38113811
"FilterExec: c@2 = 0",
38123812
// repartition is lowest down
@@ -3817,7 +3817,7 @@ pub(crate) mod tests {
38173817
assert_optimized!(expected, plan.clone(), true);
38183818

38193819
let expected_first_sort_enforcement = &[
3820-
"SortExec: expr=[c@2 ASC]",
3820+
"SortExec: expr=[a@0 ASC]",
38213821
"CoalescePartitionsExec",
38223822
"ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
38233823
"FilterExec: c@2 = 0",

datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs

Lines changed: 61 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -357,15 +357,19 @@ mod tests {
357357
let physical_plan =
358358
sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
359359

360-
let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
360+
let expected_input = [
361+
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
361362
" SortExec: expr=[a@0 ASC NULLS LAST]",
362363
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
363364
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
364-
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
365-
let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
365+
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
366+
];
367+
let expected_optimized = [
368+
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
366369
" SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
367370
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
368-
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
371+
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
372+
];
369373
assert_optimized!(expected_input, expected_optimized, physical_plan);
370374
Ok(())
371375
}
@@ -434,19 +438,20 @@ mod tests {
434438
let physical_plan =
435439
sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
436440

437-
let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
441+
let expected_input = [
442+
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
438443
" SortExec: expr=[a@0 ASC NULLS LAST]",
439444
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
440445
" FilterExec: c@1 > 3",
441446
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
442-
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
447+
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
448+
];
443449
let expected_optimized = [
444450
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
445451
" SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
446452
" FilterExec: c@1 > 3",
447453
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
448454
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
449-
450455
];
451456
assert_optimized!(expected_input, expected_optimized, physical_plan);
452457
Ok(())
@@ -466,19 +471,23 @@ mod tests {
466471
let physical_plan =
467472
sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
468473

469-
let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
474+
let expected_input = [
475+
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
470476
" SortExec: expr=[a@0 ASC NULLS LAST]",
471477
" CoalesceBatchesExec: target_batch_size=8192",
472478
" FilterExec: c@1 > 3",
473479
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
474480
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
475-
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
476-
let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
481+
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
482+
];
483+
let expected_optimized = [
484+
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
477485
" CoalesceBatchesExec: target_batch_size=8192",
478486
" FilterExec: c@1 > 3",
479487
" SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
480488
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
481-
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
489+
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
490+
];
482491
assert_optimized!(expected_input, expected_optimized, physical_plan);
483492
Ok(())
484493
}
@@ -499,21 +508,25 @@ mod tests {
499508
let physical_plan =
500509
sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
501510

502-
let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
511+
let expected_input = [
512+
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
503513
" SortExec: expr=[a@0 ASC NULLS LAST]",
504514
" CoalesceBatchesExec: target_batch_size=8192",
505515
" FilterExec: c@1 > 3",
506516
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
507517
" CoalesceBatchesExec: target_batch_size=8192",
508518
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
509-
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
510-
let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
519+
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
520+
];
521+
let expected_optimized = [
522+
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
511523
" CoalesceBatchesExec: target_batch_size=8192",
512524
" FilterExec: c@1 > 3",
513525
" SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
514526
" CoalesceBatchesExec: target_batch_size=8192",
515527
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
516-
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
528+
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
529+
];
517530
assert_optimized!(expected_input, expected_optimized, physical_plan);
518531
Ok(())
519532
}
@@ -531,18 +544,22 @@ mod tests {
531544
let physical_plan: Arc<dyn ExecutionPlan> =
532545
coalesce_partitions_exec(coalesce_batches_exec);
533546

534-
let expected_input = ["CoalescePartitionsExec",
547+
let expected_input = [
548+
"CoalescePartitionsExec",
535549
" CoalesceBatchesExec: target_batch_size=8192",
536550
" FilterExec: c@1 > 3",
537551
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
538552
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
539-
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
540-
let expected_optimized = ["CoalescePartitionsExec",
553+
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
554+
];
555+
let expected_optimized = [
556+
"CoalescePartitionsExec",
541557
" CoalesceBatchesExec: target_batch_size=8192",
542558
" FilterExec: c@1 > 3",
543559
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
544560
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
545-
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
561+
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
562+
];
546563
assert_optimized!(expected_input, expected_optimized, physical_plan);
547564
Ok(())
548565
}
@@ -570,7 +587,7 @@ mod tests {
570587
" FilterExec: c@1 > 3",
571588
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
572589
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
573-
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"
590+
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
574591
];
575592
let expected_optimized = [
576593
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
@@ -603,16 +620,20 @@ mod tests {
603620
sort,
604621
);
605622

606-
let expected_input = ["SortPreservingMergeExec: [c@1 ASC]",
623+
let expected_input = [
624+
"SortPreservingMergeExec: [c@1 ASC]",
607625
" SortExec: expr=[c@1 ASC]",
608626
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
609627
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
610-
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
611-
let expected_optimized = ["SortPreservingMergeExec: [c@1 ASC]",
628+
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
629+
];
630+
let expected_optimized = [
631+
"SortPreservingMergeExec: [c@1 ASC]",
612632
" SortExec: expr=[c@1 ASC]",
613633
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
614634
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
615-
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
635+
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
636+
];
616637
assert_optimized!(expected_input, expected_optimized, physical_plan);
617638
Ok(())
618639
}
@@ -628,15 +649,19 @@ mod tests {
628649
let physical_plan =
629650
sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions, false);
630651

631-
let expected_input = ["SortExec: expr=[a@0 ASC NULLS LAST]",
652+
let expected_input = [
653+
"SortExec: expr=[a@0 ASC NULLS LAST]",
632654
" CoalescePartitionsExec",
633655
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
634656
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
635-
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
636-
let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
657+
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
658+
];
659+
let expected_optimized = [
660+
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
637661
" SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
638662
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
639-
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
663+
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
664+
];
640665
assert_optimized!(expected_input, expected_optimized, physical_plan);
641666
Ok(())
642667
}
@@ -766,15 +791,19 @@ mod tests {
766791
let physical_plan =
767792
sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
768793

769-
let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
794+
let expected_input = [
795+
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
770796
" SortExec: expr=[a@0 ASC NULLS LAST]",
771797
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
772798
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
773-
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
774-
let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
799+
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
800+
];
801+
let expected_optimized = [
802+
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
775803
" SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
776804
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
777-
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
805+
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
806+
];
778807
assert_optimized!(expected_input, expected_optimized, physical_plan, true);
779808
Ok(())
780809
}

0 commit comments

Comments
 (0)