Skip to content

Commit e2986f1

Browse files
authored
support inter leave node (#8460)
1 parent a8d74a7 commit e2986f1

File tree

5 files changed

+202
-26
lines changed

5 files changed

+202
-26
lines changed

datafusion/proto/proto/datafusion.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1163,6 +1163,7 @@ message PhysicalPlanNode {
11631163
AnalyzeExecNode analyze = 23;
11641164
JsonSinkExecNode json_sink = 24;
11651165
SymmetricHashJoinExecNode symmetric_hash_join = 25;
1166+
InterleaveExecNode interleave = 26;
11661167
}
11671168
}
11681169

@@ -1456,6 +1457,10 @@ message SymmetricHashJoinExecNode {
14561457
JoinFilter filter = 8;
14571458
}
14581459

1460+
message InterleaveExecNode {
1461+
repeated PhysicalPlanNode inputs = 1;
1462+
}
1463+
14591464
message UnionExecNode {
14601465
repeated PhysicalPlanNode inputs = 1;
14611466
}

datafusion/proto/src/generated/pbjson.rs

Lines changed: 104 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 9 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use datafusion::physical_plan::projection::ProjectionExec;
4848
use datafusion::physical_plan::repartition::RepartitionExec;
4949
use datafusion::physical_plan::sorts::sort::SortExec;
5050
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
51-
use datafusion::physical_plan::union::UnionExec;
51+
use datafusion::physical_plan::union::{InterleaveExec, UnionExec};
5252
use datafusion::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
5353
use datafusion::physical_plan::{
5454
udaf, AggregateExpr, ExecutionPlan, InputOrderMode, Partitioning, PhysicalExpr,
@@ -545,7 +545,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
545545
f.expression.as_ref().ok_or_else(|| {
546546
proto_error("Unexpected empty filter expression")
547547
})?,
548-
registry, &schema
548+
registry, &schema,
549549
)?;
550550
let column_indices = f.column_indices
551551
.iter()
@@ -556,7 +556,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
556556
i.side))
557557
)?;
558558

559-
Ok(ColumnIndex{
559+
Ok(ColumnIndex {
560560
index: i.index as usize,
561561
side: side.into(),
562562
})
@@ -634,7 +634,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
634634
f.expression.as_ref().ok_or_else(|| {
635635
proto_error("Unexpected empty filter expression")
636636
})?,
637-
registry, &schema
637+
registry, &schema,
638638
)?;
639639
let column_indices = f.column_indices
640640
.iter()
@@ -645,7 +645,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
645645
i.side))
646646
)?;
647647

648-
Ok(ColumnIndex{
648+
Ok(ColumnIndex {
649649
index: i.index as usize,
650650
side: side.into(),
651651
})
@@ -693,6 +693,17 @@ impl AsExecutionPlan for PhysicalPlanNode {
693693
}
694694
Ok(Arc::new(UnionExec::new(inputs)))
695695
}
696+
PhysicalPlanType::Interleave(interleave) => {
697+
let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
698+
for input in &interleave.inputs {
699+
inputs.push(input.try_into_physical_plan(
700+
registry,
701+
runtime,
702+
extension_codec,
703+
)?);
704+
}
705+
Ok(Arc::new(InterleaveExec::try_new(inputs)?))
706+
}
696707
PhysicalPlanType::CrossJoin(crossjoin) => {
697708
let left: Arc<dyn ExecutionPlan> = into_physical_plan(
698709
&crossjoin.left,
@@ -735,7 +746,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
735746
})?
736747
.as_ref();
737748
Ok(PhysicalSortExpr {
738-
expr: parse_physical_expr(expr,registry, input.schema().as_ref())?,
749+
expr: parse_physical_expr(expr, registry, input.schema().as_ref())?,
739750
options: SortOptions {
740751
descending: !sort_expr.asc,
741752
nulls_first: sort_expr.nulls_first,
@@ -782,7 +793,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
782793
})?
783794
.as_ref();
784795
Ok(PhysicalSortExpr {
785-
expr: parse_physical_expr(expr,registry, input.schema().as_ref())?,
796+
expr: parse_physical_expr(expr, registry, input.schema().as_ref())?,
786797
options: SortOptions {
787798
descending: !sort_expr.asc,
788799
nulls_first: sort_expr.nulls_first,
@@ -845,7 +856,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
845856
f.expression.as_ref().ok_or_else(|| {
846857
proto_error("Unexpected empty filter expression")
847858
})?,
848-
registry, &schema
859+
registry, &schema,
849860
)?;
850861
let column_indices = f.column_indices
851862
.iter()
@@ -856,7 +867,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
856867
i.side))
857868
)?;
858869

859-
Ok(ColumnIndex{
870+
Ok(ColumnIndex {
860871
index: i.index as usize,
861872
side: side.into(),
862873
})
@@ -1463,6 +1474,21 @@ impl AsExecutionPlan for PhysicalPlanNode {
14631474
});
14641475
}
14651476

1477+
if let Some(interleave) = plan.downcast_ref::<InterleaveExec>() {
1478+
let mut inputs: Vec<PhysicalPlanNode> = vec![];
1479+
for input in interleave.inputs() {
1480+
inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
1481+
input.to_owned(),
1482+
extension_codec,
1483+
)?);
1484+
}
1485+
return Ok(protobuf::PhysicalPlanNode {
1486+
physical_plan_type: Some(PhysicalPlanType::Interleave(
1487+
protobuf::InterleaveExecNode { inputs },
1488+
)),
1489+
});
1490+
}
1491+
14661492
if let Some(exec) = plan.downcast_ref::<SortPreservingMergeExec>() {
14671493
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
14681494
exec.input().to_owned(),

0 commit comments

Comments
 (0)