diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs
index 741c492019..3511035e2c 100644
--- a/analytic_engine/src/sst/parquet/async_reader.rs
+++ b/analytic_engine/src/sst/parquet/async_reader.rs
@@ -95,12 +95,7 @@ impl<'a> Reader<'a> {
         };

         self.init_if_necessary().await?;
-        let streams = self
-            .fetch_record_batch_streams(
-                read_parallelism,
-                self.parallelism_options.max_row_groups_in_batch,
-            )
-            .await?;
+        let streams = self.fetch_record_batch_streams(read_parallelism).await?;
         if streams.is_empty() {
             return Ok(Vec::new());
         }
@@ -151,10 +146,8 @@ impl<'a> Reader<'a> {
     async fn fetch_record_batch_streams(
         &mut self,
         read_parallelism: usize,
-        max_row_groups_in_batch: usize,
     ) -> Result> {
         assert!(self.meta_data.is_some());
-        assert!(max_row_groups_in_batch > 0);

         let meta_data = self.meta_data.as_ref().unwrap();
         let row_projector = self.row_projector.as_ref().unwrap();
@@ -167,41 +160,39 @@ impl<'a> Reader<'a> {
             meta_data.parquet().row_groups(),
             &meta_data.custom().bloom_filter,
         )?;
-        let filtered_row_group_len = filtered_row_groups.len();

         info!(
             "Reader fetch record batches, row_groups total:{}, after filter:{}",
             meta_data.parquet().num_row_groups(),
-            filtered_row_group_len,
+            filtered_row_groups.len(),
         );

         if filtered_row_groups.is_empty() {
             return Ok(Vec::new());
         }

+        // Partition the batches by `read_parallelism`.
+        let suggest_read_parallelism = read_parallelism;
+        let read_parallelism = std::cmp::min(filtered_row_groups.len(), suggest_read_parallelism);
+
-        // Partition by `max_row_groups_in_a_batch`.
-        let row_group_batches = partition_row_groups(filtered_row_groups, max_row_groups_in_batch);
         debug!(
-            "Reader fetch record batches parallelly,
-            row_groups total:{}, after filter:{}, max_row_groups_in_a_batch:{}, row_group_batches:{}, read_parallelism:{}",
-            meta_data.parquet().num_row_groups(),
-            filtered_row_group_len,
-            max_row_groups_in_batch,
-            row_group_batches.len(),
+            "Reader fetch record batches parallelly, suggest_read_parallelism:{}, read_parallelism:{}",
+            suggest_read_parallelism,
             read_parallelism,
         );

-        // Partition the batches by `read_parallelism`.
-        let read_parallelism = std::cmp::min(row_group_batches.len(), read_parallelism);
+        // TODO: we only support reading in parallel when `batch_size` ==
+        // `num_rows_per_row_group`, so this placement method is ok; we should
+        // adjust it when supporting other situations.
         let chunks_num = read_parallelism;
-        let chunk_size = filtered_row_group_len / read_parallelism + 1;
+        let chunk_size = filtered_row_groups.len() / read_parallelism + 1;
         let mut filtered_row_group_chunks = vec![Vec::with_capacity(chunk_size); chunks_num];
-        for (row_group_batch_idx, row_group_batch) in row_group_batches.into_iter().enumerate() {
-            let chunk_idx = row_group_batch_idx % chunks_num;
+        for (row_group_idx, row_group) in filtered_row_groups.into_iter().enumerate() {
+            let chunk_idx = row_group_idx % chunks_num;
             filtered_row_group_chunks
                 .get_mut(chunk_idx)
                 .unwrap()
-                .extend(row_group_batch);
+                .push(row_group);
         }

         let proj_mask = ProjectionMask::leaves(
@@ -307,48 +298,36 @@ struct ParallelismOptions {
     /// (surely, `num_rows_per_row_group` > 0).
     // TODO: maybe we should support `read_parallelly` in all situations.
     enable_read_parallelly: bool,
-
-    /// The max row groups in a batch
-    max_row_groups_in_batch: usize,
+    // TODO: more configs will be added.
 }

 impl ParallelismOptions {
     fn new(read_batch_row_num: usize, num_rows_per_row_group: usize) -> Self {
-        // Check if `read_batch_row_num` % `num_rows_per_row_group` != 0.
-        // If `false`, parallelly reading is not allowed.
-        let enable_read_parallelly = read_batch_row_num % num_rows_per_row_group == 0;
-        if !enable_read_parallelly {
+        // We only support `read_parallelly` when `read_batch_row_num` ==
+        // `num_rows_per_row_group` now, reasons:
+        // + the underlying parquet reader will just read `num_rows_per_row_group`
+        //   rows even when the passed `read_batch_row_num` is greater than
+        //   `num_rows_per_row_group`.
+        // + we split row groups into streams by round robin, and it is hard to
+        //   keep order when `read_batch_row_num` is less than
+        //   `num_rows_per_row_group`.
+        let enable_read_parallelly = if read_batch_row_num != num_rows_per_row_group {
             warn!(
                 "Reader new parallelism options not enable, don't allow read parallelly, read_batch_row_num:{}, num_rows_per_row_group:{}",
                 read_batch_row_num, num_rows_per_row_group
             );
-        }

-        let max_row_groups_in_batch = if read_batch_row_num >= num_rows_per_row_group {
-            read_batch_row_num / num_rows_per_row_group
+            false
         } else {
-            1
+            true
         };

         Self {
             enable_read_parallelly,
-            max_row_groups_in_batch,
         }
     }
 }

-#[inline]
-fn partition_row_groups(
-    filtered_row_groups: Vec<usize>,
-    max_row_groups_in_a_batch: usize,
-) -> Vec<Vec<usize>> {
-    filtered_row_groups
-        .chunks(max_row_groups_in_a_batch)
-        .map(|chunk| chunk.to_vec())
-        .collect()
-}
-
 #[derive(Debug, Clone)]
 struct ReaderMetrics {
     bytes_scanned: usize,
@@ -558,6 +537,16 @@ impl Stream for RecordBatchReceiver {
         match poll_result {
             Poll::Ready(result) => {
+                // If `Poll::Pending` is found, keep polling the current rx until
+                // it returns `Poll::Ready`, to ensure the order of record batches,
+                // because batches are placed into each stream by round robin:
+                // +------+    +------+    +------+
+                // |  1   |    |  2   |    |  3   |
+                // +------+    +------+    +------+
+                // |  4   |    |  5   |    |  6   |
+                // +------+    +------+    +------+
+                // | ...  |    | ...  |    | ...  |
+                // +------+    +------+    +------+
                 self.cur_rx_idx = (self.cur_rx_idx + 1) % self.rx_group.len();
                 Poll::Ready(result)
             }
@@ -629,11 +618,7 @@ impl<'a> SstReader for ThreadedReader<'a> {
             }) as _);
         }
-        let read_parallelism = if self.read_parallelism >= sub_readers.len() {
-            sub_readers.len()
-        } else {
-            self.read_parallelism
-        };
+        let read_parallelism = sub_readers.len();
         debug!(
             "ThreadedReader read, suggest read_parallelism:{}, actual:{}",
             self.read_parallelism, read_parallelism
         );
@@ -668,7 +653,7 @@ mod tests {
     use futures::{Stream, StreamExt};
     use tokio::sync::mpsc::{self, Receiver, Sender};

-    use super::{partition_row_groups, ParallelismOptions};
+    use super::ParallelismOptions;

     struct MockReceivers {
         rx_group: Vec>,
@@ -772,48 +757,22 @@ mod tests {

     #[test]
     fn test_parallelism_options() {
-        // `read_batch_row_num` < num_rows_per_row_group` && `read_batch_row_num` %
-        // num_rows_per_row_group` == 0
+        // `read_batch_row_num` < `num_rows_per_row_group`
         let read_batch_row_num = 2;
         let num_rows_per_row_group = 4;
         let options = ParallelismOptions::new(read_batch_row_num, num_rows_per_row_group);
         assert!(!options.enable_read_parallelly);
-        assert_eq!(options.max_row_groups_in_batch, 1);

-        // `read_batch_row_num` < num_rows_per_row_group` && `read_batch_row_num` %
-        // num_rows_per_row_group` != 0
-        let read_batch_row_num = 3;
+        // `read_batch_row_num` > `num_rows_per_row_group`
+        let read_batch_row_num = 8;
         let num_rows_per_row_group = 4;
         let options = ParallelismOptions::new(read_batch_row_num, num_rows_per_row_group);
         assert!(!options.enable_read_parallelly);
-        assert_eq!(options.max_row_groups_in_batch, 1);

-        // `read_batch_row_num` >= num_rows_per_row_group` && `read_batch_row_num` %
-        // num_rows_per_row_group` == 0
-        let read_batch_row_num = 8;
+        // `read_batch_row_num` == `num_rows_per_row_group`
+        let read_batch_row_num = 4;
         let num_rows_per_row_group = 4;
         let options = ParallelismOptions::new(read_batch_row_num, num_rows_per_row_group);
         assert!(options.enable_read_parallelly);
-        assert_eq!(options.max_row_groups_in_batch, 2);
-
-        // `read_batch_row_num` >= num_rows_per_row_group` && `read_batch_row_num` %
-        // num_rows_per_row_group` != 0
-        let read_batch_row_num = 7;
-        let num_rows_per_row_group = 4;
-        let options = ParallelismOptions::new(read_batch_row_num, num_rows_per_row_group);
-        assert!(!options.enable_read_parallelly);
-        assert_eq!(options.max_row_groups_in_batch, 1);
-    }
-
-    #[test]
-    fn test_partition_row_groups() {
-        let test_row_groups = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
-        let test_max_groups_in_batch = 3;
-
-        let expected = vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9], vec![10]];
-        let partitioned_row_group_batches =
-            partition_row_groups(test_row_groups, test_max_groups_in_batch);
-        assert_eq!(partitioned_row_group_batches.len(), 4);
-        assert_eq!(expected, partitioned_row_group_batches);
     }
 }
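
For illustration, here is a minimal, self-contained sketch of the placement strategy this patch adopts in fetch_record_batch_streams: filtered row groups are dealt round-robin into read_parallelism chunks, and RecordBatchReceiver later drains those chunks round-robin, which preserves the original row-group order. The helper names (split_round_robin, reassemble_round_robin) are hypothetical and are not part of the crate.

// Standalone sketch (hypothetical helpers, not the crate's actual code) of the
// round-robin placement used above, and of why draining the resulting streams
// round-robin restores the original row-group order.

fn split_round_robin(row_groups: Vec<usize>, read_parallelism: usize) -> Vec<Vec<usize>> {
    // Cap the parallelism by the number of row groups, as the patch does.
    let chunks_num = std::cmp::min(row_groups.len(), read_parallelism);
    let chunk_size = row_groups.len() / chunks_num + 1;
    let mut chunks = vec![Vec::with_capacity(chunk_size); chunks_num];
    for (idx, row_group) in row_groups.into_iter().enumerate() {
        // Row group `idx` goes to chunk `idx % chunks_num`.
        chunks[idx % chunks_num].push(row_group);
    }
    chunks
}

fn reassemble_round_robin(chunks: &[Vec<usize>]) -> Vec<usize> {
    // Take one element from each chunk per turn, mirroring how
    // `RecordBatchReceiver` advances `cur_rx_idx` only on `Poll::Ready`.
    let max_len = chunks.iter().map(Vec::len).max().unwrap_or(0);
    let mut out = Vec::new();
    for i in 0..max_len {
        for chunk in chunks {
            if let Some(rg) = chunk.get(i) {
                out.push(*rg);
            }
        }
    }
    out
}

fn main() {
    let row_groups: Vec<usize> = (0..10).collect();
    let chunks = split_round_robin(row_groups.clone(), 3);
    assert_eq!(chunks, vec![vec![0, 3, 6, 9], vec![1, 4, 7], vec![2, 5, 8]]);
    // Reading the chunks round-robin restores the original order.
    assert_eq!(reassemble_round_robin(&chunks), row_groups);
}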
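
And a toy illustration of the ordering argument behind ParallelismOptions: if read_batch_row_num were smaller than num_rows_per_row_group, each row group would be emitted as several batches, and a round-robin merge across streams would interleave batches from different row groups. The batch labels below are made up for the example.

fn main() {
    // Two streams produced by round-robin placement: stream 0 holds row group 0,
    // stream 1 holds row group 1. With `read_batch_row_num` at half of
    // `num_rows_per_row_group`, each row group is emitted as two batches.
    let streams = vec![
        vec!["rg0-batch0", "rg0-batch1"],
        vec!["rg1-batch0", "rg1-batch1"],
    ];

    // A round-robin merge takes one batch from each stream per turn.
    let mut merged = Vec::new();
    for i in 0..2 {
        for s in &streams {
            merged.push(s[i]);
        }
    }

    // The batches of row group 0 are no longer contiguous, so the global row
    // order is broken; hence the patch only enables parallel reads when
    // `read_batch_row_num` == `num_rows_per_row_group`.
    assert_eq!(merged, vec!["rg0-batch0", "rg1-batch0", "rg0-batch1", "rg1-batch1"]);
}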