Skip to content

Commit 8a1e526

Browse files
authored
Add new tests demonstrating pipeline breaking can be fixed with the help of SortPreservingRepartitionExec (#6953)
1 parent 5991ae3 commit 8a1e526

File tree

1 file changed

+65
-0
lines changed

1 file changed

+65
-0
lines changed

datafusion/core/src/physical_optimizer/sort_enforcement.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2762,6 +2762,67 @@ mod tests {
27622762
Ok(())
27632763
}
27642764

2765+
#[tokio::test]
2766+
async fn test_with_lost_ordering_bounded() -> Result<()> {
2767+
let schema = create_test_schema3()?;
2768+
let sort_exprs = vec![sort_expr("a", &schema)];
2769+
let source = csv_exec_sorted(&schema, sort_exprs, false);
2770+
let repartition_rr = repartition_exec(source);
2771+
let repartition_hash = Arc::new(RepartitionExec::try_new(
2772+
repartition_rr,
2773+
Partitioning::Hash(vec![col("c", &schema).unwrap()], 10),
2774+
)?) as _;
2775+
let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
2776+
let physical_plan = sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions);
2777+
2778+
let expected_input = vec![
2779+
"SortExec: expr=[a@0 ASC]",
2780+
" CoalescePartitionsExec",
2781+
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
2782+
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
2783+
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=false",
2784+
];
2785+
let expected_optimized = vec![
2786+
"SortPreservingMergeExec: [a@0 ASC]",
2787+
" SortExec: expr=[a@0 ASC]",
2788+
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
2789+
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
2790+
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=false",
2791+
];
2792+
assert_optimized!(expected_input, expected_optimized, physical_plan);
2793+
Ok(())
2794+
}
2795+
2796+
#[tokio::test]
2797+
async fn test_with_lost_ordering_unbounded() -> Result<()> {
2798+
let schema = create_test_schema3()?;
2799+
let sort_exprs = vec![sort_expr("a", &schema)];
2800+
let source = csv_exec_sorted(&schema, sort_exprs, true);
2801+
let repartition_rr = repartition_exec(source);
2802+
let repartition_hash = Arc::new(RepartitionExec::try_new(
2803+
repartition_rr,
2804+
Partitioning::Hash(vec![col("c", &schema).unwrap()], 10),
2805+
)?) as _;
2806+
let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
2807+
let physical_plan = sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions);
2808+
2809+
let expected_input = vec![
2810+
"SortExec: expr=[a@0 ASC]",
2811+
" CoalescePartitionsExec",
2812+
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
2813+
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
2814+
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false",
2815+
];
2816+
let expected_optimized = vec![
2817+
"SortPreservingMergeExec: [a@0 ASC]",
2818+
" SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
2819+
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
2820+
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false",
2821+
];
2822+
assert_optimized!(expected_input, expected_optimized, physical_plan);
2823+
Ok(())
2824+
}
2825+
27652826
/// make PhysicalSortExpr with default options
27662827
fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
27672828
sort_expr_options(name, schema, SortOptions::default())
@@ -2918,6 +2979,10 @@ mod tests {
29182979
)
29192980
}
29202981

2982+
fn coalesce_partitions_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
2983+
Arc::new(CoalescePartitionsExec::new(input))
2984+
}
2985+
29212986
fn aggregate_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
29222987
let schema = input.schema();
29232988
Arc::new(

0 commit comments

Comments
 (0)