Skip to content

Commit 07780c1

Browse files
xudong963Nirnay Roy
authored andcommitted
Fix: fetch is lost in replace_order_preserving_variants method during EnforceDistribution (apache#15808)
1 parent 64a5146 commit 07780c1

File tree

2 files changed

+50
-2
lines changed

2 files changed

+50
-2
lines changed

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ use datafusion_physical_plan::aggregates::{
5252
AggregateExec, AggregateMode, PhysicalGroupBy,
5353
};
5454
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
55+
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
5556
use datafusion_physical_plan::execution_plan::ExecutionPlan;
5657
use datafusion_physical_plan::expressions::col;
5758
use datafusion_physical_plan::filter::FilterExec;
@@ -3471,3 +3472,47 @@ fn optimize_away_unnecessary_repartition2() -> Result<()> {
34713472

34723473
Ok(())
34733474
}
3475+
3476+
#[test]
3477+
fn test_replace_order_preserving_variants_with_fetch() -> Result<()> {
3478+
// Create a base plan
3479+
let parquet_exec = parquet_exec();
3480+
3481+
let sort_expr = PhysicalSortExpr {
3482+
expr: Arc::new(Column::new("id", 0)),
3483+
options: SortOptions::default(),
3484+
};
3485+
3486+
let ordering = LexOrdering::new(vec![sort_expr]);
3487+
3488+
// Create a SortPreservingMergeExec with fetch=5
3489+
let spm_exec = Arc::new(
3490+
SortPreservingMergeExec::new(ordering, parquet_exec.clone()).with_fetch(Some(5)),
3491+
);
3492+
3493+
// Create distribution context
3494+
let dist_context = DistributionContext::new(
3495+
spm_exec,
3496+
true,
3497+
vec![DistributionContext::new(parquet_exec, false, vec![])],
3498+
);
3499+
3500+
// Apply the function
3501+
let result = replace_order_preserving_variants(dist_context)?;
3502+
3503+
// Verify the plan was transformed to CoalescePartitionsExec
3504+
result
3505+
.plan
3506+
.as_any()
3507+
.downcast_ref::<CoalescePartitionsExec>()
3508+
.expect("Expected CoalescePartitionsExec");
3509+
3510+
// Verify fetch was preserved
3511+
assert_eq!(
3512+
result.plan.fetch(),
3513+
Some(5),
3514+
"Fetch value was not preserved after transformation"
3515+
);
3516+
3517+
Ok(())
3518+
}

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1018,7 +1018,7 @@ fn remove_dist_changing_operators(
10181018
/// " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
10191019
/// " DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
10201020
/// ```
1021-
fn replace_order_preserving_variants(
1021+
pub fn replace_order_preserving_variants(
10221022
mut context: DistributionContext,
10231023
) -> Result<DistributionContext> {
10241024
context.children = context
@@ -1035,7 +1035,10 @@ fn replace_order_preserving_variants(
10351035

10361036
if is_sort_preserving_merge(&context.plan) {
10371037
let child_plan = Arc::clone(&context.children[0].plan);
1038-
context.plan = Arc::new(CoalescePartitionsExec::new(child_plan));
1038+
// It's safe to unwrap because `CoalescePartitionsExec` supports `fetch`.
1039+
context.plan = CoalescePartitionsExec::new(child_plan)
1040+
.with_fetch(context.plan.fetch())
1041+
.unwrap();
10391042
return Ok(context);
10401043
} else if let Some(repartition) =
10411044
context.plan.as_any().downcast_ref::<RepartitionExec>()

0 commit comments

Comments
 (0)