Revise

viirya committed Oct 9, 2024
1 parent 4595468 commit da8d679
Showing 4 changed files with 144 additions and 71 deletions.
1 change: 1 addition & 0 deletions .github/actions/rust-test/action.yaml
@@ -62,5 +62,6 @@ runs:
- name: Run Cargo test
shell: bash
run: |
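# Raise the open-file limit first; the new spilling test presumably creates
# many temporary spill files at once (assumed rationale, not stated in the commit).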
ulimit -n 102400
cd native
RUST_BACKTRACE=1 cargo test
1 change: 1 addition & 0 deletions native/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions native/core/Cargo.toml
@@ -77,6 +77,7 @@ crc32fast = "1.3.2"
simd-adler32 = "0.3.7"
datafusion-comet-spark-expr = { workspace = true }
datafusion-comet-proto = { workspace = true }
either = "1.13.0"
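# `either` lets the shuffle writer report partial progress (Either<Result, usize>)
# when a memory reservation fails, instead of spilling inside the buffer itself.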

[dev-dependencies]
pprof = { version = "0.13.0", features = ["flamegraph"] }
212 changes: 141 additions & 71 deletions native/core/src/execution/datafusion/shuffle_writer.rs
@@ -55,6 +55,7 @@ use datafusion::{
},
};
use datafusion_physical_expr::EquivalenceProperties;
use either::{Either, Left, Right};
use futures::{lock::Mutex, Stream, StreamExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use simd_adler32::Adler32;
@@ -233,10 +234,8 @@ impl PartitionBuffer {
}

/// Initializes active builders if necessary.
async fn init_active_if_necessary(
&mut self,
repartitioner: &ShuffleRepartitioner,
) -> Result<isize> {
/// Returns an error if the memory reservation fails.
fn init_active_if_necessary(&mut self) -> Result<isize> {
let mut mem_diff = 0;

if self.active.is_empty() {
@@ -250,15 +249,7 @@
.sum::<usize>();
}

if self
.reservation
.try_grow(self.active_slots_mem_size)
.is_err()
{
repartitioner.spill().await?;
self.reservation.free();
self.reservation.try_grow(self.active_slots_mem_size)?;
}
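// A reservation failure now surfaces to the caller: append_rows returns
// Right(start_index) so the repartitioner itself can spill and retry.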
self.reservation.try_grow(self.active_slots_mem_size)?;

self.active = new_array_builders(&self.schema, self.batch_size);

@@ -267,32 +258,23 @@
Ok(mem_diff)
}

/// Appends all rows of given batch into active array builders.
async fn append_batch(
&mut self,
batch: &RecordBatch,
time_metric: &Time,
repartitioner: &ShuffleRepartitioner,
) -> Result<isize> {
let columns = batch.columns();
let indices = (0..batch.num_rows()).collect::<Vec<usize>>();
self.append_rows(columns, &indices, time_metric, repartitioner)
.await
}

/// Appends the rows at the specified indices from `columns` into the active
/// array builders. Returns `Left(Ok(mem_diff))` on success, `Left(Err(..))` if
/// flushing a full batch fails, and `Right(next_start)` if the memory
/// reservation fails, so the caller can spill and retry from that row index.
async fn append_rows(
fn append_rows(
&mut self,
columns: &[ArrayRef],
indices: &[usize],
start_index: usize,
time_metric: &Time,
repartitioner: &ShuffleRepartitioner,
) -> Result<isize> {
) -> Either<Result<isize>, usize> {
let mut mem_diff = 0;
let mut start = 0;
let mut start = start_index;

// Lazily initialize because some partitions may be empty.
mem_diff += self.init_active_if_necessary(repartitioner).await?;
let init = self.init_active_if_necessary();
if init.is_err() {
return Right(start);
}
mem_diff += init.unwrap();

while start < indices.len() {
let end = (start + self.batch_size).min(indices.len());
@@ -305,14 +287,22 @@
self.num_active_rows += end - start;
if self.num_active_rows >= self.batch_size {
let mut timer = time_metric.timer();
mem_diff += self.flush()?;
let flush = self.flush();
if let Err(e) = flush {
return Left(Err(e));
}
mem_diff += flush.unwrap();
timer.stop();

mem_diff += self.init_active_if_necessary(repartitioner).await?;
let init = self.init_active_if_necessary();
if init.is_err() {
return Right(end);
}
mem_diff += init.unwrap();
}
start = end;
}
Ok(mem_diff)
Left(Ok(mem_diff))
}

/// flush active data into frozen bytes
@@ -326,7 +316,7 @@
let active = std::mem::take(&mut self.active);
let num_rows = self.num_active_rows;
self.num_active_rows = 0;
mem_diff -= self.active_slots_mem_size as isize;
self.reservation.try_shrink(self.active_slots_mem_size)?;
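// Shrink the reservation directly when the active builders are flushed,
// rather than only tracking the change through mem_diff.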

let frozen_batch = make_batch(Arc::clone(&self.schema), active, num_rows)?;

@@ -610,7 +600,7 @@ struct ShuffleRepartitioner {
output_data_file: String,
output_index_file: String,
schema: SchemaRef,
buffered_partitions: Mutex<Vec<PartitionBuffer>>,
buffered_partitions: Vec<PartitionBuffer>,
spills: Mutex<Vec<SpillInfo>>,
/// Sort expressions
/// Partitioning scheme to use
@@ -683,18 +673,11 @@ impl ShuffleRepartitioner {
output_data_file,
output_index_file,
schema: Arc::clone(&schema),
buffered_partitions: Mutex::new(
(0..num_output_partitions)
.map(|partition_id| {
PartitionBuffer::new(
Arc::clone(&schema),
batch_size,
partition_id,
&runtime,
)
})
.collect::<Vec<_>>(),
),
buffered_partitions: (0..num_output_partitions)
.map(|partition_id| {
PartitionBuffer::new(Arc::clone(&schema), batch_size, partition_id, &runtime)
})
.collect::<Vec<_>>(),
spills: Mutex::new(vec![]),
partitioning,
num_output_partitions,
@@ -741,8 +724,6 @@ impl ShuffleRepartitioner {
// Update data size metric
self.metrics.data_size.add(input.get_array_memory_size());

let time_metric = self.metrics.baseline.elapsed_compute();

// NOTE: in shuffle writer exec, the output_rows metric represents the
// number of rows that are written to the output data file.
self.metrics.baseline.record_output(input.num_rows());
@@ -807,17 +788,11 @@ impl ShuffleRepartitioner {
.enumerate()
.filter(|(_, (start, end))| start < end)
{
let mut buffered_partitions = self.buffered_partitions.lock().await;
let output = &mut buffered_partitions[partition_id];

// If the range of indices is not large enough, just append the rows to the
// active array builders instead of adding them directly as a record batch.
mem_diff += output
.append_rows(
mem_diff += self
.append_rows_to_partition(
input.columns(),
&shuffled_partition_ids[start..end],
time_metric,
self,
partition_id,
)
.await?;

@@ -842,16 +817,18 @@
}
}
Partitioning::UnknownPartitioning(n) if *n == 1 => {
let mut buffered_partitions = self.buffered_partitions.lock().await;
let buffered_partitions = &mut self.buffered_partitions;

assert!(
buffered_partitions.len() == 1,
"Expected 1 partition but got {}",
buffered_partitions.len()
);

let output = &mut buffered_partitions[0];
output.append_batch(&input, time_metric, self).await?;
let indices = (0..input.num_rows()).collect::<Vec<usize>>();

self.append_rows_to_partition(input.columns(), &indices, 0)
.await?;
}
other => {
// this should be unreachable as long as the validation logic
@@ -868,7 +845,7 @@
/// Writes buffered shuffled record batches into Arrow IPC bytes.
async fn shuffle_write(&mut self) -> Result<SendableRecordBatchStream> {
let num_output_partitions = self.num_output_partitions;
let mut buffered_partitions = self.buffered_partitions.lock().await;
let buffered_partitions = &mut self.buffered_partitions;
let mut output_batches: Vec<Vec<u8>> = vec![vec![]; num_output_partitions];

for i in 0..num_output_partitions {
@@ -966,16 +943,15 @@ impl ShuffleRepartitioner {
self.metrics.data_size.value()
}

async fn spill(&self) -> Result<usize> {
async fn spill(&mut self) -> Result<usize> {
log::debug!(
"ShuffleRepartitioner spilling shuffle data of {} to disk while inserting ({} time(s) so far)",
self.used(),
self.spill_count()
);

let mut buffered_partitions = self.buffered_partitions.lock().await;
// As long as we are holding buffered data, spilling can free some memory.
if buffered_partitions.len() == 0 {
if self.buffered_partitions.is_empty() {
return Ok(0);
}

@@ -986,7 +962,7 @@
.disk_manager
.create_tmp_file("shuffle writer spill")?;
let offsets = spill_into(
&mut buffered_partitions,
&mut self.buffered_partitions,
spillfile.path(),
self.num_output_partitions,
)
@@ -1004,6 +980,60 @@
});
Ok(used)
}

/// Appends the rows at the specified indices from `columns` into the active
/// array builders of the given partition, spilling and retrying if the
/// memory reservation fails.
async fn append_rows_to_partition(
&mut self,
columns: &[ArrayRef],
indices: &[usize],
partition_id: usize,
) -> Result<isize> {
let mut mem_diff = 0;

let output = &mut self.buffered_partitions[partition_id];

let time_metric = self.metrics.baseline.elapsed_compute();

// If the range of indices is not large enough, just append the rows to the
// active array builders instead of adding them directly as a record batch.
let mut start_index: usize = 0;
let mut output_ret = output.append_rows(columns, indices, start_index, time_metric);

loop {
match output_ret {
Left(l) => {
mem_diff += l?;
break;
}
Right(new_start) => {
// Cannot allocate enough memory for the array builders in the partition,
// spill partitions and retry.
self.spill().await?;

let output = &mut self.buffered_partitions[partition_id];
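// Free this partition's remaining reservation so that the retried
// init_active_if_necessary can grow it again from zero.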
output.reservation.free();

let time_metric = self.metrics.baseline.elapsed_compute();

start_index = new_start;
output_ret = output.append_rows(columns, indices, start_index, time_metric);

if let Right(new_start) = output_ret {
if new_start == start_index {
// If the start index is not updated, it means that the partition
// is still not able to allocate enough memory for the array builders.
return Err(DataFusionError::Internal(
"Partition is still not able to allocate enough memory for the array builders after spilling."
.to_string(),
));
}
}
}
}
}

Ok(mem_diff)
}
}
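The Either-based retry above is the heart of this change: the old PartitionBuffer::append_rows took a reference to the ShuffleRepartitioner and awaited spill() itself, but now that buffered_partitions is plainly owned by the repartitioner (no Mutex), the callee cannot spill. Instead it reports how far it got and lets the owner of both the buffers and the spill logic drive the retry. Below is a minimal, self-contained sketch of that pattern; Buffer, budget, and spill are hypothetical stand-ins, not the Comet types.

use either::{Either, Left, Right};

/// Hypothetical stand-in for PartitionBuffer: appends rows under a byte budget.
struct Buffer {
    budget: usize,
    used: usize,
}

impl Buffer {
    /// Appends rows `start..total`. Returns Left(count appended) on success, or
    /// Right(resume index) when the budget is exhausted and a spill is needed.
    fn append(&mut self, total: usize, start: usize) -> Either<usize, usize> {
        for i in start..total {
            if self.used >= self.budget {
                return Right(i); // caller must free memory, then resume at i
            }
            self.used += 1;
        }
        Left(total - start)
    }

    /// Stand-in for spilling: pretend to flush rows to disk and free memory.
    fn spill(&mut self) {
        self.used = 0;
    }
}

fn main() {
    let mut buf = Buffer { budget: 4, used: 0 };
    let total = 10;
    let (mut start, mut appended) = (0, 0);
    loop {
        match buf.append(total, start) {
            Left(n) => {
                appended += n;
                break;
            }
            Right(resume) => {
                // A retry that makes no progress would loop forever; the real
                // code returns DataFusionError::Internal in that case.
                assert!(resume > start, "no progress after spilling");
                appended += resume - start;
                buf.spill();
                start = resume;
            }
        }
    }
    assert_eq!(appended, total);
}

As in the real code, a retry that does not advance the resume index is treated as an error (here an assert, above a DataFusionError::Internal) rather than being retried forever.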

/// Consume the `buffered_partitions` and spill into a single temp shuffle output file
@@ -1520,6 +1550,8 @@ mod test {
use datafusion::physical_plan::common::collect;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::prelude::SessionContext;
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_physical_expr::expressions::Column;
use tokio::runtime::Runtime;

@@ -1554,25 +1586,63 @@
#[test]
#[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
fn test_insert_larger_batch() {
shuffle_write_test(10000, 1, 16, None);
}

#[test]
#[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
fn test_insert_smaller_batch() {
shuffle_write_test(1000, 1, 16, None);
shuffle_write_test(1000, 10, 16, None);
}

#[test]
#[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
fn test_large_number_of_partitions() {
shuffle_write_test(10000, 10, 200, Some(10 * 1024 * 1024));
shuffle_write_test(10000, 10, 2000, Some(10 * 1024 * 1024));
}

#[test]
#[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
fn test_large_number_of_partitions_spilling() {
shuffle_write_test(10000, 100, 200, Some(10 * 1024 * 1024));
}

fn shuffle_write_test(
batch_size: usize,
num_batches: usize,
num_partitions: usize,
memory_limit: Option<usize>,
) {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
let mut b = StringBuilder::new();
for i in 0..10000 {
for i in 0..batch_size {
b.append_value(format!("{i}"));
}
let array = b.finish();
let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap();

let batches = vec![batch.clone()];
let batches = (0..num_batches).map(|_| batch.clone()).collect::<Vec<_>>();

let partitions = &[batches];
let exec = ShuffleWriterExec::try_new(
Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()),
Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions),
"/tmp/data.out".to_string(),
"/tmp/index.out".to_string(),
)
.unwrap();
let ctx = SessionContext::new();

// When a limit is set, 10MB of memory (as the tests above pass) should be enough for running these tests
let config = SessionConfig::new();
let mut runtime_env_builder = RuntimeEnvBuilder::new();
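// with_memory_limit(limit, 1.0) caps DataFusion's memory pool at limit bytes
// (the second argument is the fraction of that cap made available to the pool).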
runtime_env_builder = match memory_limit {
Some(limit) => runtime_env_builder.with_memory_limit(limit, 1.0),
None => runtime_env_builder,
};
let runtime_env = Arc::new(runtime_env_builder.build().unwrap());
let ctx = SessionContext::new_with_config_rt(config, runtime_env);
let task_ctx = ctx.task_ctx();
let stream = exec.execute(0, task_ctx).unwrap();
let rt = Runtime::new().unwrap();
