Skip to content

Commit

Permalink
For review
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Oct 12, 2024
1 parent e121814 commit cc9e531
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 11 deletions.
1 change: 0 additions & 1 deletion native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion native/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ crc32fast = "1.3.2"
simd-adler32 = "0.3.7"
datafusion-comet-spark-expr = { workspace = true }
datafusion-comet-proto = { workspace = true }
either = "1.13.0"

[dev-dependencies]
pprof = { version = "0.13.0", features = ["flamegraph"] }
Expand Down
25 changes: 16 additions & 9 deletions native/core/src/execution/datafusion/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ use datafusion::{
},
};
use datafusion_physical_expr::EquivalenceProperties;
use either::{Either, Left, Right};
use futures::{lock::Mutex, Stream, StreamExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use simd_adler32::Adler32;
Expand All @@ -67,6 +66,14 @@ use crate::{
};
use datafusion_comet_spark_expr::spark_hash::create_murmur3_hashes;

/// The status of appending rows to a partition buffer.
enum AppendRowStatus {
/// The difference in memory usage after appending rows
MemDiff(Result<isize>),
/// The index of the next row to append
StartIndex(usize)
}

/// The shuffle writer operator maps each input partition to M output partitions based on a
/// partitioning scheme. No guarantees are made about the order of the resulting partitions.
#[derive(Debug)]
Expand Down Expand Up @@ -265,14 +272,14 @@ impl PartitionBuffer {
indices: &[usize],
start_index: usize,
time_metric: &Time,
) -> Either<Result<isize>, usize> {
) -> AppendRowStatus {
let mut mem_diff = 0;
let mut start = start_index;

// lazy init because some partition may be empty
let init = self.init_active_if_necessary();
if init.is_err() {
return Right(start);
return AppendRowStatus::StartIndex(start);
}
mem_diff += init.unwrap();

Expand All @@ -289,20 +296,20 @@ impl PartitionBuffer {
let mut timer = time_metric.timer();
let flush = self.flush();
if let Err(e) = flush {
return Left(Err(e));
return AppendRowStatus::MemDiff(Err(e));
}
mem_diff += flush.unwrap();
timer.stop();

let init = self.init_active_if_necessary();
if init.is_err() {
return Right(end);
return AppendRowStatus::StartIndex(end);
}
mem_diff += init.unwrap();
}
start = end;
}
Left(Ok(mem_diff))
AppendRowStatus::MemDiff(Ok(mem_diff))
}

/// flush active data into frozen bytes
Expand Down Expand Up @@ -1001,11 +1008,11 @@ impl ShuffleRepartitioner {

loop {
match output_ret {
Left(l) => {
AppendRowStatus::MemDiff(l) => {
mem_diff += l?;
break;
}
Right(new_start) => {
AppendRowStatus::StartIndex(new_start) => {
// Cannot allocate enough memory for the array builders in the partition,
// spill partitions and retry.
self.spill().await?;
Expand All @@ -1018,7 +1025,7 @@ impl ShuffleRepartitioner {
start_index = new_start;
output_ret = output.append_rows(columns, indices, start_index, time_metric);

if let Right(new_start) = output_ret {
if let AppendRowStatus::StartIndex(new_start) = output_ret {
if new_start == start_index {
// If the start index is not updated, it means that the partition
// is still not able to allocate enough memory for the array builders.
Expand Down

0 comments on commit cc9e531

Please sign in to comment.