diff --git a/Cargo.lock b/Cargo.lock
index 685ef6579b..b3d10b475a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -106,7 +106,7 @@ dependencies = [
  "prometheus 0.12.0",
  "prost",
  "proto 1.0.0-alpha01",
- "rand 0.8.5",
+ "rand 0.7.3",
  "serde",
  "serde_derive",
  "skiplist",
@@ -834,6 +834,7 @@ dependencies = [
  "query_engine",
  "server",
  "signal-hook",
+ "sort",
  "table_engine",
  "tracing_util",
  "vergen",
@@ -5465,6 +5466,12 @@ dependencies = [
  "winapi 0.3.9",
 ]
 
+[[package]]
+name = "sort"
+version = "0.8.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "79b736394cbcbad245669dc12a0caff9b2570e9ad81ce9906f5d7831551cb0c5"
+
 [[package]]
 name = "spin"
 version = "0.5.2"
diff --git a/Cargo.toml b/Cargo.toml
index 2bdc179fec..4f5f2e7177 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -131,6 +131,7 @@ meta_client = { workspace = true }
 query_engine = { workspace = true }
 server = { workspace = true }
 signal-hook = "0.3"
+sort = "0.8.5"
 table_engine = { workspace = true }
 tracing_util = { workspace = true }
 
diff --git a/analytic_engine/Cargo.toml b/analytic_engine/Cargo.toml
index f251815735..6b79b4273a 100644
--- a/analytic_engine/Cargo.toml
+++ b/analytic_engine/Cargo.toml
@@ -51,5 +51,5 @@ wal = { workspace = true }
 common_types = { workspace = true, features = ["test"] }
 common_util = { workspace = true, features = ["test"] }
 env_logger = { workspace = true }
+rand = { workspace = true }
 wal = { workspace = true, features = ["test"] }
-rand = "0.8.5"
diff --git a/analytic_engine/src/lib.rs b/analytic_engine/src/lib.rs
index 9aeb86e16c..14491758fa 100644
--- a/analytic_engine/src/lib.rs
+++ b/analytic_engine/src/lib.rs
@@ -111,7 +111,7 @@ impl Default for Config {
             /// it.
             db_write_buffer_size: 0,
             scan_batch_size: 500,
-            sst_background_read_parallelism: 4,
+            sst_background_read_parallelism: 8,
             wal_storage: WalStorageConfig::RocksDB,
         }
     }
diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs
index 741c492019..c15ed66ccd 100644
--- a/analytic_engine/src/sst/parquet/async_reader.rs
+++ b/analytic_engine/src/sst/parquet/async_reader.rs
@@ -83,7 +83,7 @@ impl<'a> Reader<'a> {
         }
     }
 
-    async fn read_parallelly(
+    async fn maybe_read_parallelly(
         &mut self,
         read_parallelism: usize,
     ) -> Result<Vec<Box<dyn Stream<Item = Result<RecordBatchWithKey>> + Send + Unpin>>> {
@@ -95,12 +95,7 @@ impl<'a> Reader<'a> {
         };
 
         self.init_if_necessary().await?;
-        let streams = self
-            .fetch_record_batch_streams(
-                read_parallelism,
-                self.parallelism_options.max_row_groups_in_batch,
-            )
-            .await?;
+        let streams = self.fetch_record_batch_streams(read_parallelism).await?;
         if streams.is_empty() {
             return Ok(Vec::new());
         }
@@ -151,10 +146,8 @@ impl<'a> Reader<'a> {
     async fn fetch_record_batch_streams(
         &mut self,
         read_parallelism: usize,
-        max_row_groups_in_batch: usize,
     ) -> Result<Vec<SendableRecordBatchStream>> {
         assert!(self.meta_data.is_some());
-        assert!(max_row_groups_in_batch > 0);
 
         let meta_data = self.meta_data.as_ref().unwrap();
         let row_projector = self.row_projector.as_ref().unwrap();
@@ -167,41 +160,39 @@ impl<'a> Reader<'a> {
             meta_data.parquet().row_groups(),
             &meta_data.custom().bloom_filter,
         )?;
 
-        let filtered_row_group_len = filtered_row_groups.len();
         info!(
             "Reader fetch record batches, row_groups total:{}, after filter:{}",
             meta_data.parquet().num_row_groups(),
-            filtered_row_group_len,
+            filtered_row_groups.len(),
         );
 
         if filtered_row_groups.is_empty() {
             return Ok(Vec::new());
         }
 
+        // Partition the batches by `read_parallelism`.
+        let suggest_read_parallelism = read_parallelism;
+        let read_parallelism = std::cmp::min(filtered_row_groups.len(), suggest_read_parallelism);
         // Partition by `max_row_groups_in_a_batch`.
-        let row_group_batches = partition_row_groups(filtered_row_groups, max_row_groups_in_batch);
         debug!(
-            "Reader fetch record batches parallelly,
-            row_groups total:{}, after filter:{}, max_row_groups_in_a_batch:{}, row_group_batches:{}, read_parallelism:{}",
-            meta_data.parquet().num_row_groups(),
-            filtered_row_group_len,
-            max_row_groups_in_batch,
-            row_group_batches.len(),
+            "Reader fetch record batches parallelly, suggest_read_parallelism:{}, read_parallelism:{}",
+            suggest_read_parallelism,
             read_parallelism,
         );
 
-        // Partition the batches by `read_parallelism`.
-        let read_parallelism = std::cmp::min(row_group_batches.len(), read_parallelism);
+        // TODO: we only support read parallelly when `batch_size` ==
+        // `num_rows_per_row_group`, so this placing method is ok, we should
+        // adjust it when supporting other situations.
         let chunks_num = read_parallelism;
-        let chunk_size = filtered_row_group_len / read_parallelism + 1;
+        let chunk_size = filtered_row_groups.len() / read_parallelism + 1;
         let mut filtered_row_group_chunks = vec![Vec::with_capacity(chunk_size); chunks_num];
-        for (row_group_batch_idx, row_group_batch) in row_group_batches.into_iter().enumerate() {
-            let chunk_idx = row_group_batch_idx % chunks_num;
+        for (row_group_idx, row_group) in filtered_row_groups.into_iter().enumerate() {
+            let chunk_idx = row_group_idx % chunks_num;
             filtered_row_group_chunks
                 .get_mut(chunk_idx)
                 .unwrap()
-                .extend(row_group_batch);
+                .push(row_group);
         }
 
         let proj_mask = ProjectionMask::leaves(
@@ -303,52 +294,34 @@ struct ParallelismOptions {
     /// Whether allow parallelly reading.
     ///
     /// NOTICE: now we only allow `read_parallelly` when
-    /// `read_batch_row_num` % `num_rows_per_row_group` == 0
+    /// `read_batch_row_num` == `num_rows_per_row_group`
     /// (surely, `num_rows_per_row_group` > 0).
     // TODO: maybe we should support `read_parallelly` in all situations.
     enable_read_parallelly: bool,
-
-    /// The max row groups in a batch
-    max_row_groups_in_batch: usize,
+    // TODO: more configs will be added.
 }
 
 impl ParallelismOptions {
     fn new(read_batch_row_num: usize, num_rows_per_row_group: usize) -> Self {
-        // Check if `read_batch_row_num` % `num_rows_per_row_group` != 0.
-        // If `false`, parallelly reading is not allowed.
-        let enable_read_parallelly = read_batch_row_num % num_rows_per_row_group == 0;
-        if !enable_read_parallelly {
+        let enable_read_parallelly = if read_batch_row_num != num_rows_per_row_group {
             warn!(
-                "Reader new parallelism options not enable, don't allow read parallelly,
+                "Reader new parallelism options not enabled, don't allow read parallelly because
+                read_batch_row_num != num_rows_per_row_group,
                 read_batch_row_num:{}, num_rows_per_row_group:{}",
                 read_batch_row_num, num_rows_per_row_group
             );
-        }
 
-        let max_row_groups_in_batch = if read_batch_row_num >= num_rows_per_row_group {
-            read_batch_row_num / num_rows_per_row_group
+            false
         } else {
-            1
+            true
         };
 
         Self {
             enable_read_parallelly,
-            max_row_groups_in_batch,
         }
     }
 }
 
-#[inline]
-fn partition_row_groups(
-    filtered_row_groups: Vec<usize>,
-    max_row_groups_in_a_batch: usize,
-) -> Vec<Vec<usize>> {
-    filtered_row_groups
-        .chunks(max_row_groups_in_a_batch)
-        .map(|chunk| chunk.to_vec())
-        .collect()
-}
-
 #[derive(Debug, Clone)]
 struct ReaderMetrics {
     bytes_scanned: usize,
@@ -523,7 +496,7 @@ impl<'a> SstReader for Reader<'a> {
     async fn read(
         &mut self,
     ) -> Result<Box<dyn Stream<Item = Result<RecordBatchWithKey>> + Send + Unpin>> {
-        let mut streams = self.read_parallelly(1).await?;
+        let mut streams = self.maybe_read_parallelly(1).await?;
         assert_eq!(streams.len(), 1);
         let stream = streams.pop().expect("impossible to fetch no stream");
 
@@ -558,6 +531,16 @@ impl Stream for RecordBatchReceiver {
 
         match poll_result {
             Poll::Ready(result) => {
+                // If found `Poll::Pending`, we need to keep polling the current rx
+                // until `Poll::Ready` is found, to ensure the order of record batches,
+                // because batches are placed into each stream by round robin:
+                // +------+    +------+    +------+
+                // |  1   |    |  2   |    |  3   |
+                // +------+    +------+    +------+
+                // |  4   |    |  5   |    |  6   |
+                // +------+    +------+    +------+
+                // | ...  |    | ...  |    | ...  |
+                // +------+    +------+    +------+
                 self.cur_rx_idx = (self.cur_rx_idx + 1) % self.rx_group.len();
                 Poll::Ready(result)
             }
@@ -621,7 +604,10 @@ impl<'a> SstReader for ThreadedReader<'a> {
         &mut self,
     ) -> Result<Box<dyn Stream<Item = Result<RecordBatchWithKey>> + Send + Unpin>> {
         // Get underlying sst readers and channels.
-        let sub_readers = self.inner.read_parallelly(self.read_parallelism).await?;
+        let sub_readers = self
+            .inner
+            .maybe_read_parallelly(self.read_parallelism)
+            .await?;
         if sub_readers.is_empty() {
             return Ok(Box::new(RecordBatchReceiver {
                 rx_group: Vec::new(),
@@ -629,11 +615,7 @@ impl<'a> SstReader for ThreadedReader<'a> {
             }) as _);
         }
 
-        let read_parallelism = if self.read_parallelism >= sub_readers.len() {
-            sub_readers.len()
-        } else {
-            self.read_parallelism
-        };
+        let read_parallelism = sub_readers.len();
         debug!(
             "ThreadedReader read, suggest read_parallelism:{}, actual:{}",
             self.read_parallelism, read_parallelism
@@ -668,7 +650,7 @@ mod tests {
     use futures::{Stream, StreamExt};
     use tokio::sync::mpsc::{self, Receiver, Sender};
 
-    use super::{partition_row_groups, ParallelismOptions};
+    use super::ParallelismOptions;
 
     struct MockReceivers {
         rx_group: Vec<Receiver<u32>>,
@@ -772,48 +754,22 @@ mod tests {
 
     #[test]
     fn test_parallelism_options() {
-        // `read_batch_row_num` < num_rows_per_row_group` && `read_batch_row_num` %
-        // num_rows_per_row_group` == 0
+        // `read_batch_row_num` < num_rows_per_row_group`
         let read_batch_row_num = 2;
         let num_rows_per_row_group = 4;
         let options = ParallelismOptions::new(read_batch_row_num, num_rows_per_row_group);
         assert!(!options.enable_read_parallelly);
-        assert_eq!(options.max_row_groups_in_batch, 1);
 
-        // `read_batch_row_num` < num_rows_per_row_group` && `read_batch_row_num` %
-        // num_rows_per_row_group` != 0
-        let read_batch_row_num = 3;
+        // `read_batch_row_num` > num_rows_per_row_group
+        let read_batch_row_num = 8;
         let num_rows_per_row_group = 4;
         let options = ParallelismOptions::new(read_batch_row_num, num_rows_per_row_group);
         assert!(!options.enable_read_parallelly);
-        assert_eq!(options.max_row_groups_in_batch, 1);
 
-        // `read_batch_row_num` >= num_rows_per_row_group` && `read_batch_row_num` %
-        // num_rows_per_row_group` == 0
-        let read_batch_row_num = 8;
+        // `read_batch_row_num` == num_rows_per_row_group`
+        let read_batch_row_num = 4;
         let num_rows_per_row_group = 4;
         let options = ParallelismOptions::new(read_batch_row_num, num_rows_per_row_group);
         assert!(options.enable_read_parallelly);
-        assert_eq!(options.max_row_groups_in_batch, 2);
-
-        // `read_batch_row_num` >= num_rows_per_row_group` && `read_batch_row_num` %
-        // num_rows_per_row_group` != 0
-        let read_batch_row_num = 7;
-        let num_rows_per_row_group = 4;
-        let options = ParallelismOptions::new(read_batch_row_num, num_rows_per_row_group);
-        assert!(!options.enable_read_parallelly);
-        assert_eq!(options.max_row_groups_in_batch, 1);
-    }
-
-    #[test]
-    fn test_partition_row_groups() {
-        let test_row_groups = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
-        let test_max_groups_in_batch = 3;
-
-        let expected = vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9], vec![10]];
-        let partitioned_row_group_batches =
-            partition_row_groups(test_row_groups, test_max_groups_in_batch);
-        assert_eq!(partitioned_row_group_batches.len(), 4);
-        assert_eq!(expected, partitioned_row_group_batches);
     }
 }
diff --git a/analytic_engine/src/sst/parquet/reader.rs b/analytic_engine/src/sst/parquet/reader.rs
index 4c24db1a0b..2ebba32279 100644
--- a/analytic_engine/src/sst/parquet/reader.rs
+++ b/analytic_engine/src/sst/parquet/reader.rs
@@ -143,6 +143,7 @@ impl<'a> ParquetSstReader<'a> {
         Ok(())
     }
 
+    // This method is not used since we now favor `maybe_read_parallelly`.
    fn read_record_batches(&mut self, tx: Sender<Result<RecordBatchWithKey>>) -> Result<()> {
         let path = self.path.to_string();
         ensure!(self.reader_builder.is_some(), ReadAgain { path });
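
Note (not part of the patch): the core of this change is that the filtered row groups are distributed directly across the sub-streams in round-robin order, and `RecordBatchReceiver` drains the sub-streams in that same round-robin order (advancing `cur_rx_idx` only on `Poll::Ready`) so the original row-group order is preserved. The following is a minimal standalone sketch of that split/merge invariant; the helper names `split_round_robin` and `merge_round_robin` are illustrative only and do not exist in the codebase.

```rust
/// Distribute row-group indices across at most `suggest_read_parallelism`
/// chunks, round robin, mirroring the logic in `fetch_record_batch_streams`.
fn split_round_robin(row_groups: Vec<usize>, suggest_read_parallelism: usize) -> Vec<Vec<usize>> {
    if row_groups.is_empty() {
        return Vec::new();
    }
    // Never spawn more chunks than there are row groups.
    let read_parallelism = std::cmp::min(row_groups.len(), suggest_read_parallelism);
    let chunk_size = row_groups.len() / read_parallelism + 1;
    let mut chunks = vec![Vec::with_capacity(chunk_size); read_parallelism];
    for (idx, row_group) in row_groups.into_iter().enumerate() {
        // Row group `idx` goes to chunk `idx % parallelism`, i.e. round robin.
        chunks[idx % read_parallelism].push(row_group);
    }
    chunks
}

/// Re-read the chunks in round-robin order; because the split was also round
/// robin, this restores the original sequence (what the receiver relies on).
fn merge_round_robin(chunks: Vec<Vec<usize>>) -> Vec<usize> {
    let total: usize = chunks.iter().map(Vec::len).sum();
    let mut merged = Vec::with_capacity(total);
    let mut round = 0;
    while merged.len() < total {
        for chunk in &chunks {
            if let Some(v) = chunk.get(round) {
                merged.push(*v);
            }
        }
        round += 1;
    }
    merged
}

fn main() {
    // 10 filtered row groups with a suggested parallelism of 3.
    let row_groups: Vec<usize> = (0..10).collect();
    let chunks = split_round_robin(row_groups.clone(), 3);
    assert_eq!(chunks, vec![vec![0, 3, 6, 9], vec![1, 4, 7], vec![2, 5, 8]]);

    // Draining the chunks round robin yields the original order again.
    assert_eq!(merge_round_robin(chunks), row_groups);
    println!("round-robin split/merge preserves the original order");
}
```

This also shows why the ordering guarantee only holds when every batch maps to exactly one row group, which is the reason `ParallelismOptions` now enables parallel reads only when `read_batch_row_num == num_rows_per_row_group`.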