From 7cebabb50c6f536b3a93b98aecf14132d38dc3ce Mon Sep 17 00:00:00 2001
From: Kristin Cowalcijk
Date: Wed, 25 Sep 2024 15:49:59 +0800
Subject: [PATCH] Grow or shrink the reservation based on the mean size of the batches and the actual number of staging rows.

---
 .../execution/datafusion/shuffle_writer.rs | 223 ++++++++----------
 1 file changed, 101 insertions(+), 122 deletions(-)

diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs b/native/core/src/execution/datafusion/shuffle_writer.rs
index 9668359fc..a30fd8522 100644
--- a/native/core/src/execution/datafusion/shuffle_writer.rs
+++ b/native/core/src/execution/datafusion/shuffle_writer.rs
@@ -60,10 +60,7 @@ use itertools::Itertools;
 use simd_adler32::Adler32;
 use tokio::task;
 
-use crate::{
-    common::bit::ceil,
-    errors::{CometError, CometResult},
-};
+use crate::errors::{CometError, CometResult};
 use datafusion_comet_spark_expr::spark_hash::create_murmur3_hashes;
 
 /// The shuffle writer operator maps each input partition to M output partitions based on a
@@ -199,8 +196,6 @@ struct PartitionBuffer {
     frozen: Vec<u8>,
     /// Array builders for appending rows into buffering batches.
     active: Vec<Box<dyn ArrayBuilder>>,
-    /// The estimation of memory size of active builders in bytes when they are filled.
-    active_slots_mem_size: usize,
     /// Number of rows in active builders.
     num_active_rows: usize,
     /// The maximum number of rows in a batch. Once `num_active_rows` reaches `batch_size`,
@@ -214,33 +209,20 @@ impl PartitionBuffer {
             schema,
             frozen: vec![],
             active: vec![],
-            active_slots_mem_size: 0,
             num_active_rows: 0,
             batch_size,
         }
     }
 
     /// Initializes active builders if necessary.
-    fn init_active_if_necessary(&mut self) -> Result<isize> {
-        let mut mem_diff = 0;
-
+    fn init_active_if_necessary(&mut self) {
         if self.active.is_empty() {
-            self.active = new_array_builders(&self.schema, self.batch_size);
-            if self.active_slots_mem_size == 0 {
-                self.active_slots_mem_size = self
-                    .active
-                    .iter()
-                    .zip(self.schema.fields())
-                    .map(|(_ab, field)| slot_size(self.batch_size, field.data_type()))
-                    .sum::<usize>();
-            }
-            mem_diff += self.active_slots_mem_size as isize;
+            self.active = new_array_builders(&self.schema, 0);
         }
-        Ok(mem_diff)
     }
 
     /// Appends all rows of given batch into active array builders.
-    fn append_batch(&mut self, batch: &RecordBatch, time_metric: &Time) -> Result<isize> {
+    fn append_batch(&mut self, batch: &RecordBatch, time_metric: &Time) -> Result<()> {
         let columns = batch.columns();
         let indices = (0..batch.num_rows()).collect::<Vec<usize>>();
         self.append_rows(columns, &indices, time_metric)
@@ -252,12 +234,11 @@ impl PartitionBuffer {
         columns: &[ArrayRef],
         indices: &[usize],
         time_metric: &Time,
-    ) -> Result<isize> {
-        let mut mem_diff = 0;
+    ) -> Result<()> {
         let mut start = 0;
 
         // lazy init because some partition may be empty
-        mem_diff += self.init_active_if_necessary()?;
+        self.init_active_if_necessary();
 
         while start < indices.len() {
             let end = (start + self.batch_size).min(indices.len());
@@ -270,77 +251,49 @@ impl PartitionBuffer {
             self.num_active_rows += end - start;
             if self.num_active_rows >= self.batch_size {
                 let mut timer = time_metric.timer();
-                mem_diff += self.flush()?;
+                self.flush()?;
                 timer.stop();
-                mem_diff += self.init_active_if_necessary()?;
+                self.init_active_if_necessary();
             }
             start = end;
         }
-        Ok(mem_diff)
+        Ok(())
     }
 
     /// flush active data into frozen bytes
-    fn flush(&mut self) -> Result<isize> {
+    fn flush(&mut self) -> Result<()> {
         if self.num_active_rows == 0 {
-            return Ok(0);
+            return Ok(());
         }
-        let mut mem_diff = 0isize;
 
         // active -> staging
         let active = std::mem::take(&mut self.active);
        let num_rows = self.num_active_rows;
         self.num_active_rows = 0;
-        mem_diff -= self.active_slots_mem_size as isize;
 
         let frozen_batch = make_batch(Arc::clone(&self.schema), active, num_rows)?;
 
-        let frozen_capacity_old = self.frozen.capacity();
         let mut cursor = Cursor::new(&mut self.frozen);
         cursor.seek(SeekFrom::End(0))?;
         write_ipc_compressed(&frozen_batch, &mut cursor)?;
 
-        mem_diff += (self.frozen.capacity() - frozen_capacity_old) as isize;
-        Ok(mem_diff)
+        Ok(())
     }
-}
 
-fn slot_size(len: usize, data_type: &DataType) -> usize {
-    match data_type {
-        DataType::Boolean => ceil(len, 8),
-        DataType::Int8 => len,
-        DataType::Int16 => len * 2,
-        DataType::Int32 => len * 4,
-        DataType::Int64 => len * 8,
-        DataType::UInt8 => len,
-        DataType::UInt16 => len * 2,
-        DataType::UInt32 => len * 4,
-        DataType::UInt64 => len * 8,
-        DataType::Float32 => len * 4,
-        DataType::Float64 => len * 8,
-        DataType::Date32 => len * 4,
-        DataType::Date64 => len * 8,
-        DataType::Time32(TimeUnit::Second) => len * 4,
-        DataType::Time32(TimeUnit::Millisecond) => len * 4,
-        DataType::Time64(TimeUnit::Microsecond) => len * 8,
-        DataType::Time64(TimeUnit::Nanosecond) => len * 8,
-        // TODO: this is not accurate, but should be good enough for now
-        DataType::Utf8 => len * 100 + len * 4,
-        DataType::LargeUtf8 => len * 100 + len * 8,
-        DataType::Decimal128(_, _) => len * 16,
-        DataType::Dictionary(key_type, value_type) => {
-            // TODO: this is not accurate, but should be good enough for now
-            slot_size(len, key_type.as_ref()) + slot_size(len / 10, value_type.as_ref())
-        }
-        // TODO: this is not accurate, but should be good enough for now
-        DataType::Binary => len * 100 + len * 4,
-        DataType::LargeBinary => len * 100 + len * 8,
-        DataType::FixedSizeBinary(s) => len * (*s as usize),
-        DataType::Timestamp(_, _) => len * 8,
-        dt => unimplemented!(
-            "{}",
-            format!("data type {dt} not supported in shuffle write")
-        ),
+    fn estimate_memory_size(&self, all_batch_size: usize, all_rows: usize) -> usize {
+        let array_size = if all_rows == 0 {
+            0
+        } else {
+            ((all_batch_size as f64) / (all_rows as f64) * (self.num_active_rows as f64)).ceil()
+                as usize
+        };
+
+        // Multiply array_size by 2 to take the exponential growth of the array builders
+        // into account
+        let builder_size = array_size * 2;
+
+        let frozen_size = self.frozen.capacity();
+        builder_size + frozen_size
     }
 }
 
@@ -590,6 +543,8 @@ struct ShuffleRepartitioner {
     partition_ids: Vec<u64>,
     /// The configured batch size
     batch_size: usize,
+    all_batches_memory_size: usize,
+    all_batches_num_rows: usize,
 }
 
 struct ShuffleRepartitionerMetrics {
@@ -662,6 +617,8 @@ impl ShuffleRepartitioner {
             hashes_buf,
             partition_ids,
             batch_size,
+            all_batches_memory_size: 0,
+            all_batches_num_rows: 0,
         }
     }
 
@@ -669,6 +626,12 @@ impl ShuffleRepartitioner {
     /// This function will slice input batch according to configured batch size and then
     /// shuffle rows into corresponding partition buffer.
     async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        // Update data size metric
+        let input_array_memory_size = batch.get_array_memory_size();
+        self.metrics.data_size.add(input_array_memory_size);
+        self.all_batches_memory_size += input_array_memory_size;
+        self.all_batches_num_rows += batch.num_rows();
+
         let mut start = 0;
         while start < batch.num_rows() {
             let end = (start + self.batch_size).min(batch.num_rows());
@@ -696,9 +659,6 @@ impl ShuffleRepartitioner {
             ));
         }
 
-        // Update data size metric
-        self.metrics.data_size.add(input.get_array_memory_size());
-
         let time_metric = self.metrics.baseline.elapsed_compute();
 
         // NOTE: in shuffle writer exec, the output_rows metrics represents the
@@ -756,27 +716,35 @@ impl ShuffleRepartitioner {
         let mut partition_starts = partition_ends;
         partition_starts.push(input.num_rows());
 
-        let mut mem_diff = 0;
         // For each interval of row indices of partition, taking rows from input batch and
         // appending into output buffer.
-        for (partition_id, (&start, &end)) in partition_starts
-            .iter()
-            .tuple_windows()
-            .enumerate()
-            .filter(|(_, (start, end))| start < end)
+        let mut mem_size = 0;
         {
             let mut buffered_partitions = self.buffered_partitions.lock().await;
-            let output = &mut buffered_partitions[partition_id];
-
-            // If the range of indices is not big enough, just appending the rows into
-            // active array builders instead of directly adding them as a record batch.
-            mem_diff += output.append_rows(
-                input.columns(),
-                &shuffled_partition_ids[start..end],
-                time_metric,
-            )?;
+            for (partition_id, (&start, &end)) in
+                partition_starts.iter().tuple_windows().enumerate()
+            {
+                let output = &mut buffered_partitions[partition_id];
+
+                if start < end {
+                    // If the range of indices is not big enough, just appending the rows into
+                    // active array builders instead of directly adding them as a record batch.
+                    output.append_rows(
+                        input.columns(),
+                        &shuffled_partition_ids[start..end],
+                        time_metric,
+                    )?;
+                }
+
+                mem_size += output.estimate_memory_size(
+                    self.all_batches_memory_size,
+                    self.all_batches_num_rows,
+                );
+            }
         }
 
+        // We change the reservation by growing or shrinking it based on the estimated memory size
+        let mem_diff = (mem_size as isize) - (self.reservation.size() as isize);
         if mem_diff > 0 {
             let mem_increase = mem_diff as usize;
             if self.reservation.try_grow(mem_increase).is_err() {
@@ -893,8 +861,7 @@ impl ShuffleRepartitioner {
 
         timer.stop();
 
-        let used = self.reservation.size();
-        self.reservation.shrink(used);
+        self.reservation.free();
 
         // shuffle writer always has empty output
         Ok(Box::pin(EmptyStream::try_new(Arc::clone(&self.schema))?))
@@ -1470,59 +1437,71 @@ mod test {
     use datafusion::physical_plan::common::collect;
     use datafusion::physical_plan::memory::MemoryExec;
     use datafusion::prelude::SessionContext;
+    use datafusion_execution::config::SessionConfig;
+    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
     use datafusion_physical_expr::expressions::Column;
     use tokio::runtime::Runtime;
 
     #[test]
-    fn test_slot_size() {
-        let batch_size = 1usize;
-        // not inclusive of all supported types, but enough to test the function
-        let supported_primitive_types = [
-            DataType::Int32,
-            DataType::Int64,
-            DataType::UInt32,
-            DataType::UInt64,
-            DataType::Float32,
-            DataType::Float64,
-            DataType::Boolean,
-            DataType::Utf8,
-            DataType::LargeUtf8,
-            DataType::Binary,
-            DataType::LargeBinary,
-            DataType::FixedSizeBinary(16),
-        ];
-        let expected_slot_size = [4, 8, 4, 8, 4, 8, 1, 104, 108, 104, 108, 16];
-        supported_primitive_types
-            .iter()
-            .zip(expected_slot_size.iter())
-            .for_each(|(data_type, expected)| {
-                let slot_size = slot_size(batch_size, data_type);
-                assert_eq!(slot_size, *expected);
-            })
+    #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
+    fn test_insert_larger_batch() {
+        shuffle_write_test(10000, 1, 16, None);
     }
 
     #[test]
     #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
-    fn test_insert_larger_batch() {
+    fn test_insert_smaller_batch() {
+        shuffle_write_test(1000, 1, 16, None);
+        shuffle_write_test(1000, 10, 16, None);
+    }
+
+    #[test]
+    #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
+    fn test_large_number_of_partitions() {
+        shuffle_write_test(10000, 10, 200, Some(10 * 1024 * 1024));
+        shuffle_write_test(10000, 10, 2000, Some(10 * 1024 * 1024));
+    }
+
+    #[test]
+    #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
+    fn test_large_number_of_partitions_spilling() {
+        shuffle_write_test(10000, 100, 200, Some(10 * 1024 * 1024));
+    }
+
+    fn shuffle_write_test(
+        batch_size: usize,
+        num_batches: usize,
+        num_partitions: usize,
+        memory_limit: Option<usize>,
+    ) {
         let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
 
         let mut b = StringBuilder::new();
-        for i in 0..10000 {
+        for i in 0..batch_size {
            b.append_value(format!("{i}"));
         }
         let array = b.finish();
         let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap();
 
-        let batches = vec![batch.clone()];
+        let batches = (0..num_batches).map(|_| batch.clone()).collect::<Vec<_>>();
 
         let partitions = &[batches];
         let exec = ShuffleWriterExec::try_new(
             Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()),
-            Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
+            Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions),
"/tmp/data.out".to_string(), "/tmp/index.out".to_string(), ) .unwrap(); - let ctx = SessionContext::new(); + + // 10MB memory should be enough for running this test + let config = SessionConfig::new(); + let mut runtime_env_builder = RuntimeEnvBuilder::new(); + runtime_env_builder = match memory_limit { + Some(limit) => runtime_env_builder.with_memory_limit(limit, 1.0), + None => runtime_env_builder, + }; + let runtime_env = Arc::new(runtime_env_builder.build().unwrap()); + let ctx = SessionContext::new_with_config_rt(config, runtime_env); let task_ctx = ctx.task_ctx(); let stream = exec.execute(0, task_ctx).unwrap(); let rt = Runtime::new().unwrap();