Skip to content

Commit

Permalink
Grow or shrink the reservation based on the mean size of the batches …
Browse files Browse the repository at this point in the history
…and the actual number of staging rows.
  • Loading branch information
Kontinuation committed Sep 26, 2024
1 parent f31f6cc commit 7cebabb
Showing 1 changed file with 101 additions and 122 deletions.
223 changes: 101 additions & 122 deletions native/core/src/execution/datafusion/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,7 @@ use itertools::Itertools;
use simd_adler32::Adler32;
use tokio::task;

use crate::{
common::bit::ceil,
errors::{CometError, CometResult},
};
use crate::errors::{CometError, CometResult};
use datafusion_comet_spark_expr::spark_hash::create_murmur3_hashes;

/// The shuffle writer operator maps each input partition to M output partitions based on a
Expand Down Expand Up @@ -199,8 +196,6 @@ struct PartitionBuffer {
frozen: Vec<u8>,
/// Array builders for appending rows into buffering batches.
active: Vec<Box<dyn ArrayBuilder>>,
/// The estimation of memory size of active builders in bytes when they are filled.
active_slots_mem_size: usize,
/// Number of rows in active builders.
num_active_rows: usize,
/// The maximum number of rows in a batch. Once `num_active_rows` reaches `batch_size`,
Expand All @@ -214,33 +209,20 @@ impl PartitionBuffer {
schema,
frozen: vec![],
active: vec![],
active_slots_mem_size: 0,
num_active_rows: 0,
batch_size,
}
}

/// Initializes active builders if necessary.
fn init_active_if_necessary(&mut self) -> Result<isize> {
let mut mem_diff = 0;

fn init_active_if_necessary(&mut self) {
if self.active.is_empty() {
self.active = new_array_builders(&self.schema, self.batch_size);
if self.active_slots_mem_size == 0 {
self.active_slots_mem_size = self
.active
.iter()
.zip(self.schema.fields())
.map(|(_ab, field)| slot_size(self.batch_size, field.data_type()))
.sum::<usize>();
}
mem_diff += self.active_slots_mem_size as isize;
self.active = new_array_builders(&self.schema, 0);
}
Ok(mem_diff)
}

/// Appends all rows of given batch into active array builders.
fn append_batch(&mut self, batch: &RecordBatch, time_metric: &Time) -> Result<isize> {
fn append_batch(&mut self, batch: &RecordBatch, time_metric: &Time) -> Result<()> {
let columns = batch.columns();
let indices = (0..batch.num_rows()).collect::<Vec<usize>>();
self.append_rows(columns, &indices, time_metric)
Expand All @@ -252,12 +234,11 @@ impl PartitionBuffer {
columns: &[ArrayRef],
indices: &[usize],
time_metric: &Time,
) -> Result<isize> {
let mut mem_diff = 0;
) -> Result<()> {
let mut start = 0;

// lazy init because some partition may be empty
mem_diff += self.init_active_if_necessary()?;
self.init_active_if_necessary();

while start < indices.len() {
let end = (start + self.batch_size).min(indices.len());
Expand All @@ -270,77 +251,49 @@ impl PartitionBuffer {
self.num_active_rows += end - start;
if self.num_active_rows >= self.batch_size {
let mut timer = time_metric.timer();
mem_diff += self.flush()?;
self.flush()?;
timer.stop();

mem_diff += self.init_active_if_necessary()?;
self.init_active_if_necessary();
}
start = end;
}
Ok(mem_diff)
Ok(())
}

/// flush active data into frozen bytes
fn flush(&mut self) -> Result<isize> {
fn flush(&mut self) -> Result<()> {
if self.num_active_rows == 0 {
return Ok(0);
return Ok(());
}
let mut mem_diff = 0isize;

// active -> staging
let active = std::mem::take(&mut self.active);
let num_rows = self.num_active_rows;
self.num_active_rows = 0;
mem_diff -= self.active_slots_mem_size as isize;

let frozen_batch = make_batch(Arc::clone(&self.schema), active, num_rows)?;

let frozen_capacity_old = self.frozen.capacity();
let mut cursor = Cursor::new(&mut self.frozen);
cursor.seek(SeekFrom::End(0))?;
write_ipc_compressed(&frozen_batch, &mut cursor)?;

mem_diff += (self.frozen.capacity() - frozen_capacity_old) as isize;
Ok(mem_diff)
Ok(())
}
}

fn slot_size(len: usize, data_type: &DataType) -> usize {
match data_type {
DataType::Boolean => ceil(len, 8),
DataType::Int8 => len,
DataType::Int16 => len * 2,
DataType::Int32 => len * 4,
DataType::Int64 => len * 8,
DataType::UInt8 => len,
DataType::UInt16 => len * 2,
DataType::UInt32 => len * 4,
DataType::UInt64 => len * 8,
DataType::Float32 => len * 4,
DataType::Float64 => len * 8,
DataType::Date32 => len * 4,
DataType::Date64 => len * 8,
DataType::Time32(TimeUnit::Second) => len * 4,
DataType::Time32(TimeUnit::Millisecond) => len * 4,
DataType::Time64(TimeUnit::Microsecond) => len * 8,
DataType::Time64(TimeUnit::Nanosecond) => len * 8,
// TODO: this is not accurate, but should be good enough for now
DataType::Utf8 => len * 100 + len * 4,
DataType::LargeUtf8 => len * 100 + len * 8,
DataType::Decimal128(_, _) => len * 16,
DataType::Dictionary(key_type, value_type) => {
// TODO: this is not accurate, but should be good enough for now
slot_size(len, key_type.as_ref()) + slot_size(len / 10, value_type.as_ref())
}
// TODO: this is not accurate, but should be good enough for now
DataType::Binary => len * 100 + len * 4,
DataType::LargeBinary => len * 100 + len * 8,
DataType::FixedSizeBinary(s) => len * (*s as usize),
DataType::Timestamp(_, _) => len * 8,
dt => unimplemented!(
"{}",
format!("data type {dt} not supported in shuffle write")
),
fn estimate_memory_size(&self, all_batch_size: usize, all_rows: usize) -> usize {
let array_size = if all_rows == 0 {
0
} else {
((all_batch_size as f64) / (all_rows as f64) * (self.num_active_rows as f64)).ceil()
as usize
};

// Multiply array_size by 2 to take the exponential growth of the array builders into account
let builder_size = array_size * 2;

let frozen_size = self.frozen.capacity();
builder_size + frozen_size
}
}

Expand Down Expand Up @@ -590,6 +543,8 @@ struct ShuffleRepartitioner {
partition_ids: Vec<u64>,
/// The configured batch size
batch_size: usize,
all_batches_memory_size: usize,
all_batches_num_rows: usize,
}

struct ShuffleRepartitionerMetrics {
Expand Down Expand Up @@ -662,13 +617,21 @@ impl ShuffleRepartitioner {
hashes_buf,
partition_ids,
batch_size,
all_batches_memory_size: 0,
all_batches_num_rows: 0,
}
}

/// Shuffles rows in input batch into corresponding partition buffer.
/// This function will slice input batch according to configured batch size and then
/// shuffle rows into corresponding partition buffer.
async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> {
// Update data size metric
let input_array_memory_size = batch.get_array_memory_size();
self.metrics.data_size.add(input_array_memory_size);
self.all_batches_memory_size += input_array_memory_size;
self.all_batches_num_rows += batch.num_rows();

let mut start = 0;
while start < batch.num_rows() {
let end = (start + self.batch_size).min(batch.num_rows());
Expand Down Expand Up @@ -696,9 +659,6 @@ impl ShuffleRepartitioner {
));
}

// Update data size metric
self.metrics.data_size.add(input.get_array_memory_size());

let time_metric = self.metrics.baseline.elapsed_compute();

// NOTE: in shuffle writer exec, the output_rows metrics represents the
Expand Down Expand Up @@ -756,27 +716,35 @@ impl ShuffleRepartitioner {
let mut partition_starts = partition_ends;
partition_starts.push(input.num_rows());

let mut mem_diff = 0;
// For each interval of row indices of partition, taking rows from input batch and
// appending into output buffer.
for (partition_id, (&start, &end)) in partition_starts
.iter()
.tuple_windows()
.enumerate()
.filter(|(_, (start, end))| start < end)
let mut mem_size = 0;
{
let mut buffered_partitions = self.buffered_partitions.lock().await;
let output = &mut buffered_partitions[partition_id];

// If the range of indices is not big enough, just appending the rows into
// active array builders instead of directly adding them as a record batch.
mem_diff += output.append_rows(
input.columns(),
&shuffled_partition_ids[start..end],
time_metric,
)?;
for (partition_id, (&start, &end)) in
partition_starts.iter().tuple_windows().enumerate()
{
let output = &mut buffered_partitions[partition_id];

if start < end {
// If the range of indices is not big enough, just appending the rows into
// active array builders instead of directly adding them as a record batch.
output.append_rows(
input.columns(),
&shuffled_partition_ids[start..end],
time_metric,
)?;
}

mem_size += output.estimate_memory_size(
self.all_batches_memory_size,
self.all_batches_num_rows,
);
}
}

// We change the reservation by growing or shrinking it based on the estimated memory size
let mem_diff = (mem_size as isize) - (self.reservation.size() as isize);
if mem_diff > 0 {
let mem_increase = mem_diff as usize;
if self.reservation.try_grow(mem_increase).is_err() {
Expand Down Expand Up @@ -893,8 +861,7 @@ impl ShuffleRepartitioner {

timer.stop();

let used = self.reservation.size();
self.reservation.shrink(used);
self.reservation.free();

// shuffle writer always has empty output
Ok(Box::pin(EmptyStream::try_new(Arc::clone(&self.schema))?))
Expand Down Expand Up @@ -1470,59 +1437,71 @@ mod test {
use datafusion::physical_plan::common::collect;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::prelude::SessionContext;
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_physical_expr::expressions::Column;
use tokio::runtime::Runtime;

#[test]
fn test_slot_size() {
let batch_size = 1usize;
// not inclusive of all supported types, but enough to test the function
let supported_primitive_types = [
DataType::Int32,
DataType::Int64,
DataType::UInt32,
DataType::UInt64,
DataType::Float32,
DataType::Float64,
DataType::Boolean,
DataType::Utf8,
DataType::LargeUtf8,
DataType::Binary,
DataType::LargeBinary,
DataType::FixedSizeBinary(16),
];
let expected_slot_size = [4, 8, 4, 8, 4, 8, 1, 104, 108, 104, 108, 16];
supported_primitive_types
.iter()
.zip(expected_slot_size.iter())
.for_each(|(data_type, expected)| {
let slot_size = slot_size(batch_size, data_type);
assert_eq!(slot_size, *expected);
})
#[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
fn test_insert_larger_batch() {
shuffle_write_test(10000, 1, 16, None);
}

#[test]
#[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
fn test_insert_larger_batch() {
fn test_insert_smaller_batch() {
shuffle_write_test(1000, 1, 16, None);
shuffle_write_test(1000, 10, 16, None);
}

#[test]
#[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
fn test_large_number_of_partitions() {
shuffle_write_test(10000, 10, 200, Some(10 * 1024 * 1024));
shuffle_write_test(10000, 10, 2000, Some(10 * 1024 * 1024));
}

#[test]
#[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
fn test_large_number_of_partitions_spilling() {
shuffle_write_test(10000, 100, 200, Some(10 * 1024 * 1024));
}

fn shuffle_write_test(
batch_size: usize,
num_batches: usize,
num_partitions: usize,
memory_limit: Option<usize>,
) {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
let mut b = StringBuilder::new();
for i in 0..10000 {
for i in 0..batch_size {
b.append_value(format!("{i}"));
}
let array = b.finish();
let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap();

let batches = vec![batch.clone()];
let batches = (0..num_batches).map(|_| batch.clone()).collect::<Vec<_>>();

let partitions = &[batches];
let exec = ShuffleWriterExec::try_new(
Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()),
Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions),
"/tmp/data.out".to_string(),
"/tmp/index.out".to_string(),
)
.unwrap();
let ctx = SessionContext::new();

// 10MB memory should be enough for running this test
let config = SessionConfig::new();
let mut runtime_env_builder = RuntimeEnvBuilder::new();
runtime_env_builder = match memory_limit {
Some(limit) => runtime_env_builder.with_memory_limit(limit, 1.0),
None => runtime_env_builder,
};
let runtime_env = Arc::new(runtime_env_builder.build().unwrap());
let ctx = SessionContext::new_with_config_rt(config, runtime_env);
let task_ctx = ctx.task_ctx();
let stream = exec.execute(0, task_ctx).unwrap();
let rt = Runtime::new().unwrap();
Expand Down

0 comments on commit 7cebabb

Please sign in to comment.