Skip to content

Commit 81de7b8

Browse files
joroKr21lewiszlw
authored andcommitted
Fix CoalescePartitionsExec proto serialization (apache#15824) (#299)
* add fetch to CoalescePartitionsExecNode * gen proto code * Add test * fix * fix build * Fix test build * remove comments Co-authored-by: 张林伟 <lewiszlw520@gmail.com>
1 parent 557a079 commit 81de7b8

File tree

6 files changed

+51
-2
lines changed

6 files changed

+51
-2
lines changed

datafusion/physical-plan/src/coalesce_partitions.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,12 @@ impl CoalescePartitionsExec {
5959
}
6060
}
6161

62+
/// Update fetch with the argument
63+
pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
64+
self.fetch = fetch;
65+
self
66+
}
67+
6268
/// Input execution plan
6369
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
6470
&self.input

datafusion/proto/proto/datafusion.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1192,6 +1192,7 @@ message CoalesceBatchesExecNode {
11921192

11931193
message CoalescePartitionsExecNode {
11941194
PhysicalPlanNode input = 1;
1195+
optional uint32 fetch = 2;
11951196
}
11961197

11971198
message PhysicalHashRepartition {

datafusion/proto/src/generated/pbjson.rs

Lines changed: 19 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,10 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
291291
PhysicalPlanType::Merge(merge) => {
292292
let input: Arc<dyn ExecutionPlan> =
293293
into_physical_plan(&merge.input, registry, runtime, extension_codec)?;
294-
Ok(Arc::new(CoalescePartitionsExec::new(input)))
294+
Ok(Arc::new(
295+
CoalescePartitionsExec::new(input)
296+
.with_fetch(merge.fetch.map(|n| n as usize)),
297+
))
295298
}
296299
PhysicalPlanType::Repartition(repart) => {
297300
let input: Arc<dyn ExecutionPlan> = into_physical_plan(
@@ -1671,6 +1674,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
16711674
physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
16721675
protobuf::CoalescePartitionsExecNode {
16731676
input: Some(Box::new(input)),
1677+
fetch: exec.fetch().map(|n| n as u32),
16741678
},
16751679
))),
16761680
});

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ use datafusion::physical_plan::aggregates::{
5858
AggregateExec, AggregateMode, PhysicalGroupBy,
5959
};
6060
use datafusion::physical_plan::analyze::AnalyzeExec;
61+
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
6162
use datafusion::physical_plan::empty::EmptyExec;
6263
use datafusion::physical_plan::expressions::{
6364
binary, cast, col, in_list, like, lit, BinaryExpr, Column, NotExpr, NthValue,
@@ -641,7 +642,7 @@ fn roundtrip_sort_preserve_partitioning() -> Result<()> {
641642
}
642643

643644
#[test]
644-
fn roundtrip_coalesce_with_fetch() -> Result<()> {
645+
fn roundtrip_coalesce_batches_with_fetch() -> Result<()> {
645646
let field_a = Field::new("a", DataType::Boolean, false);
646647
let field_b = Field::new("b", DataType::Int64, false);
647648
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
@@ -657,6 +658,22 @@ fn roundtrip_coalesce_with_fetch() -> Result<()> {
657658
))
658659
}
659660

661+
#[test]
662+
fn roundtrip_coalesce_partitions_with_fetch() -> Result<()> {
663+
let field_a = Field::new("a", DataType::Boolean, false);
664+
let field_b = Field::new("b", DataType::Int64, false);
665+
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
666+
667+
roundtrip_test(Arc::new(CoalescePartitionsExec::new(Arc::new(
668+
EmptyExec::new(schema.clone()),
669+
))))?;
670+
671+
roundtrip_test(Arc::new(
672+
CoalescePartitionsExec::new(Arc::new(EmptyExec::new(schema)))
673+
.with_fetch(Some(10)),
674+
))
675+
}
676+
660677
#[test]
661678
fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
662679
let scan_config = FileScanConfig {

0 commit comments

Comments
 (0)