diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs b/native/core/src/execution/datafusion/shuffle_writer.rs
index 16f035bc5..6b105aded 100644
--- a/native/core/src/execution/datafusion/shuffle_writer.rs
+++ b/native/core/src/execution/datafusion/shuffle_writer.rs
@@ -66,14 +66,6 @@ use crate::{
 };
 use datafusion_comet_spark_expr::spark_hash::create_murmur3_hashes;
 
-/// The status of appending rows to a partition buffer.
-enum AppendRowStatus {
-    /// The difference in memory usage after appending rows
-    MemDiff(Result<isize>),
-    /// The index of the next row to append
-    StartIndex(usize),
-}
-
 /// The shuffle writer operator maps each input partition to M output partitions based on a
 /// partitioning scheme. No guarantees are made about the order of the resulting partitions.
 #[derive(Debug)]
@@ -214,21 +206,10 @@ struct PartitionBuffer {
     /// The maximum number of rows in a batch. Once `num_active_rows` reaches `batch_size`,
     /// the active array builders will be frozen and appended to frozen buffer `frozen`.
     batch_size: usize,
-    /// Memory reservation for this partition buffer.
-    reservation: MemoryReservation,
 }
 
 impl PartitionBuffer {
-    fn new(
-        schema: SchemaRef,
-        batch_size: usize,
-        partition_id: usize,
-        runtime: &Arc<RuntimeEnv>,
-    ) -> Self {
-        let reservation = MemoryConsumer::new(format!("PartitionBuffer[{}]", partition_id))
-            .with_can_spill(true)
-            .register(&runtime.memory_pool);
-
+    fn new(schema: SchemaRef, batch_size: usize) -> Self {
         Self {
             schema,
             frozen: vec![],
@@ -236,52 +217,47 @@ impl PartitionBuffer {
             active_slots_mem_size: 0,
             num_active_rows: 0,
             batch_size,
-            reservation,
         }
     }
 
     /// Initializes active builders if necessary.
-    /// Returns error if memory reservation fails.
     fn init_active_if_necessary(&mut self) -> Result<isize> {
         let mut mem_diff = 0;
 
         if self.active.is_empty() {
-            // Estimate the memory size of active builders
+            self.active = new_array_builders(&self.schema, self.batch_size);
             if self.active_slots_mem_size == 0 {
                 self.active_slots_mem_size = self
-                    .schema
-                    .fields()
+                    .active
                     .iter()
-                    .map(|field| slot_size(self.batch_size, field.data_type()))
+                    .zip(self.schema.fields())
+                    .map(|(_ab, field)| slot_size(self.batch_size, field.data_type()))
                     .sum::<usize>();
             }
-
-            self.reservation.try_grow(self.active_slots_mem_size)?;
-
-            self.active = new_array_builders(&self.schema, self.batch_size);
-
             mem_diff += self.active_slots_mem_size as isize;
         }
         Ok(mem_diff)
     }
 
+    /// Appends all rows of given batch into active array builders.
+    fn append_batch(&mut self, batch: &RecordBatch, time_metric: &Time) -> Result<isize> {
+        let columns = batch.columns();
+        let indices = (0..batch.num_rows()).collect::<Vec<usize>>();
+        self.append_rows(columns, &indices, time_metric)
+    }
+
     /// Appends rows of specified indices from columns into active array builders.
     fn append_rows(
         &mut self,
         columns: &[ArrayRef],
         indices: &[usize],
-        start_index: usize,
         time_metric: &Time,
-    ) -> AppendRowStatus {
+    ) -> Result<isize> {
         let mut mem_diff = 0;
-        let mut start = start_index;
+        let mut start = 0;
 
         // lazy init because some partition may be empty
-        let init = self.init_active_if_necessary();
-        if init.is_err() {
-            return AppendRowStatus::StartIndex(start);
-        }
-        mem_diff += init.unwrap();
+        mem_diff += self.init_active_if_necessary()?;
 
         while start < indices.len() {
             let end = (start + self.batch_size).min(indices.len());
@@ -294,22 +270,14 @@ impl PartitionBuffer {
             self.num_active_rows += end - start;
             if self.num_active_rows >= self.batch_size {
                 let mut timer = time_metric.timer();
-                let flush = self.flush();
-                if let Err(e) = flush {
-                    return AppendRowStatus::MemDiff(Err(e));
-                }
-                mem_diff += flush.unwrap();
+                mem_diff += self.flush()?;
                 timer.stop();
 
-                let init = self.init_active_if_necessary();
-                if init.is_err() {
-                    return AppendRowStatus::StartIndex(end);
-                }
-                mem_diff += init.unwrap();
+                mem_diff += self.init_active_if_necessary()?;
             }
             start = end;
         }
-        AppendRowStatus::MemDiff(Ok(mem_diff))
+        Ok(mem_diff)
     }
 
     /// flush active data into frozen bytes
@@ -323,7 +291,7 @@ impl PartitionBuffer {
         let active = std::mem::take(&mut self.active);
         let num_rows = self.num_active_rows;
         self.num_active_rows = 0;
-        self.reservation.try_shrink(self.active_slots_mem_size)?;
+        mem_diff -= self.active_slots_mem_size as isize;
 
         let frozen_batch = make_batch(Arc::clone(&self.schema), active, num_rows)?;
 
@@ -630,7 +598,7 @@ struct ShuffleRepartitioner {
     output_data_file: String,
     output_index_file: String,
     schema: SchemaRef,
-    buffered_partitions: Vec<PartitionBuffer>,
+    buffered_partitions: Mutex<Vec<PartitionBuffer>>,
     spills: Mutex<Vec<SpillInfo>>,
     /// Sort expressions
     /// Partitioning scheme to use
@@ -703,11 +671,11 @@ impl ShuffleRepartitioner {
             output_data_file,
             output_index_file,
             schema: Arc::clone(&schema),
-            buffered_partitions: (0..num_output_partitions)
-                .map(|partition_id| {
-                    PartitionBuffer::new(Arc::clone(&schema), batch_size, partition_id, &runtime)
-                })
-                .collect::<Vec<_>>(),
+            buffered_partitions: Mutex::new(
+                (0..num_output_partitions)
+                    .map(|_| PartitionBuffer::new(Arc::clone(&schema), batch_size))
+                    .collect::<Vec<_>>(),
+            ),
             spills: Mutex::new(vec![]),
             partitioning,
             num_output_partitions,
@@ -754,6 +722,8 @@ impl ShuffleRepartitioner {
         // Update data size metric
         self.metrics.data_size.add(input.get_array_memory_size());
 
+        let time_metric = self.metrics.baseline.elapsed_compute();
+
         // NOTE: in shuffle writer exec, the output_rows metrics represents the
         // number of rows those are written to output data file.
         self.metrics.baseline.record_output(input.num_rows());
@@ -818,36 +788,34 @@
                     .enumerate()
                     .filter(|(_, (start, end))| start < end)
                 {
-                    mem_diff += self
-                        .append_rows_to_partition(
-                            input.columns(),
-                            &shuffled_partition_ids[start..end],
-                            partition_id,
-                        )
-                        .await?;
-
-                    if mem_diff > 0 {
-                        let mem_increase = mem_diff as usize;
-                        if self.reservation.try_grow(mem_increase).is_err() {
-                            self.spill().await?;
-                            self.reservation.free();
-                            self.reservation.try_grow(mem_increase)?;
-
-                            mem_diff = 0;
-                        }
-                    }
-
-                    if mem_diff < 0 {
-                        let mem_used = self.reservation.size();
-                        let mem_decrease = mem_used.min(-mem_diff as usize);
-                        self.reservation.shrink(mem_decrease);
+                    let mut buffered_partitions = self.buffered_partitions.lock().await;
+                    let output = &mut buffered_partitions[partition_id];
+
+                    // If the range of indices is not big enough, just appending the rows into
+                    // active array builders instead of directly adding them as a record batch.
+                    mem_diff += output.append_rows(
+                        input.columns(),
+                        &shuffled_partition_ids[start..end],
+                        time_metric,
+                    )?;
+                }
 
-                        mem_diff += mem_decrease as isize;
+                if mem_diff > 0 {
+                    let mem_increase = mem_diff as usize;
+                    if self.reservation.try_grow(mem_increase).is_err() {
+                        self.spill().await?;
+                        self.reservation.free();
+                        self.reservation.try_grow(mem_increase)?;
                     }
                 }
+                if mem_diff < 0 {
+                    let mem_used = self.reservation.size();
+                    let mem_decrease = mem_used.min(-mem_diff as usize);
+                    self.reservation.shrink(mem_decrease);
+                }
             }
             Partitioning::UnknownPartitioning(n) if *n == 1 => {
-                let buffered_partitions = &mut self.buffered_partitions;
+                let mut buffered_partitions = self.buffered_partitions.lock().await;
 
                 assert!(
                     buffered_partitions.len() == 1,
@@ -855,10 +823,8 @@
                     buffered_partitions.len()
                 );
 
-                let indices = (0..input.num_rows()).collect::<Vec<usize>>();
-
-                self.append_rows_to_partition(input.columns(), &indices, 0)
-                    .await?;
+                let output = &mut buffered_partitions[0];
+                output.append_batch(&input, time_metric)?;
             }
             other => {
                 // this should be unreachable as long as the validation logic
@@ -875,7 +841,7 @@
     /// Writes buffered shuffled record batches into Arrow IPC bytes.
     async fn shuffle_write(&mut self) -> Result<SendableRecordBatchStream> {
         let num_output_partitions = self.num_output_partitions;
-        let buffered_partitions = &mut self.buffered_partitions;
+        let mut buffered_partitions = self.buffered_partitions.lock().await;
         let mut output_batches: Vec<Vec<u8>> = vec![vec![]; num_output_partitions];
 
         for i in 0..num_output_partitions {
@@ -973,15 +939,16 @@
         self.metrics.data_size.value()
     }
 
-    async fn spill(&mut self) -> Result<usize> {
+    async fn spill(&self) -> Result<usize> {
         log::debug!(
             "ShuffleRepartitioner spilling shuffle data of {} to disk while inserting ({} time(s) so far)",
             self.used(),
             self.spill_count()
         );
 
+        let mut buffered_partitions = self.buffered_partitions.lock().await;
         // we could always get a chance to free some memory as long as we are holding some
-        if self.buffered_partitions.is_empty() {
+        if buffered_partitions.len() == 0 {
             return Ok(0);
         }
 
@@ -992,7 +959,7 @@
             .disk_manager
             .create_tmp_file("shuffle writer spill")?;
         let offsets = spill_into(
-            &mut self.buffered_partitions,
+            &mut buffered_partitions,
             spillfile.path(),
             self.num_output_partitions,
         )
@@ -1010,60 +977,6 @@
         });
         Ok(used)
     }
-
-    /// Appends rows of specified indices from columns into active array builders in the specified partition.
-    async fn append_rows_to_partition(
-        &mut self,
-        columns: &[ArrayRef],
-        indices: &[usize],
-        partition_id: usize,
-    ) -> Result<isize> {
-        let mut mem_diff = 0;
-
-        let output = &mut self.buffered_partitions[partition_id];
-
-        let time_metric = self.metrics.baseline.elapsed_compute();
-
-        // If the range of indices is not big enough, just appending the rows into
-        // active array builders instead of directly adding them as a record batch.
-        let mut start_index: usize = 0;
-        let mut output_ret = output.append_rows(columns, indices, start_index, time_metric);
-
-        loop {
-            match output_ret {
-                AppendRowStatus::MemDiff(l) => {
-                    mem_diff += l?;
-                    break;
-                }
-                AppendRowStatus::StartIndex(new_start) => {
-                    // Cannot allocate enough memory for the array builders in the partition,
-                    // spill partitions and retry.
-                    self.spill().await?;
-
-                    let output = &mut self.buffered_partitions[partition_id];
-                    output.reservation.free();
-
-                    let time_metric = self.metrics.baseline.elapsed_compute();
-
-                    start_index = new_start;
-                    output_ret = output.append_rows(columns, indices, start_index, time_metric);
-
-                    if let AppendRowStatus::StartIndex(new_start) = output_ret {
-                        if new_start == start_index {
-                            // If the start index is not updated, it means that the partition
-                            // is still not able to allocate enough memory for the array builders.
-                            return Err(DataFusionError::Internal(
-                                "Partition is still not able to allocate enough memory for the array builders after spilling."
-                                    .to_string(),
-                            ));
-                        }
-                    }
-                }
-            }
-        }
-
-        Ok(mem_diff)
-    }
 }
 
 /// consume the `buffered_partitions` and do spill into a single temp shuffle output file
@@ -1580,8 +1493,6 @@ mod test {
     use datafusion::physical_plan::common::collect;
     use datafusion::physical_plan::memory::MemoryExec;
     use datafusion::prelude::SessionContext;
-    use datafusion_execution::config::SessionConfig;
-    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
     use datafusion_physical_expr::expressions::Column;
     use tokio::runtime::Runtime;
 
@@ -1616,65 +1527,25 @@
     #[test]
     #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
     fn test_insert_larger_batch() {
-        shuffle_write_test(10000, 1, 16, None);
-    }
-
-    #[test]
-    #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
-    fn test_insert_smaller_batch() {
-        shuffle_write_test(1000, 1, 16, None);
-        shuffle_write_test(1000, 10, 16, None);
-    }
-
-    #[test]
-    #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
-    #[cfg(not(target_os = "macos"))] // Github MacOS runner fails with "Too many open files".
-    fn test_large_number_of_partitions() {
-        shuffle_write_test(10000, 10, 200, Some(10 * 1024 * 1024));
-        shuffle_write_test(10000, 10, 2000, Some(10 * 1024 * 1024));
-    }
-
-    #[test]
-    #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
-    #[cfg(not(target_os = "macos"))] // Github MacOS runner fails with "Too many open files".
-    fn test_large_number_of_partitions_spilling() {
-        shuffle_write_test(10000, 100, 200, Some(10 * 1024 * 1024));
-    }
-
-    fn shuffle_write_test(
-        batch_size: usize,
-        num_batches: usize,
-        num_partitions: usize,
-        memory_limit: Option<usize>,
-    ) {
         let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
         let mut b = StringBuilder::new();
-        for i in 0..batch_size {
+        for i in 0..10000 {
             b.append_value(format!("{i}"));
         }
         let array = b.finish();
         let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap();
 
-        let batches = (0..num_batches).map(|_| batch.clone()).collect::<Vec<_>>();
+        let batches = vec![batch.clone()];
 
         let partitions = &[batches];
         let exec = ShuffleWriterExec::try_new(
             Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()),
-            Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions),
+            Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
             "/tmp/data.out".to_string(),
             "/tmp/index.out".to_string(),
         )
         .unwrap();
-
-        // 10MB memory should be enough for running this test
-        let config = SessionConfig::new();
-        let mut runtime_env_builder = RuntimeEnvBuilder::new();
-        runtime_env_builder = match memory_limit {
-            Some(limit) => runtime_env_builder.with_memory_limit(limit, 1.0),
-            None => runtime_env_builder,
-        };
-        let runtime_env = Arc::new(runtime_env_builder.build().unwrap());
-        let ctx = SessionContext::new_with_config_rt(config, runtime_env);
+        let ctx = SessionContext::new();
         let task_ctx = ctx.task_ctx();
         let stream = exec.execute(0, task_ctx).unwrap();
         let rt = Runtime::new().unwrap();