10 changes: 10 additions & 0 deletions ballista/rust/core/proto/ballista.proto
@@ -424,6 +424,7 @@ message PhysicalPlanNode {
     UnresolvedShuffleExecNode unresolved = 15;
     RepartitionExecNode repartition = 16;
     WindowAggExecNode window = 17;
+    ShuffleWriterExecNode shuffle_writer = 18;
   }
 }

@@ -629,6 +630,15 @@ message HashAggregateExecNode {
   Schema input_schema = 7;
 }
 
+message ShuffleWriterExecNode {
+  //TODO it seems redundant to provide job and stage id here since we also have them
+  // in the TaskDefinition that wraps this plan
+  string job_id = 1;
+  uint32 stage_id = 2;
+  PhysicalPlanNode input = 3;
+  PhysicalHashRepartition output_partitioning = 4;
+}
+
 message ShuffleReaderExecNode {
   repeated ShuffleReaderPartition partition = 1;
   Schema schema = 2;
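For readers following the serde changes below, here is a rough sketch of the Rust struct that prost (the protobuf code generator Ballista uses) should emit for the new message. The field shapes are inferred from the definition above and from how the code below accesses them (Option for message fields, Box for the recursive input); the exact attributes are an assumption, not copied from generated code.

```rust
// Sketch of the prost-generated struct for ShuffleWriterExecNode.
// Assumption: derive/attribute details may differ in the real generated code.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ShuffleWriterExecNode {
    #[prost(string, tag = "1")]
    pub job_id: String,
    #[prost(uint32, tag = "2")]
    pub stage_id: u32,
    // Recursive message field, boxed to keep the type sized.
    #[prost(message, optional, boxed, tag = "3")]
    pub input: Option<Box<PhysicalPlanNode>>,
    // proto3 message fields map to Option in Rust, which is why
    // output_partitioning can be absent on the wire.
    #[prost(message, optional, tag = "4")]
    pub output_partitioning: Option<PhysicalHashRepartition>,
}
```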
7 changes: 5 additions & 2 deletions ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -126,7 +126,10 @@ impl ExecutionPlan for ShuffleWriterExec {
     }
 
     fn output_partitioning(&self) -> Partitioning {
-        self.plan.output_partitioning()
+        match &self.shuffle_output_partitioning {
+            Some(p) => p.clone(),
+            _ => Partitioning::UnknownPartitioning(1),
+        }
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -143,7 +146,7 @@
             self.stage_id,
             children[0].clone(),
             self.work_dir.clone(),
-            None,
+            self.shuffle_output_partitioning.clone(),
         )?))
     }

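The two changes in this file go together: output_partitioning() now reports the configured hash partitioning instead of delegating to the input plan, and with_new_children passes self.shuffle_output_partitioning.clone() instead of None so the setting survives plan rewrites. A minimal sketch of the reporting logic, assuming DataFusion's Partitioning enum:

```rust
use datafusion::physical_plan::Partitioning;

// Sketch mirroring ShuffleWriterExec::output_partitioning above;
// `shuffle_output_partitioning` is the writer's optional repartitioning spec.
fn reported_partitioning(shuffle_output_partitioning: &Option<Partitioning>) -> Partitioning {
    match shuffle_output_partitioning {
        // Hash repartitioning: downstream stages see one partition per hash bucket.
        Some(p) => p.clone(),
        // No repartitioning configured: a single output partition of unknown shape.
        None => Partitioning::UnknownPartitioning(1),
    }
}
```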
32 changes: 31 additions & 1 deletion ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -22,7 +22,9 @@ use std::convert::{TryFrom, TryInto};
 use std::sync::Arc;
 
 use crate::error::BallistaError;
-use crate::execution_plans::{ShuffleReaderExec, UnresolvedShuffleExec};
+use crate::execution_plans::{
+    ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec,
+};
 use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
 use crate::serde::protobuf::ShuffleReaderPartition;
 use crate::serde::scheduler::PartitionLocation;
@@ -370,6 +372,34 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                     partition_mode,
                 )?))
             }
+            PhysicalPlanType::ShuffleWriter(shuffle_writer) => {
+                let input: Arc<dyn ExecutionPlan> =
+                    convert_box_required!(shuffle_writer.input)?;
+
+                let output_partitioning = match &shuffle_writer.output_partitioning {
+                    Some(hash_part) => {
+                        let expr = hash_part
+                            .hash_expr
+                            .iter()
+                            .map(|e| e.try_into())
+                            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>, _>>()?;
+
+                        Some(Partitioning::Hash(
+                            expr,
+                            hash_part.partition_count.try_into().unwrap(),
+                        ))
+                    }
+                    None => None,
+                };
+
+                Ok(Arc::new(ShuffleWriterExec::try_new(
+                    shuffle_writer.job_id.clone(),
+                    shuffle_writer.stage_id as usize,
+                    input,
+                    "".to_string(), // this is intentional but hacky - the executor will fill this in
+                    output_partitioning,
+                )?))
+            }
             PhysicalPlanType::ShuffleReader(shuffle_reader) => {
                 let schema = Arc::new(convert_required!(shuffle_reader.schema)?);
                 let partition_location: Vec<Vec<PartitionLocation>> = shuffle_reader
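Deserialization unwraps the boxed input plan with Ballista's convert_box_required! macro, rebuilds the optional hash partitioning, and leaves work_dir empty for the executor to fill in. As a hedged sketch, the macro behaves roughly like the helper below; the real macro lives in the serde module and its error wording may differ:

```rust
// Illustrative stand-in for convert_box_required!(shuffle_writer.input)?;
// assumes the imports already present in from_proto.rs.
fn convert_box_required_sketch(
    field: &Option<Box<protobuf::PhysicalPlanNode>>,
) -> Result<Arc<dyn ExecutionPlan>, BallistaError> {
    match field {
        // Delegates to the TryInto impl on &PhysicalPlanNode shown above.
        Some(node) => node.as_ref().try_into(),
        None => Err(BallistaError::General(
            "Missing required field in protobuf".to_owned(),
        )),
    }
}
```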
16 changes: 16 additions & 0 deletions ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -45,6 +45,7 @@ mod roundtrip_tests {
 
     use super::super::super::error::Result;
     use super::super::protobuf;
+    use crate::execution_plans::ShuffleWriterExec;
 
     fn roundtrip_test(exec_plan: Arc<dyn ExecutionPlan>) -> Result<()> {
         let proto: protobuf::PhysicalPlanNode = exec_plan.clone().try_into()?;
@@ -184,4 +185,19 @@
             Arc::new(EmptyExec::new(false, schema)),
         )?))
     }
+
+    #[test]
+    fn roundtrip_shuffle_writer() -> Result<()> {
+        let field_a = Field::new("a", DataType::Int64, false);
+        let field_b = Field::new("b", DataType::Int64, false);
+        let schema = Arc::new(Schema::new(vec![field_a, field_b]));
+
+        roundtrip_test(Arc::new(ShuffleWriterExec::try_new(
+            "job123".to_string(),
+            123,
+            Arc::new(EmptyExec::new(false, schema)),
+            "".to_string(),
+            Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 4)),
+        )?))
+    }
 }
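The new test reuses roundtrip_test, whose first line is visible in the earlier hunk: serialize the plan into a protobuf::PhysicalPlanNode, convert it back, and compare. A hedged sketch of the full helper; the comparison step is an assumption (ExecutionPlan has no PartialEq, so debug formatting is the natural proxy):

```rust
fn roundtrip_test(exec_plan: Arc<dyn ExecutionPlan>) -> Result<()> {
    // Serialize (this line is shown in the diff above).
    let proto: protobuf::PhysicalPlanNode = exec_plan.clone().try_into()?;
    // Assumption: deserialize back and compare debug output, since
    // ExecutionPlan trait objects cannot be compared for equality directly.
    let round_tripped: Arc<dyn ExecutionPlan> = (&proto).try_into()?;
    assert_eq!(format!("{:?}", exec_plan), format!("{:?}", round_tripped));
    Ok(())
}
```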
33 changes: 32 additions & 1 deletion ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -55,7 +55,9 @@ use datafusion::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr};
 use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
 use protobuf::physical_plan_node::PhysicalPlanType;
 
-use crate::execution_plans::{ShuffleReaderExec, UnresolvedShuffleExec};
+use crate::execution_plans::{
+    ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec,
+};
 use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
 use crate::serde::scheduler::PartitionLocation;
 use crate::serde::{protobuf, BallistaError};
@@ -356,6 +358,35 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
                     },
                 ))),
             })
+        } else if let Some(exec) = plan.downcast_ref::<ShuffleWriterExec>() {
+            let input: protobuf::PhysicalPlanNode =
+                exec.children()[0].to_owned().try_into()?;
+            Ok(protobuf::PhysicalPlanNode {
+                physical_plan_type: Some(PhysicalPlanType::ShuffleWriter(Box::new(
+                    protobuf::ShuffleWriterExecNode {
+                        job_id: exec.job_id().to_string(),
+                        stage_id: exec.stage_id() as u32,
+                        input: Some(Box::new(input)),
+                        output_partitioning: match exec.output_partitioning() {
+                            Partitioning::Hash(exprs, partition_count) => {
+                                Some(protobuf::PhysicalHashRepartition {
+                                    hash_expr: exprs
+                                        .iter()
+                                        .map(|expr| expr.clone().try_into())
+                                        .collect::<Result<Vec<_>, BallistaError>>()?,
+                                    partition_count: partition_count as u64,
+                                })
+                            }
+                            other => {
+                                return Err(BallistaError::General(format!(
+                                    "physical_plan::to_proto() invalid partitioning for ShuffleWriterExec: {:?}",
+                                    other
+                                )))
+                            }
+                        },
+                    },
+                ))),
+            })
+        } else if let Some(exec) = plan.downcast_ref::<UnresolvedShuffleExec>() {
             Ok(protobuf::PhysicalPlanNode {
                 physical_plan_type: Some(PhysicalPlanType::Unresolved(
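Note that serialization matches on exec.output_partitioning(), which (per the shuffle_writer.rs change above) returns UnknownPartitioning(1) when no repartitioning is configured, so that case hits the error arm rather than serializing with output_partitioning unset; only hash partitioning round-trips here. A sketch isolating the conversion, mirroring the match arms above; the free-standing helper name is illustrative:

```rust
use datafusion::physical_plan::Partitioning;

// Sketch of the hash-partitioning-to-proto conversion used by to_proto.rs.
fn hash_partitioning_to_proto(
    partitioning: &Partitioning,
) -> Result<protobuf::PhysicalHashRepartition, BallistaError> {
    match partitioning {
        Partitioning::Hash(exprs, partition_count) => Ok(protobuf::PhysicalHashRepartition {
            // Each hash expression serializes through its own TryInto impl.
            hash_expr: exprs
                .iter()
                .map(|expr| expr.clone().try_into())
                .collect::<Result<Vec<_>, BallistaError>>()?,
            partition_count: *partition_count as u64,
        }),
        // Round-robin and unknown partitioning have no representation in
        // PhysicalHashRepartition, so they are rejected.
        other => Err(BallistaError::General(format!(
            "unsupported partitioning for ShuffleWriterExec serde: {:?}",
            other
        ))),
    }
}
```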