diff --git a/.github/actions/rust-test/action.yaml b/.github/actions/rust-test/action.yaml
index a1cde99ba..4ff144eb6 100644
--- a/.github/actions/rust-test/action.yaml
+++ b/.github/actions/rust-test/action.yaml
@@ -62,5 +62,6 @@ runs:
     - name: Run Cargo test
       shell: bash
       run: |
+        ulimit -n 102400
         cd native
         RUST_BACKTRACE=1 cargo test
diff --git a/native/Cargo.lock b/native/Cargo.lock
index c9301c6e3..9f436c32c 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -892,6 +892,7 @@ dependencies = [
  "datafusion-expr",
  "datafusion-functions-nested",
  "datafusion-physical-expr",
+ "either",
  "flate2",
  "futures",
  "half",
diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml
index 13f6b135f..83a18b5bb 100644
--- a/native/core/Cargo.toml
+++ b/native/core/Cargo.toml
@@ -77,6 +77,7 @@ crc32fast = "1.3.2"
 simd-adler32 = "0.3.7"
 datafusion-comet-spark-expr = { workspace = true }
 datafusion-comet-proto = { workspace = true }
+either = "1.13.0"
 
 [dev-dependencies]
 pprof = { version = "0.13.0", features = ["flamegraph"] }
diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs b/native/core/src/execution/datafusion/shuffle_writer.rs
index efd653b5b..f0078e883 100644
--- a/native/core/src/execution/datafusion/shuffle_writer.rs
+++ b/native/core/src/execution/datafusion/shuffle_writer.rs
@@ -55,6 +55,7 @@ use datafusion::{
     },
 };
 use datafusion_physical_expr::EquivalenceProperties;
+use either::{Either, Left, Right};
 use futures::{lock::Mutex, Stream, StreamExt, TryFutureExt, TryStreamExt};
 use itertools::Itertools;
 use simd_adler32::Adler32;
@@ -233,10 +234,8 @@ impl PartitionBuffer {
     }
 
     /// Initializes active builders if necessary.
-    async fn init_active_if_necessary(
-        &mut self,
-        repartitioner: &ShuffleRepartitioner,
-    ) -> Result<isize> {
+    /// Returns error if memory reservation fails.
+    fn init_active_if_necessary(&mut self) -> Result<isize> {
         let mut mem_diff = 0;
 
         if self.active.is_empty() {
@@ -250,15 +249,7 @@ impl PartitionBuffer {
                    .sum::<usize>();
            }
 
-            if self
-                .reservation
-                .try_grow(self.active_slots_mem_size)
-                .is_err()
-            {
-                repartitioner.spill().await?;
-                self.reservation.free();
-                self.reservation.try_grow(self.active_slots_mem_size)?;
-            }
+            self.reservation.try_grow(self.active_slots_mem_size)?;
 
             self.active = new_array_builders(&self.schema, self.batch_size);
 
@@ -267,32 +258,23 @@ impl PartitionBuffer {
         Ok(mem_diff)
     }
 
-    /// Appends all rows of given batch into active array builders.
-    async fn append_batch(
-        &mut self,
-        batch: &RecordBatch,
-        time_metric: &Time,
-        repartitioner: &ShuffleRepartitioner,
-    ) -> Result<isize> {
-        let columns = batch.columns();
-        let indices = (0..batch.num_rows()).collect::<Vec<usize>>();
-        self.append_rows(columns, &indices, time_metric, repartitioner)
-            .await
-    }
-
     /// Appends rows of specified indices from columns into active array builders.
-    async fn append_rows(
+    fn append_rows(
         &mut self,
         columns: &[ArrayRef],
         indices: &[usize],
+        start_index: usize,
         time_metric: &Time,
-        repartitioner: &ShuffleRepartitioner,
-    ) -> Result<isize> {
+    ) -> Either<Result<isize>, usize> {
         let mut mem_diff = 0;
-        let mut start = 0;
+        let mut start = start_index;
 
         // lazy init because some partition may be empty
-        mem_diff += self.init_active_if_necessary(repartitioner).await?;
+        let init = self.init_active_if_necessary();
+        if init.is_err() {
+            return Right(start);
+        }
+        mem_diff += init.unwrap();
 
         while start < indices.len() {
             let end = (start + self.batch_size).min(indices.len());
@@ -305,14 +287,22 @@ impl PartitionBuffer {
             self.num_active_rows += end - start;
             if self.num_active_rows >= self.batch_size {
                 let mut timer = time_metric.timer();
-                mem_diff += self.flush()?;
+                let flush = self.flush();
+                if let Err(e) = flush {
+                    return Left(Err(e));
+                }
+                mem_diff += flush.unwrap();
                 timer.stop();
 
-                mem_diff += self.init_active_if_necessary(repartitioner).await?;
+                let init = self.init_active_if_necessary();
+                if init.is_err() {
+                    return Right(end);
+                }
+                mem_diff += init.unwrap();
             }
             start = end;
         }
-        Ok(mem_diff)
+        Left(Ok(mem_diff))
     }
 
     /// flush active data into frozen bytes
@@ -326,7 +316,7 @@ impl PartitionBuffer {
         let active = std::mem::take(&mut self.active);
         let num_rows = self.num_active_rows;
         self.num_active_rows = 0;
-        mem_diff -= self.active_slots_mem_size as isize;
+        self.reservation.try_shrink(self.active_slots_mem_size)?;
 
         let frozen_batch = make_batch(Arc::clone(&self.schema), active, num_rows)?;
 
@@ -610,7 +600,7 @@ struct ShuffleRepartitioner {
     output_data_file: String,
     output_index_file: String,
     schema: SchemaRef,
-    buffered_partitions: Mutex<Vec<PartitionBuffer>>,
+    buffered_partitions: Vec<PartitionBuffer>,
     spills: Mutex<Vec<SpillInfo>>,
     /// Sort expressions
     /// Partitioning scheme to use
@@ -683,18 +673,11 @@ impl ShuffleRepartitioner {
             output_data_file,
             output_index_file,
             schema: Arc::clone(&schema),
-            buffered_partitions: Mutex::new(
-                (0..num_output_partitions)
-                    .map(|partition_id| {
-                        PartitionBuffer::new(
-                            Arc::clone(&schema),
-                            batch_size,
-                            partition_id,
-                            &runtime,
-                        )
-                    })
-                    .collect::<Vec<_>>(),
-            ),
+            buffered_partitions: (0..num_output_partitions)
+                .map(|partition_id| {
+                    PartitionBuffer::new(Arc::clone(&schema), batch_size, partition_id, &runtime)
+                })
+                .collect::<Vec<_>>(),
             spills: Mutex::new(vec![]),
             partitioning,
             num_output_partitions,
@@ -741,8 +724,6 @@ impl ShuffleRepartitioner {
         // Update data size metric
         self.metrics.data_size.add(input.get_array_memory_size());
 
-        let time_metric = self.metrics.baseline.elapsed_compute();
-
         // NOTE: in shuffle writer exec, the output_rows metrics represents the
         // number of rows those are written to output data file.
         self.metrics.baseline.record_output(input.num_rows());
@@ -807,17 +788,11 @@
                     .enumerate()
                     .filter(|(_, (start, end))| start < end)
                 {
-                    let mut buffered_partitions = self.buffered_partitions.lock().await;
-                    let output = &mut buffered_partitions[partition_id];
-
-                    // If the range of indices is not big enough, just appending the rows into
-                    // active array builders instead of directly adding them as a record batch.
-                    mem_diff += output
-                        .append_rows(
+                    mem_diff += self
+                        .append_rows_to_partition(
                             input.columns(),
                             &shuffled_partition_ids[start..end],
-                            time_metric,
-                            self,
+                            partition_id,
                         )
                         .await?;
 
@@ -842,7 +817,7 @@
                 }
             }
             Partitioning::UnknownPartitioning(n) if *n == 1 => {
-                let mut buffered_partitions = self.buffered_partitions.lock().await;
+                let buffered_partitions = &mut self.buffered_partitions;
 
                 assert!(
                     buffered_partitions.len() == 1,
@@ -850,8 +825,10 @@
                     buffered_partitions.len()
                 );
 
-                let output = &mut buffered_partitions[0];
-                output.append_batch(&input, time_metric, self).await?;
+                let indices = (0..input.num_rows()).collect::<Vec<usize>>();
+
+                self.append_rows_to_partition(input.columns(), &indices, 0)
+                    .await?;
             }
             other => {
                 // this should be unreachable as long as the validation logic
@@ -868,7 +845,7 @@
     /// Writes buffered shuffled record batches into Arrow IPC bytes.
     async fn shuffle_write(&mut self) -> Result<SendableRecordBatchStream> {
         let num_output_partitions = self.num_output_partitions;
-        let mut buffered_partitions = self.buffered_partitions.lock().await;
+        let buffered_partitions = &mut self.buffered_partitions;
         let mut output_batches: Vec<Vec<u8>> = vec![vec![]; num_output_partitions];
 
         for i in 0..num_output_partitions {
@@ -966,16 +943,15 @@
         self.metrics.data_size.value()
     }
 
-    async fn spill(&self) -> Result<usize> {
+    async fn spill(&mut self) -> Result<usize> {
         log::debug!(
             "ShuffleRepartitioner spilling shuffle data of {} to disk while inserting ({} time(s) so far)",
             self.used(),
             self.spill_count()
         );
 
-        let mut buffered_partitions = self.buffered_partitions.lock().await;
         // we could always get a chance to free some memory as long as we are holding some
-        if buffered_partitions.len() == 0 {
+        if self.buffered_partitions.is_empty() {
             return Ok(0);
         }
@@ -986,7 +962,7 @@
             .disk_manager
             .create_tmp_file("shuffle writer spill")?;
         let offsets = spill_into(
-            &mut buffered_partitions,
+            &mut self.buffered_partitions,
             spillfile.path(),
             self.num_output_partitions,
         )
         .await
@@ -1004,6 +980,60 @@
         });
         Ok(used)
     }
+
+    /// Appends rows of specified indices from columns into active array builders in the specified partition.
+    async fn append_rows_to_partition(
+        &mut self,
+        columns: &[ArrayRef],
+        indices: &[usize],
+        partition_id: usize,
+    ) -> Result<isize> {
+        let mut mem_diff = 0;
+
+        let output = &mut self.buffered_partitions[partition_id];
+
+        let time_metric = self.metrics.baseline.elapsed_compute();
+
+        // If the range of indices is not big enough, just appending the rows into
+        // active array builders instead of directly adding them as a record batch.
+        let mut start_index: usize = 0;
+        let mut output_ret = output.append_rows(columns, indices, start_index, time_metric);
+
+        loop {
+            match output_ret {
+                Left(l) => {
+                    mem_diff += l?;
+                    break;
+                }
+                Right(new_start) => {
+                    // Cannot allocate enough memory for the array builders in the partition,
+                    // spill partitions and retry.
+                    self.spill().await?;
+
+                    let output = &mut self.buffered_partitions[partition_id];
+                    output.reservation.free();
+
+                    let time_metric = self.metrics.baseline.elapsed_compute();
+
+                    start_index = new_start;
+                    output_ret = output.append_rows(columns, indices, start_index, time_metric);
+
+                    if let Right(new_start) = output_ret {
+                        if new_start == start_index {
+                            // If the start index is not updated, it means that the partition
+                            // is still not able to allocate enough memory for the array builders.
+                            return Err(DataFusionError::Internal(
+                                "Partition is still not able to allocate enough memory for the array builders after spilling."
+                                    .to_string(),
+                            ));
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(mem_diff)
+    }
 }
 
 /// consume the `buffered_partitions` and do spill into a single temp shuffle output file
@@ -1520,6 +1550,8 @@ mod test {
     use datafusion::physical_plan::common::collect;
     use datafusion::physical_plan::memory::MemoryExec;
     use datafusion::prelude::SessionContext;
+    use datafusion_execution::config::SessionConfig;
+    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
     use datafusion_physical_expr::expressions::Column;
     use tokio::runtime::Runtime;
 
@@ -1554,25 +1586,63 @@
     #[test]
     #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
     fn test_insert_larger_batch() {
+        shuffle_write_test(10000, 1, 16, None);
+    }
+
+    #[test]
+    #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
+    fn test_insert_smaller_batch() {
+        shuffle_write_test(1000, 1, 16, None);
+        shuffle_write_test(1000, 10, 16, None);
+    }
+
+    #[test]
+    #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
+    fn test_large_number_of_partitions() {
+        shuffle_write_test(10000, 10, 200, Some(10 * 1024 * 1024));
+        shuffle_write_test(10000, 10, 2000, Some(10 * 1024 * 1024));
+    }
+
+    #[test]
+    #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
+    fn test_large_number_of_partitions_spilling() {
+        shuffle_write_test(10000, 100, 200, Some(10 * 1024 * 1024));
+    }
+
+    fn shuffle_write_test(
+        batch_size: usize,
+        num_batches: usize,
+        num_partitions: usize,
+        memory_limit: Option<usize>,
+    ) {
         let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
         let mut b = StringBuilder::new();
-        for i in 0..10000 {
+        for i in 0..batch_size {
             b.append_value(format!("{i}"));
         }
         let array = b.finish();
         let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap();
 
-        let batches = vec![batch.clone()];
+        let batches = (0..num_batches).map(|_| batch.clone()).collect::<Vec<_>>();
 
         let partitions = &[batches];
         let exec = ShuffleWriterExec::try_new(
             Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()),
-            Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
+            Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions),
             "/tmp/data.out".to_string(),
             "/tmp/index.out".to_string(),
         )
         .unwrap();
-        let ctx = SessionContext::new();
+
+        // 10MB memory should be enough for running this test
+        let config = SessionConfig::new();
+        let mut runtime_env_builder = RuntimeEnvBuilder::new();
+        runtime_env_builder = match memory_limit {
+            Some(limit) => runtime_env_builder.with_memory_limit(limit, 1.0),
+            None => runtime_env_builder,
+        };
+        let runtime_env = Arc::new(runtime_env_builder.build().unwrap());
+        let ctx = SessionContext::new_with_config_rt(config, runtime_env);
         let task_ctx = ctx.task_ctx();
        let stream = exec.execute(0, task_ctx).unwrap();
        let rt = Runtime::new().unwrap();
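Reviewer note: the core of this change is the new contract between `PartitionBuffer::append_rows` and the repartitioner: instead of spilling from inside the buffer, a failed memory reservation is reported as `Right(resume_index)` and the caller spills, frees the reservation, and retries from that row, erroring out only if a retry makes no progress. Below is a minimal, self-contained sketch of that control flow, not the PR's code: `ToyBuffer`, its byte budget, and the pretend `spill` helper are invented for illustration, and only the `Either`-based protocol mirrors the diff above (needs `either = "1"` in Cargo.toml to run).

```rust
use either::{Either, Left, Right};

/// Hypothetical stand-in for a partition buffer with a fixed memory budget.
struct ToyBuffer {
    budget: usize,   // bytes the "memory pool" will grant in total
    reserved: usize, // bytes currently reserved
    rows: Vec<u64>,  // buffered values
}

impl ToyBuffer {
    /// Mirrors the new contract: Left(Ok(bytes_grown)) on success,
    /// Right(resume_at) when a reservation fails and the caller must spill and retry.
    fn append_rows(&mut self, values: &[u64], start: usize) -> Either<Result<usize, String>, usize> {
        let mut grown = 0;
        for (i, v) in values.iter().enumerate().skip(start) {
            let needed = std::mem::size_of::<u64>();
            if self.reserved + needed > self.budget {
                // Reservation failed: report how far we got instead of spilling here.
                return Right(i);
            }
            self.reserved += needed;
            grown += needed;
            self.rows.push(*v);
        }
        Left(Ok(grown))
    }

    /// Pretend-spill: drop the buffered rows and release their reservation.
    fn spill(&mut self) {
        self.rows.clear();
        self.reserved = 0;
    }
}

fn main() {
    let values: Vec<u64> = (0..10).collect();
    let mut buf = ToyBuffer { budget: 32, reserved: 0, rows: Vec::new() };

    let mut start = 0;
    let mut last_failure: Option<usize> = None;
    loop {
        match buf.append_rows(&values, start) {
            Left(result) => {
                let grown = result.expect("append failed");
                println!("appended rows {start}.. and grew the reservation by {grown} bytes");
                break;
            }
            Right(resume_at) => {
                // Same idea as the diff's guard: if spilling did not let us move past
                // the previous failure point, give up instead of looping forever.
                if last_failure == Some(resume_at) {
                    panic!("still cannot reserve memory after spilling");
                }
                last_failure = Some(resume_at);
                println!("reservation failed at row {resume_at}; spilling and retrying");
                buf.spill();
                start = resume_at;
            }
        }
    }
}
```

Moving the spill decision into the caller is what lets `append_rows` become a plain synchronous method: it no longer needs the `&ShuffleRepartitioner` back-reference or the `Mutex` around `buffered_partitions`, which is exactly what the rest of the diff removes.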