Skip to content

Commit 97058f5

Browse files
joroKr21lewiszlw
authored andcommitted
Fix CoalescePartitionsExec proto serialization (apache#15824) (#299) v48
* add fetch to CoalescePartitionsExecNode * gen proto code * Add test * fix * fix build * Fix test build * remove comments Co-authored-by: 张林伟 <lewiszlw520@gmail.com>
1 parent de58674 commit 97058f5

File tree

6 files changed

+51
-2
lines changed

6 files changed

+51
-2
lines changed

datafusion/physical-plan/src/coalesce_partitions.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,12 @@ impl CoalescePartitionsExec {
5959
}
6060
}
6161

62+
/// Update fetch with the argument
63+
pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
64+
self.fetch = fetch;
65+
self
66+
}
67+
6268
/// Input execution plan
6369
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
6470
&self.input

datafusion/proto/proto/datafusion.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1188,6 +1188,7 @@ message CoalesceBatchesExecNode {
11881188

11891189
message CoalescePartitionsExecNode {
11901190
PhysicalPlanNode input = 1;
1191+
optional uint32 fetch = 2;
11911192
}
11921193

11931194
message PhysicalHashRepartition {

datafusion/proto/src/generated/pbjson.rs

Lines changed: 19 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,10 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
291291
PhysicalPlanType::Merge(merge) => {
292292
let input: Arc<dyn ExecutionPlan> =
293293
into_physical_plan(&merge.input, registry, runtime, extension_codec)?;
294-
Ok(Arc::new(CoalescePartitionsExec::new(input)))
294+
Ok(Arc::new(
295+
CoalescePartitionsExec::new(input)
296+
.with_fetch(merge.fetch.map(|n| n as usize)),
297+
))
295298
}
296299
PhysicalPlanType::Repartition(repart) => {
297300
let input: Arc<dyn ExecutionPlan> = into_physical_plan(
@@ -1672,6 +1675,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
16721675
physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
16731676
protobuf::CoalescePartitionsExecNode {
16741677
input: Some(Box::new(input)),
1678+
fetch: exec.fetch().map(|n| n as u32),
16751679
},
16761680
))),
16771681
});

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ use datafusion::physical_plan::aggregates::{
6363
AggregateExec, AggregateMode, PhysicalGroupBy,
6464
};
6565
use datafusion::physical_plan::analyze::AnalyzeExec;
66+
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
6667
use datafusion::physical_plan::empty::EmptyExec;
6768
use datafusion::physical_plan::expressions::{
6869
binary, cast, col, in_list, like, lit, BinaryExpr, Column, NotExpr, PhysicalSortExpr,
@@ -683,7 +684,7 @@ fn roundtrip_sort_preserve_partitioning() -> Result<()> {
683684
}
684685

685686
#[test]
686-
fn roundtrip_coalesce_with_fetch() -> Result<()> {
687+
fn roundtrip_coalesce_batches_with_fetch() -> Result<()> {
687688
let field_a = Field::new("a", DataType::Boolean, false);
688689
let field_b = Field::new("b", DataType::Int64, false);
689690
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
@@ -699,6 +700,22 @@ fn roundtrip_coalesce_with_fetch() -> Result<()> {
699700
))
700701
}
701702

703+
#[test]
704+
fn roundtrip_coalesce_partitions_with_fetch() -> Result<()> {
705+
let field_a = Field::new("a", DataType::Boolean, false);
706+
let field_b = Field::new("b", DataType::Int64, false);
707+
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
708+
709+
roundtrip_test(Arc::new(CoalescePartitionsExec::new(Arc::new(
710+
EmptyExec::new(schema.clone()),
711+
))))?;
712+
713+
roundtrip_test(Arc::new(
714+
CoalescePartitionsExec::new(Arc::new(EmptyExec::new(schema)))
715+
.with_fetch(Some(10)),
716+
))
717+
}
718+
702719
#[test]
703720
fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
704721
let scan_config = FileScanConfig {

0 commit comments

Comments
 (0)