
Commit 7a40008

revert partition stats, fix bugs in shuffle writer
1 parent 1fa8ff5 commit 7a40008

4 files changed: +67 additions, -71 deletions

ballista/rust/core/proto/ballista.proto

Lines changed: 0 additions & 1 deletion
@@ -782,7 +782,6 @@ message FailedTask {
 
 message CompletedTask {
   string executor_id = 1;
-  PartitionStats partition_stats = 2;
 }
 
 message TaskStatus {
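
The only schema change is dropping field 2, so the prost-generated `CompletedTask` message now carries nothing but the executor id. As a rough illustration of the shape downstream Rust code sees after this commit, here is a hand-written stand-in (the real type is generated from ballista.proto by prost and lives in ballista_core::serde::protobuf; this is not the generated code itself):

    // Illustrative stand-in for the generated message after this commit.
    #[derive(Clone, Debug, Default)]
    pub struct CompletedTask {
        pub executor_id: String, // field 1; partition_stats (field 2) was removed
    }

    fn completed_task(executor_id: &str) -> CompletedTask {
        // Construction now needs only the executor id, matching the
        // `CompletedTask { executor_id }` patterns in the diffs below.
        CompletedTask {
            executor_id: executor_id.to_owned(),
        }
    }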

ballista/rust/core/src/execution_plans/shuffle_writer.rs

Lines changed: 37 additions & 10 deletions
@@ -237,7 +237,7 @@ impl ExecutionPlan for ShuffleWriterExec {
                     indices[(*hash % num_output_partitions as u64) as usize]
                         .push(index as u64)
                 }
-                for (num_output_partition, partition_indices) in
+                for (output_partition_num, partition_indices) in
                     indices.into_iter().enumerate()
                 {
                     let indices = partition_indices.into();
@@ -255,15 +255,17 @@ impl ExecutionPlan for ShuffleWriterExec {
                     let output_batch =
                         RecordBatch::try_new(input_batch.schema(), columns)?;
 
+                    //TODO avoid writing empty batches!
+
                     // write batch out
                     let start = Instant::now();
-                    match &mut writers[num_output_partition] {
+                    match &mut writers[output_partition_num] {
                         Some(w) => {
                             w.write(&output_batch)?;
                         }
                         None => {
                             let mut path = path.clone();
-                            path.push(&format!("{}", partition));
+                            path.push(&format!("{}", output_partition_num));
                             std::fs::create_dir_all(&path)?;
 
                             path.push("data.arrow");
@@ -274,7 +276,7 @@ impl ExecutionPlan for ShuffleWriterExec {
                                 ShuffleWriter::new(path, stream.schema().as_ref())?;
 
                             writer.write(&output_batch)?;
-                            writers[num_output_partition] = Some(writer);
+                            writers[output_partition_num] = Some(writer);
                         }
                     }
                     self.metrics.write_time.add_elapsed(start);
@@ -422,20 +424,27 @@ impl ShuffleWriter {
 mod tests {
     use super::*;
     use datafusion::arrow::array::{StringArray, StructArray, UInt32Array, UInt64Array};
+    use datafusion::arrow::ipc::reader::FileReader;
+    use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use datafusion::physical_plan::expressions::Column;
+    use datafusion::physical_plan::limit::GlobalLimitExec;
     use datafusion::physical_plan::memory::MemoryExec;
+    use std::path::Path;
     use tempfile::TempDir;
 
     #[tokio::test]
     async fn test() -> Result<()> {
-        let input_plan = create_input_plan()?;
+        let input_plan = Arc::new(GlobalLimitExec::new(
+            Arc::new(CoalescePartitionsExec::new(create_input_plan()?)),
+            1,
+        ));
         let work_dir = TempDir::new()?;
         let query_stage = ShuffleWriterExec::try_new(
             "jobOne".to_owned(),
             1,
             input_plan,
             work_dir.into_path().to_str().unwrap().to_owned(),
-            None,
+            Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 2)),
         )?;
         let mut stream = query_stage.execute(0).await?;
         let batches = utils::collect_stream(&mut stream)
@@ -444,24 +453,42 @@ mod tests {
         assert_eq!(1, batches.len());
         let batch = &batches[0];
         assert_eq!(3, batch.num_columns());
-        assert_eq!(1, batch.num_rows());
+        assert_eq!(2, batch.num_rows());
        let path = batch.columns()[1]
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
-        let file = path.value(0);
-        assert!(file.ends_with("data.arrow"));
+
+        let file0 = path.value(0);
+        assert!(file0.ends_with("/jobOne/1/0/data.arrow"));
+        let file = File::open(file0).unwrap();
+        let mut reader = FileReader::try_new(file).unwrap();
+        let batch0 = reader.next().unwrap();
+        assert!(batch0.is_ok());
+        assert_eq!(0, batch0.unwrap().num_rows());
+
+        let file1 = path.value(1);
+        assert!(file1.ends_with("/jobOne/1/1/data.arrow"));
+        let file = File::open(file1).unwrap();
+        let mut reader = FileReader::try_new(file).unwrap();
+        let batch1 = reader.next().unwrap();
+        assert!(batch1.is_ok());
+        assert_eq!(1, batch1.unwrap().num_rows());
+
        let stats = batch.columns()[2]
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap();
+
        let num_rows = stats
            .column_by_name("num_rows")
            .unwrap()
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
-        assert_eq!(4, num_rows.value(0));
+        assert_eq!(0, num_rows.value(0));
+        assert_eq!(1, num_rows.value(1));
+
        Ok(())
    }
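
The renames above are the actual bug fix: the writer slot and the output directory must both be keyed by the output partition number produced by `enumerate()`, whereas the old code named the directory after `partition` (the input partition being executed), so every output partition of a given input partition ended up under the same directory. A minimal, std-only sketch of the bucketing and path layout, assuming illustrative names (`bucket_rows`, `output_path`), `DefaultHasher` rather than the hash function DataFusion uses, and the `<work_dir>/<job_id>/<stage_id>/<output_partition>/data.arrow` layout implied by the test assertions:

    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};
    use std::path::PathBuf;

    /// Bucket row indices by hash of their key, modulo the number of output partitions.
    fn bucket_rows(keys: &[u64], num_output_partitions: usize) -> Vec<Vec<u64>> {
        let mut indices: Vec<Vec<u64>> = vec![vec![]; num_output_partitions];
        for (row, key) in keys.iter().enumerate() {
            let mut hasher = DefaultHasher::new();
            key.hash(&mut hasher);
            indices[(hasher.finish() % num_output_partitions as u64) as usize].push(row as u64);
        }
        indices
    }

    /// Build the per-partition file path; note it uses the *output* partition number.
    fn output_path(
        work_dir: &str,
        job_id: &str,
        stage_id: usize,
        output_partition_num: usize,
    ) -> PathBuf {
        let mut path = PathBuf::from(work_dir);
        path.push(job_id);
        path.push(stage_id.to_string());
        path.push(output_partition_num.to_string());
        path.push("data.arrow");
        path
    }

    fn main() {
        for (output_partition_num, rows) in bucket_rows(&[1, 2, 3, 4], 2).into_iter().enumerate() {
            println!(
                "output partition {}: {} rows -> {}",
                output_partition_num,
                rows.len(),
                output_path("/tmp/work", "jobOne", 1, output_partition_num).display()
            );
        }
    }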

ballista/rust/executor/src/execution_loop.rs

Lines changed: 0 additions & 8 deletions
@@ -32,7 +32,6 @@ use ballista_core::serde::protobuf::{
 use protobuf::CompletedTask;
 
 use crate::executor::Executor;
-use ballista_core::serde::scheduler::PartitionStats;
 
 pub async fn poll_loop(
     mut scheduler: SchedulerGrpcClient<Channel>,
@@ -115,16 +114,11 @@ async fn run_received_tasks(
         .await;
         info!("Done with task {}", task_id_log);
         debug!("Statistics: {:?}", execution_result);
-
-        let partition_stats = PartitionStats::default();
-        //TODO populate this
-
         available_tasks_slots.fetch_add(1, Ordering::SeqCst);
         let _ = task_status_sender.send(as_task_status(
             execution_result.map(|_| ()),
             executor_id,
             task_id,
-            partition_stats,
         ));
     });
 }
@@ -133,7 +127,6 @@ fn as_task_status(
     execution_result: ballista_core::error::Result<()>,
     executor_id: String,
     task_id: PartitionId,
-    partition_stats: PartitionStats,
 ) -> TaskStatus {
     match execution_result {
         Ok(_) => {
@@ -143,7 +136,6 @@ fn as_task_status(
             partition_id: Some(task_id),
             status: Some(task_status::Status::Completed(CompletedTask {
                 executor_id,
-                partition_stats: Some(partition_stats.into()),
             })),
         }
     }
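
With the stats reverted, `run_received_tasks` no longer fabricates a placeholder `PartitionStats`, and `as_task_status` loses its `partition_stats` parameter. A compact sketch of the simplified helper's shape, using hand-written stand-in types (the real `TaskStatus`, `CompletedTask`, and `PartitionId` are prost-generated, and the error arm is reduced to a string here):

    // Stand-in types; the real ones come from ballista_core::serde::protobuf.
    #[derive(Debug)]
    struct PartitionId {
        job_id: String,
        stage_id: u32,
        partition_id: u32,
    }

    #[derive(Debug)]
    enum Status {
        Completed { executor_id: String },
        Failed { error: String },
    }

    #[derive(Debug)]
    struct TaskStatus {
        partition_id: Option<PartitionId>,
        status: Option<Status>,
    }

    // A completed task is now reported with just the executor id and the
    // partition it ran; no partition statistics are attached.
    fn as_task_status(
        execution_result: Result<(), String>,
        executor_id: String,
        task_id: PartitionId,
    ) -> TaskStatus {
        let status = match execution_result {
            Ok(_) => Status::Completed { executor_id },
            Err(e) => Status::Failed { error: e },
        };
        TaskStatus {
            partition_id: Some(task_id),
            status: Some(status),
        }
    }

    fn main() {
        let reported = as_task_status(
            Ok(()),
            "executor-1".to_owned(),
            PartitionId {
                job_id: "jobOne".to_owned(),
                stage_id: 1,
                partition_id: 0,
            },
        );
        println!("{:?}", reported);
    }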

ballista/rust/scheduler/src/state/mod.rs

Lines changed: 30 additions & 52 deletions
@@ -27,7 +27,7 @@ use prost::Message;
 use tokio::sync::OwnedMutexGuard;
 
 use ballista_core::serde::protobuf::{
-    self, job_status, task_status, CompletedJob, CompletedTask, ExecutorHeartbeat,
+    job_status, task_status, CompletedJob, CompletedTask, ExecutorHeartbeat,
     ExecutorMetadata, FailedJob, FailedTask, JobStatus, PhysicalPlanNode, RunningJob,
     RunningTask, TaskStatus,
 };
@@ -254,9 +254,9 @@ impl SchedulerState {
         executors: &[ExecutorMeta],
     ) -> Result<bool> {
         let executor_id: &str = match &task_status.status {
-            Some(task_status::Status::Completed(CompletedTask {
-                executor_id, ..
-            })) => executor_id,
+            Some(task_status::Status::Completed(CompletedTask { executor_id })) => {
+                executor_id
+            }
             Some(task_status::Status::Running(RunningTask { executor_id })) => {
                 executor_id
             }
@@ -318,36 +318,29 @@ impl SchedulerState {
                     if task_is_dead {
                         continue 'tasks;
                     } else if let Some(task_status::Status::Completed(
-                        CompletedTask {
-                            executor_id,
-                            partition_stats,
-                        },
+                        CompletedTask { executor_id },
                     )) = &referenced_task.status
                     {
-                        // skip reading empty partitions
-                        if partition_stats.as_ref().unwrap().num_rows > 0 {
-                            let empty = vec![];
-                            let locations = partition_locations
-                                .entry(stage_id)
-                                .or_insert(empty);
-                            let executor_meta = executors
-                                .iter()
-                                .find(|exec| exec.id == *executor_id)
-                                .unwrap()
-                                .clone();
-                            locations.push(vec![
-                                ballista_core::serde::scheduler::PartitionLocation {
-                                    partition_id:
+                        let empty = vec![];
+                        let locations =
+                            partition_locations.entry(stage_id).or_insert(empty);
+                        let executor_meta = executors
+                            .iter()
+                            .find(|exec| exec.id == *executor_id)
+                            .unwrap()
+                            .clone();
+                        locations.push(vec![
+                            ballista_core::serde::scheduler::PartitionLocation {
+                                partition_id:
                                     ballista_core::serde::scheduler::PartitionId {
                                         job_id: partition.job_id.clone(),
                                         stage_id,
                                         partition_id,
                                     },
-                                    executor_meta,
-                                    partition_stats: PartitionStats::default(),
-                                },
-                            ]);
-                        }
+                                executor_meta,
+                                partition_stats: PartitionStats::default(),
+                            },
+                        ]);
                     } else {
                         continue 'tasks;
                     }
@@ -460,31 +453,23 @@ impl SchedulerState {
         let mut job_status = statuses
             .iter()
             .map(|status| match &status.status {
-                Some(task_status::Status::Completed(CompletedTask {
-                    executor_id,
-                    partition_stats,
-                })) => Ok((status, executor_id, partition_stats)),
+                Some(task_status::Status::Completed(CompletedTask { executor_id })) => {
+                    Ok((status, executor_id))
+                }
                 _ => Err(BallistaError::General("Task not completed".to_string())),
             })
             .collect::<Result<Vec<_>>>()
             .ok()
             .map(|info| {
                 let partition_location = info
                     .into_iter()
-                    .map(
-                        |(status, execution_id, partition_stats)| PartitionLocation {
-                            partition_id: status.partition_id.to_owned(),
-                            executor_meta: executors
-                                .get(execution_id)
-                                .map(|e| e.clone().into()),
-                            partition_stats: Some(protobuf::PartitionStats {
-                                num_rows: partition_stats.as_ref().unwrap().num_rows, //TODO
-                                num_batches: 0,
-                                num_bytes: 0,
-                                column_stats: vec![],
-                            }),
-                        },
-                    )
+                    .map(|(status, execution_id)| PartitionLocation {
+                        partition_id: status.partition_id.to_owned(),
+                        executor_meta: executors
+                            .get(execution_id)
+                            .map(|e| e.clone().into()),
+                        partition_stats: None,
+                    })
                     .collect();
                 job_status::Status::Completed(CompletedJob { partition_location })
             });
@@ -761,7 +746,6 @@ mod test {
         let meta = TaskStatus {
             status: Some(task_status::Status::Completed(CompletedTask {
                 executor_id: "".to_owned(),
-                partition_stats: None,
             })),
             partition_id: Some(PartitionId {
                 job_id: job_id.to_owned(),
@@ -801,7 +785,6 @@ mod test {
         let meta = TaskStatus {
             status: Some(task_status::Status::Completed(CompletedTask {
                 executor_id: "".to_owned(),
-                partition_stats: None,
             })),
             partition_id: Some(PartitionId {
                 job_id: job_id.to_owned(),
@@ -839,7 +822,6 @@ mod test {
         let meta = TaskStatus {
             status: Some(task_status::Status::Completed(CompletedTask {
                 executor_id: "".to_owned(),
-                partition_stats: None,
             })),
             partition_id: Some(PartitionId {
                 job_id: job_id.to_owned(),
@@ -851,7 +833,6 @@ mod test {
         let meta = TaskStatus {
             status: Some(task_status::Status::Completed(CompletedTask {
                 executor_id: "".to_owned(),
-                partition_stats: None,
             })),
             partition_id: Some(PartitionId {
                 job_id: job_id.to_owned(),
@@ -883,7 +864,6 @@ mod test {
         let meta = TaskStatus {
             status: Some(task_status::Status::Completed(CompletedTask {
                 executor_id: "".to_owned(),
-                partition_stats: None,
             })),
             partition_id: Some(PartitionId {
                 job_id: job_id.to_owned(),
@@ -895,7 +875,6 @@ mod test {
         let meta = TaskStatus {
             status: Some(task_status::Status::Completed(CompletedTask {
                 executor_id: "".to_owned(),
-                partition_stats: None,
             })),
             partition_id: Some(PartitionId {
                 job_id: job_id.to_owned(),
@@ -927,7 +906,6 @@ mod test {
         let meta = TaskStatus {
             status: Some(task_status::Status::Completed(CompletedTask {
                 executor_id: "".to_owned(),
-                partition_stats: None,
             })),
             partition_id: Some(PartitionId {
                 job_id: job_id.to_owned(),
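
On the scheduler side, removing the `num_rows > 0` guard means every completed task now contributes a partition location, accumulated per stage with the `entry(...).or_insert(...)` pattern, and the completed-job status now reports `partition_stats: None`. A minimal, std-only sketch of that grouping, assuming a simplified stand-in `PartitionLocation` (the real code pushes a nested `vec![...]` of `ballista_core::serde::scheduler::PartitionLocation` and also resolves executor metadata):

    use std::collections::HashMap;

    // Simplified stand-in for the scheduler's partition location record.
    #[derive(Debug)]
    struct PartitionLocation {
        job_id: String,
        stage_id: usize,
        partition_id: usize,
        executor_id: String,
    }

    /// Group completed (stage_id, partition_id, executor_id) triples by stage.
    fn group_by_stage(
        completed: &[(usize, usize, String)],
        job_id: &str,
    ) -> HashMap<usize, Vec<PartitionLocation>> {
        let mut partition_locations: HashMap<usize, Vec<PartitionLocation>> = HashMap::new();
        for (stage_id, partition_id, executor_id) in completed {
            // Fetch or create the per-stage vector, then push; there is no
            // "skip empty partitions" filter any more.
            let locations = partition_locations.entry(*stage_id).or_insert_with(Vec::new);
            locations.push(PartitionLocation {
                job_id: job_id.to_owned(),
                stage_id: *stage_id,
                partition_id: *partition_id,
                executor_id: executor_id.clone(),
            });
        }
        partition_locations
    }

    fn main() {
        let completed = vec![
            (1, 0, "executor-1".to_owned()),
            (1, 1, "executor-2".to_owned()),
        ];
        let grouped = group_by_stage(&completed, "jobOne");
        assert_eq!(2, grouped[&1].len());
        println!("{:#?}", grouped);
    }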
