Skip to content

Commit 24f32ca

Browse files
authored
Ballista: Shuffle write bug fix (#714)
* Shuffle write bug fix
* Rename variable
* Windows
* Fix bug in Windows-specific assertion
* Revert accidental change
1 parent 0a05acf commit 24f32ca

File tree

1 file changed

+24
-9
lines changed

1 file changed

+24
-9
lines changed

ballista/rust/core/src/execution_plans/shuffle_writer.rs

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ impl ExecutionPlan for ShuffleWriterExec {
234234
indices[(*hash % num_output_partitions as u64) as usize]
235235
.push(index as u64)
236236
}
237-
for (num_output_partition, partition_indices) in
237+
for (output_partition, partition_indices) in
238238
indices.into_iter().enumerate()
239239
{
240240
let indices = partition_indices.into();
@@ -254,13 +254,13 @@ impl ExecutionPlan for ShuffleWriterExec {
254254

255255
// write batch out
256256
let start = Instant::now();
257-
match &mut writers[num_output_partition] {
257+
match &mut writers[output_partition] {
258258
Some(w) => {
259259
w.write(&output_batch)?;
260260
}
261261
None => {
262262
let mut path = path.clone();
263-
path.push(&format!("{}", partition));
263+
path.push(&format!("{}", output_partition));
264264
std::fs::create_dir_all(&path)?;
265265

266266
path.push("data.arrow");
@@ -271,7 +271,7 @@ impl ExecutionPlan for ShuffleWriterExec {
271271
ShuffleWriter::new(path, stream.schema().as_ref())?;
272272

273273
writer.write(&output_batch)?;
274-
writers[num_output_partition] = Some(writer);
274+
writers[output_partition] = Some(writer);
275275
}
276276
}
277277
self.metrics.write_time.add_elapsed(start);
@@ -419,20 +419,22 @@ impl ShuffleWriter {
419419
mod tests {
420420
use super::*;
421421
use datafusion::arrow::array::{StringArray, StructArray, UInt32Array, UInt64Array};
422+
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
422423
use datafusion::physical_plan::expressions::Column;
424+
use datafusion::physical_plan::limit::GlobalLimitExec;
423425
use datafusion::physical_plan::memory::MemoryExec;
424426
use tempfile::TempDir;
425427

426428
#[tokio::test]
427429
async fn test() -> Result<()> {
428-
let input_plan = create_input_plan()?;
430+
let input_plan = Arc::new(CoalescePartitionsExec::new(create_input_plan()?));
429431
let work_dir = TempDir::new()?;
430432
let query_stage = ShuffleWriterExec::try_new(
431433
"jobOne".to_owned(),
432434
1,
433435
input_plan,
434436
work_dir.into_path().to_str().unwrap().to_owned(),
435-
None,
437+
Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 2)),
436438
)?;
437439
let mut stream = query_stage.execute(0).await?;
438440
let batches = utils::collect_stream(&mut stream)
@@ -441,24 +443,37 @@ mod tests {
441443
assert_eq!(1, batches.len());
442444
let batch = &batches[0];
443445
assert_eq!(3, batch.num_columns());
444-
assert_eq!(1, batch.num_rows());
446+
assert_eq!(2, batch.num_rows());
445447
let path = batch.columns()[1]
446448
.as_any()
447449
.downcast_ref::<StringArray>()
448450
.unwrap();
449-
let file = path.value(0);
450-
assert!(file.ends_with("data.arrow"));
451+
452+
let file0 = path.value(0);
453+
assert!(
454+
file0.ends_with("/jobOne/1/0/data.arrow")
455+
|| file0.ends_with("\\jobOne\\1\\0\\data.arrow")
456+
);
457+
let file1 = path.value(1);
458+
assert!(
459+
file1.ends_with("/jobOne/1/1/data.arrow")
460+
|| file1.ends_with("\\jobOne\\1\\1\\data.arrow")
461+
);
462+
451463
let stats = batch.columns()[2]
452464
.as_any()
453465
.downcast_ref::<StructArray>()
454466
.unwrap();
467+
455468
let num_rows = stats
456469
.column_by_name("num_rows")
457470
.unwrap()
458471
.as_any()
459472
.downcast_ref::<UInt64Array>()
460473
.unwrap();
461474
assert_eq!(4, num_rows.value(0));
475+
assert_eq!(4, num_rows.value(1));
476+
462477
Ok(())
463478
}
464479

0 commit comments

Comments (0)