Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Oct 17, 2024
1 parent 851427f commit 72c7a0c
Showing 1 changed file with 17 additions and 21 deletions.
38 changes: 17 additions & 21 deletions native/core/src/execution/datafusion/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -973,8 +973,7 @@ impl ShuffleRepartitioner {
&mut self.buffered_partitions,
spillfile.path(),
self.num_output_partitions,
)
.await?;
)?;

timer.stop();

Expand Down Expand Up @@ -1045,7 +1044,7 @@ impl ShuffleRepartitioner {
}

/// consume the `buffered_partitions` and do spill into a single temp shuffle output file
async fn spill_into(
fn spill_into(
buffered_partitions: &mut [PartitionBuffer],
path: &Path,
num_output_partitions: usize,
Expand All @@ -1058,25 +1057,22 @@ async fn spill_into(
}
let path = path.to_owned();

task::spawn_blocking(move || {
let mut offsets = vec![0; num_output_partitions + 1];
let mut spill_data = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)?;
let mut offsets = vec![0; num_output_partitions + 1];
let mut spill_data = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)
.map_err(|e| DataFusionError::Execution(format!("Error occurred while spilling {}", e)))?;

for i in 0..num_output_partitions {
offsets[i] = spill_data.stream_position()?;
spill_data.write_all(&output_batches[i])?;
output_batches[i].clear();
}
// add one extra offset at last to ease partition length computation
offsets[num_output_partitions] = spill_data.stream_position()?;
Ok(offsets)
})
.await
.map_err(|e| DataFusionError::Execution(format!("Error occurred while spilling {}", e)))?
for i in 0..num_output_partitions {
offsets[i] = spill_data.stream_position()?;
spill_data.write_all(&output_batches[i])?;
output_batches[i].clear();
}
// add one extra offset at last to ease partition length computation
offsets[num_output_partitions] = spill_data.stream_position()?;
Ok(offsets)
}

impl Debug for ShuffleRepartitioner {
Expand Down

0 comments on commit 72c7a0c

Please sign in to comment.