Skip to content

Commit aa2719e

Browse files
committed
save
1 parent 24f32ca commit aa2719e

File tree

8 files changed

+154
-19
lines changed

8 files changed

+154
-19
lines changed

ballista/rust/core/proto/ballista.proto

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,7 @@ message PhysicalPlanNode {
424424
UnresolvedShuffleExecNode unresolved = 15;
425425
RepartitionExecNode repartition = 16;
426426
WindowAggExecNode window = 17;
427+
ShuffleWriterExecNode shuffle_writer = 18;
427428
}
428429
}
429430

@@ -629,6 +630,15 @@ message HashAggregateExecNode {
629630
Schema input_schema = 7;
630631
}
631632

633+
message ShuffleWriterExecNode {
634+
// TODO: providing the job and stage ids here looks redundant, since the TaskDefinition
635+
// that wraps this plan already carries them in its task_id (a PartitionId).
636+
string job_id = 1;
637+
uint32 stage_id = 2;
638+
PhysicalPlanNode input = 3;
639+
PhysicalHashRepartition output_partitioning = 4;
640+
}
641+
632642
message ShuffleReaderExecNode {
633643
repeated ShuffleReaderPartition partition = 1;
634644
Schema schema = 2;
@@ -793,6 +803,9 @@ message PollWorkParams {
793803
message TaskDefinition {
794804
PartitionId task_id = 1;
795805
PhysicalPlanNode plan = 2;
806+
// TODO: tasks are currently always shuffle writes, but that will not always be the case,
807+
// so the task definition may need refactoring. When unset, the executor passes no output partitioning on.
808+
PhysicalHashRepartition shuffle_output_partitioning = 3;
796809
}
797810

798811
message PollWorkResult {

ballista/rust/core/src/execution_plans/shuffle_writer.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,10 @@ impl ExecutionPlan for ShuffleWriterExec {
126126
}
127127

128128
fn output_partitioning(&self) -> Partitioning {
129-
self.plan.output_partitioning()
129+
match &self.shuffle_output_partitioning {
130+
Some(p) => p.clone(),
131+
_ => Partitioning::UnknownPartitioning(1),
132+
}
130133
}
131134

132135
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -143,7 +146,7 @@ impl ExecutionPlan for ShuffleWriterExec {
143146
self.stage_id,
144147
children[0].clone(),
145148
self.work_dir.clone(),
146-
None,
149+
self.shuffle_output_partitioning.clone(),
147150
)?))
148151
}
149152

ballista/rust/core/src/serde/physical_plan/from_proto.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ use std::convert::{TryFrom, TryInto};
2222
use std::sync::Arc;
2323

2424
use crate::error::BallistaError;
25-
use crate::execution_plans::{ShuffleReaderExec, UnresolvedShuffleExec};
25+
use crate::execution_plans::{
26+
ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec,
27+
};
2628
use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
2729
use crate::serde::protobuf::ShuffleReaderPartition;
2830
use crate::serde::scheduler::PartitionLocation;
@@ -370,6 +372,34 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
370372
partition_mode,
371373
)?))
372374
}
375+
PhysicalPlanType::ShuffleWriter(shuffle_writer) => {
376+
let input: Arc<dyn ExecutionPlan> =
377+
convert_box_required!(shuffle_writer.input)?;
378+
379+
let output_partitioning = match &shuffle_writer.output_partitioning {
380+
Some(hash_part) => {
381+
let expr = hash_part
382+
.hash_expr
383+
.iter()
384+
.map(|e| e.try_into())
385+
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>, _>>()?;
386+
387+
Some(Partitioning::Hash(
388+
expr,
389+
hash_part.partition_count.try_into().unwrap(),
390+
))
391+
}
392+
None => None,
393+
};
394+
395+
Ok(Arc::new(ShuffleWriterExec::try_new(
396+
shuffle_writer.job_id.clone(),
397+
shuffle_writer.stage_id as usize,
398+
input,
399+
"".to_string(), // this is intentional but hacky - the executor will fill this in
400+
output_partitioning,
401+
)?))
402+
}
373403
PhysicalPlanType::ShuffleReader(shuffle_reader) => {
374404
let schema = Arc::new(convert_required!(shuffle_reader.schema)?);
375405
let partition_location: Vec<Vec<PartitionLocation>> = shuffle_reader

ballista/rust/core/src/serde/physical_plan/to_proto.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ use datafusion::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr};
5555
use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
5656
use protobuf::physical_plan_node::PhysicalPlanType;
5757

58-
use crate::execution_plans::{ShuffleReaderExec, UnresolvedShuffleExec};
58+
use crate::execution_plans::{
59+
ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec,
60+
};
5961
use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
6062
use crate::serde::scheduler::PartitionLocation;
6163
use crate::serde::{protobuf, BallistaError};
@@ -356,6 +358,36 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
356358
},
357359
))),
358360
})
361+
} else if let Some(exec) = plan.downcast_ref::<ShuffleWriterExec>() {
362+
let input: protobuf::PhysicalPlanNode =
363+
exec.children()[0].to_owned().try_into()?;
364+
Ok(protobuf::PhysicalPlanNode {
365+
physical_plan_type: Some(PhysicalPlanType::ShuffleWriter(Box::new(
366+
protobuf::ShuffleWriterExecNode {
367+
job_id: exec.job_id().to_string(),
368+
stage_id: exec.stage_id() as u32,
369+
input: Some(Box::new(input)),
370+
output_partitioning: match exec.output_partitioning() {
371+
Partitioning::Hash(exprs, partition_count) => {
372+
Some(protobuf::PhysicalHashRepartition {
373+
hash_expr: exprs
374+
.iter()
375+
.map(|expr| expr.clone().try_into())
376+
.collect::<Result<Vec<_>, BallistaError>>()?,
377+
partition_count: partition_count as u64,
378+
})
379+
}
380+
Partitioning::UnknownPartitioning(1) => None,
381+
other => {
382+
return Err(BallistaError::General(format!(
383+
"physical_plan::to_proto() invalid partitioning for ShuffleWriterExec: {:?}",
384+
other
385+
)))
386+
}
387+
},
388+
},
389+
))),
390+
})
359391
} else if let Some(exec) = plan.downcast_ref::<UnresolvedShuffleExec>() {
360392
Ok(protobuf::PhysicalPlanNode {
361393
physical_plan_type: Some(PhysicalPlanType::Unresolved(

ballista/rust/executor/src/execution_loop.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
2020
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
2121
use std::{sync::Arc, time::Duration};
2222

23-
use datafusion::physical_plan::ExecutionPlan;
23+
use datafusion::physical_plan::{ExecutionPlan, Partitioning, PhysicalExpr};
2424
use log::{debug, error, info, warn};
2525
use tonic::transport::Channel;
2626

@@ -108,13 +108,27 @@ async fn run_received_tasks(
108108
available_tasks_slots.fetch_sub(1, Ordering::SeqCst);
109109
let plan: Arc<dyn ExecutionPlan> = (&task.plan.unwrap()).try_into().unwrap();
110110

111+
let shuffle_output_partitioning = match task.shuffle_output_partitioning {
112+
Some(hash_part) => {
113+
let expr = hash_part
114+
.hash_expr
115+
.iter()
116+
.map(|e| e.try_into())
117+
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>, _>>().unwrap(); //TODO err handling
118+
119+
Some(Partitioning::Hash(expr, hash_part.partition_count as usize))
120+
},
121+
_ => None
122+
};
123+
111124
tokio::spawn(async move {
112125
let execution_result = executor
113-
.execute_partition(
126+
.execute_shuffle_write(
114127
task_id.job_id.clone(),
115128
task_id.stage_id as usize,
116129
task_id.partition_id as usize,
117130
plan,
131+
shuffle_output_partitioning,
118132
)
119133
.await;
120134
info!("Done with task {}", task_id_log);

ballista/rust/executor/src/executor.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use ballista_core::execution_plans::ShuffleWriterExec;
2424
use ballista_core::utils;
2525
use datafusion::arrow::record_batch::RecordBatch;
2626
use datafusion::physical_plan::display::DisplayableExecutionPlan;
27-
use datafusion::physical_plan::ExecutionPlan;
27+
use datafusion::physical_plan::{ExecutionPlan, Partitioning};
2828

2929
/// Ballista executor
3030
pub struct Executor {
@@ -45,19 +45,20 @@ impl Executor {
4545
/// Execute one partition of a query stage and persist the result to disk in IPC format. On
4646
/// success, return a RecordBatch containing metadata about the results, including path
4747
/// and statistics.
48-
pub async fn execute_partition(
48+
pub async fn execute_shuffle_write(
4949
&self,
5050
job_id: String,
5151
stage_id: usize,
5252
part: usize,
5353
plan: Arc<dyn ExecutionPlan>,
54+
shuffle_output_partitioning: Option<Partitioning>,
5455
) -> Result<RecordBatch, BallistaError> {
5556
let exec = ShuffleWriterExec::try_new(
5657
job_id,
5758
stage_id,
5859
plan,
5960
self.work_dir.clone(),
60-
None,
61+
shuffle_output_partitioning,
6162
)?;
6263
let mut stream = exec.execute(part).await?;
6364
let batches = utils::collect_stream(&mut stream).await?;

ballista/rust/executor/src/flight_service.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,11 +107,12 @@ impl FlightService for BallistaFlightService {
107107
let executor = self.executor.clone();
108108
tasks.push(tokio::spawn(async move {
109109
let results = executor
110-
.execute_partition(
110+
.execute_shuffle_write(
111111
partition.job_id.clone(),
112112
partition.stage_id,
113113
part,
114114
partition.plan.clone(),
115+
None //TODO
115116
)
116117
.await?;
117118
let results = vec![results];

ballista/rust/scheduler/src/lib.rs

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use ballista_core::serde::protobuf::{
4848
use ballista_core::serde::scheduler::ExecutorMeta;
4949

5050
use clap::arg_enum;
51-
use datafusion::physical_plan::ExecutionPlan;
51+
use datafusion::physical_plan::{ExecutionPlan, Partitioning};
5252
#[cfg(feature = "sled")]
5353
extern crate sled_package as sled;
5454

@@ -80,9 +80,12 @@ use tonic::{Request, Response};
8080

8181
use self::state::{ConfigBackendClient, SchedulerState};
8282
use ballista_core::config::BallistaConfig;
83+
use ballista_core::serde::protobuf;
8384
use ballista_core::utils::create_datafusion_context;
8485
use datafusion::physical_plan::parquet::ParquetExec;
8586
use std::time::{Instant, SystemTime, UNIX_EPOCH};
87+
use ballista_core::execution_plans::ShuffleWriterExec;
88+
use datafusion::physical_plan::expressions::Column;
8689

8790
#[derive(Clone)]
8891
pub struct SchedulerServer {
@@ -229,9 +232,47 @@ impl SchedulerGrpc for SchedulerServer {
229232
partition_id.partition_id
230233
);
231234
}
232-
plan.map(|(status, plan)| TaskDefinition {
233-
plan: Some(plan.try_into().unwrap()),
234-
task_id: status.partition_id,
235+
plan.map(|(status, plan)| {
236+
if let Some(shuffle_writer) = plan.as_any().downcast_ref::<ShuffleWriterExec>() {
237+
let shuffle_child = plan.children()[0].clone();
238+
match shuffle_writer.output_partitioning() {
239+
Partitioning::Hash(expr, n) => {
240+
let shuffle_output_partitioning = Some(protobuf::PhysicalHashRepartition {
241+
hash_expr: expr.iter().map(|e| {
242+
if let Some(col) = e.as_any().downcast_ref::<Column>() {
243+
protobuf::PhysicalExprNode {
244+
expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
245+
//TODO should implement Into for this type?
246+
protobuf::PhysicalColumn {
247+
name: col.name().to_string(),
248+
index: col.index() as u32,
249+
}))
250+
}
251+
} else {
252+
todo!()
253+
}
254+
}).collect::<Vec<_>>(),
255+
partition_count: n as u64
256+
});
257+
258+
TaskDefinition {
259+
plan: Some(shuffle_child.try_into().unwrap()),
260+
task_id: status.partition_id,
261+
shuffle_output_partitioning,
262+
}
263+
}
264+
Partitioning::UnknownPartitioning(1) => {
265+
TaskDefinition {
266+
plan: Some(shuffle_child.try_into().unwrap()),
267+
task_id: status.partition_id,
268+
shuffle_output_partitioning: None,
269+
}
270+
}
271+
_ => todo!()
272+
}
273+
} else {
274+
unreachable!()
275+
}
235276
})
236277
} else {
237278
None
@@ -431,25 +472,25 @@ impl SchedulerGrpc for SchedulerServer {
431472
}));
432473

433474
// save stages into state
434-
for stage in stages {
475+
for shuffle_writer in stages {
435476
fail_job!(state
436477
.save_stage_plan(
437478
&job_id_spawn,
438-
stage.stage_id(),
439-
stage.children()[0].clone()
479+
shuffle_writer.stage_id(),
480+
shuffle_writer.clone()
440481
)
441482
.await
442483
.map_err(|e| {
443484
let msg = format!("Could not save stage plan: {}", e);
444485
error!("{}", msg);
445486
tonic::Status::internal(msg)
446487
}));
447-
let num_partitions = stage.output_partitioning().partition_count();
488+
let num_partitions = shuffle_writer.output_partitioning().partition_count();
448489
for partition_id in 0..num_partitions {
449490
let pending_status = TaskStatus {
450491
partition_id: Some(PartitionId {
451492
job_id: job_id_spawn.clone(),
452-
stage_id: stage.stage_id() as u32,
493+
stage_id: shuffle_writer.stage_id() as u32,
453494
partition_id: partition_id as u32,
454495
}),
455496
status: None,

0 commit comments

Comments
 (0)