Skip to content

Commit 2bc0415

Browse files
committed
bug fix
1 parent 7a40008 commit 2bc0415

File tree

3 files changed

+50
-16
lines changed

3 files changed

+50
-16
lines changed

ballista/rust/core/src/execution_plans/shuffle_writer.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,6 @@ impl ExecutionPlan for ShuffleWriterExec {
255255
let output_batch =
256256
RecordBatch::try_new(input_batch.schema(), columns)?;
257257

258-
//TODO avoid writing empty batches!
259-
260258
// write batch out
261259
let start = Instant::now();
262260
match &mut writers[output_partition_num] {
@@ -283,6 +281,25 @@ impl ExecutionPlan for ShuffleWriterExec {
283281
}
284282
}
285283

284+
//TODO big hack to write empty partitions so that shuffle reader doesn't blow up
285+
// when attempting to read empty partitions
286+
// see https://github.com/apache/arrow-datafusion/issues/711 for more info
287+
for output_partition_num in 0..writers.len() {
288+
if writers[output_partition_num].is_none() {
289+
let mut path = path.clone();
290+
path.push(&format!("{}", output_partition_num));
291+
std::fs::create_dir_all(&path)?;
292+
293+
path.push("data.arrow");
294+
let path = path.to_str().unwrap();
295+
info!("Writing empty results to {}", path);
296+
297+
let writer = ShuffleWriter::new(path, stream.schema().as_ref())?;
298+
299+
writers[output_partition_num] = Some(writer);
300+
}
301+
}
302+
286303
// build metadata result batch
287304
let num_writers = writers.iter().filter(|w| w.is_some()).count();
288305
let mut partition_builder = UInt32Builder::new(num_writers);

ballista/rust/executor/src/execution_loop.rs

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use ballista_core::serde::protobuf::{
3232
use protobuf::CompletedTask;
3333

3434
use crate::executor::Executor;
35+
use ballista_core::execution_plans::ShuffleWriterExec;
3536

3637
pub async fn poll_loop(
3738
mut scheduler: SchedulerGrpcClient<Channel>,
@@ -108,19 +109,35 @@ async fn run_received_tasks(
108109
available_tasks_slots.fetch_sub(1, Ordering::SeqCst);
109110
let plan: Arc<dyn ExecutionPlan> = (&task.plan.unwrap()).try_into().unwrap();
110111

111-
tokio::spawn(async move {
112-
let execution_result = executor
113-
.execute_shuffle_write(plan, task_id.partition_id as usize)
114-
.await;
115-
info!("Done with task {}", task_id_log);
116-
debug!("Statistics: {:?}", execution_result);
117-
available_tasks_slots.fetch_add(1, Ordering::SeqCst);
118-
let _ = task_status_sender.send(as_task_status(
119-
execution_result.map(|_| ()),
120-
executor_id,
121-
task_id,
122-
));
123-
});
112+
if let Some(shuffle_writer) = plan.as_any().downcast_ref::<ShuffleWriterExec>() {
113+
//TODO this is a hacky way to set the working directory for the shuffle writer
114+
let exec: Arc<dyn ExecutionPlan> = Arc::new(
115+
ShuffleWriterExec::try_new(
116+
shuffle_writer.job_id().to_string(),
117+
shuffle_writer.stage_id(),
118+
shuffle_writer.children()[0].clone(),
119+
executor.work_dir.clone(),
120+
Some(shuffle_writer.output_partitioning()),
121+
)
122+
.unwrap(),
123+
); //TODO unwrap
124+
tokio::spawn(async move {
125+
let execution_result = executor
126+
.execute_shuffle_write(exec, task_id.partition_id as usize)
127+
.await;
128+
info!("Done with task {}", task_id_log);
129+
debug!("Statistics: {:?}", execution_result);
130+
available_tasks_slots.fetch_add(1, Ordering::SeqCst);
131+
let _ = task_status_sender.send(as_task_status(
132+
execution_result.map(|_| ()),
133+
executor_id,
134+
task_id,
135+
));
136+
});
137+
} else {
138+
//TODO error handling
139+
unreachable!()
140+
}
124141
}
125142

126143
fn as_task_status(

ballista/rust/executor/src/executor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use datafusion::physical_plan::ExecutionPlan;
2828
/// Ballista executor
2929
pub struct Executor {
3030
/// Directory for storing partial results
31-
work_dir: String,
31+
pub work_dir: String,
3232
}
3333

3434
impl Executor {

0 commit comments

Comments
 (0)