From 5e020683f0bfcf6962582a3ee818d0921dad3cd2 Mon Sep 17 00:00:00 2001
From: kamille <34352236+Rachelint@users.noreply.github.com>
Date: Fri, 16 Dec 2022 11:46:29 +0800
Subject: [PATCH] feat: scan row groups in one sst file parallelly (#474)

* make scanning sst file in parallel.

* rename `read_parallelism` to `background_read_parallelism` in `SstReaderOptions`.

* address CR.

* fix parallel reading bug and add tests.

* add more checks for the `read_parallelism`.

* address CR.
---
 Cargo.lock                                 |   8 +
 Cargo.toml                                 |   1 +
 analytic_engine/Cargo.toml                 |   1 +
 .../src/instance/flush_compaction.rs       |   2 +
 analytic_engine/src/instance/mod.rs        |   7 +-
 analytic_engine/src/instance/open.rs       |   8 +-
 analytic_engine/src/instance/read.rs       |   9 +-
 analytic_engine/src/lib.rs                 |   8 +-
 analytic_engine/src/row_iter/mod.rs        |  10 +-
 analytic_engine/src/sst/factory.rs         |  12 +-
 .../src/sst/parquet/async_reader.rs        | 412 +++++++++++++++---
 analytic_engine/src/sst/parquet/builder.rs |   2 +
 analytic_engine/src/sst/parquet/reader.rs  |   1 +
 benchmarks/src/merge_memtable_bench.rs     |   2 +
 benchmarks/src/merge_sst_bench.rs          |   2 +
 benchmarks/src/sst_bench.rs                |   2 +
 benchmarks/src/sst_tools.rs                |   5 +
 benchmarks/src/util.rs                     |   2 +
 common_types/src/projected_schema.rs       |   2 +-
 common_types/src/record_batch.rs           |   2 +-
 tools/src/bin/sst-convert.rs               |   2 +
 21 files changed, 437 insertions(+), 63 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 96e183dd11..b3d10b475a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -106,6 +106,7 @@ dependencies = [
  "prometheus 0.12.0",
  "prost",
  "proto 1.0.0-alpha01",
+ "rand 0.7.3",
  "serde",
  "serde_derive",
  "skiplist",
@@ -833,6 +834,7 @@ dependencies = [
  "query_engine",
  "server",
  "signal-hook",
+ "sort",
  "table_engine",
  "tracing_util",
  "vergen",
@@ -5464,6 +5466,12 @@ dependencies = [
  "winapi 0.3.9",
 ]
 
+[[package]]
+name = "sort"
+version = "0.8.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "79b736394cbcbad245669dc12a0caff9b2570e9ad81ce9906f5d7831551cb0c5"
+
 [[package]]
 name = "spin"
 version = "0.5.2"
diff --git a/Cargo.toml b/Cargo.toml
index 2bdc179fec..4f5f2e7177 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -131,6 +131,7 @@ meta_client = { workspace = true }
 query_engine = { workspace = true }
 server = { workspace = true }
 signal-hook = "0.3"
+sort = "0.8.5"
 table_engine = { workspace = true }
 tracing_util = { workspace = true }
diff --git a/analytic_engine/Cargo.toml b/analytic_engine/Cargo.toml
index 3826f26428..6b79b4273a 100644
--- a/analytic_engine/Cargo.toml
+++ b/analytic_engine/Cargo.toml
@@ -51,4 +51,5 @@ wal = { workspace = true }
 common_types = { workspace = true, features = ["test"] }
 common_util = { workspace = true, features = ["test"] }
 env_logger = { workspace = true }
+rand = { workspace = true }
 wal = { workspace = true, features = ["test"] }
diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs
index 5d983fa426..9b996e957d 100644
--- a/analytic_engine/src/instance/flush_compaction.rs
+++ b/analytic_engine/src/instance/flush_compaction.rs
@@ -918,6 +918,8 @@ impl SpaceStore {
             predicate: Arc::new(Predicate::empty()),
             meta_cache: self.meta_cache.clone(),
             runtime: runtime.clone(),
+            background_read_parallelism: 1,
+            num_rows_per_row_group: table_options.num_rows_per_row_group,
         };
         let mut builder = MergeBuilder::new(MergeConfig {
             request_id,
diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs
index 4602792317..195dd208d9 100644
--- a/analytic_engine/src/instance/mod.rs
+++
b/analytic_engine/src/instance/mod.rs @@ -33,6 +33,7 @@ use wal::manager::WalManagerRef; use crate::{ compaction::scheduler::CompactionSchedulerRef, meta::ManifestRef, + row_iter::IterOptions, space::{SpaceId, SpaceRef}, sst::{factory::FactoryRef as SstFactoryRef, file::FilePurger, meta_cache::MetaCacheRef}, table::data::TableDataRef, @@ -170,10 +171,10 @@ pub struct Instance { pub(crate) db_write_buffer_size: usize, /// Space write buffer size pub(crate) space_write_buffer_size: usize, - /// replay wal batch size + /// Replay wal batch size pub(crate) replay_batch_size: usize, - /// batch size for scan sst - pub(crate) scan_batch_size: usize, + /// Options for scanning sst + pub(crate) iter_options: IterOptions, } impl Instance { diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs index 2a310a08ad..fd19c5abab 100644 --- a/analytic_engine/src/instance/open.rs +++ b/analytic_engine/src/instance/open.rs @@ -34,6 +34,7 @@ use crate::{ }, meta::{meta_data::TableManifestData, ManifestRef}, payload::{ReadPayload, WalDecoder}, + row_iter::IterOptions, space::{Space, SpaceId, SpaceRef}, sst::{factory::FactoryRef as SstFactoryRef, file::FilePurger}, table::data::{TableData, TableDataRef}, @@ -72,6 +73,11 @@ impl Instance { WalSynchronizer::new(WalSynchronizerConfig::default(), wal_manager); wal_synchronizer.start(&bg_runtime).await; + let iter_options = IterOptions { + batch_size: ctx.config.scan_batch_size, + sst_background_read_parallelism: ctx.config.sst_background_read_parallelism, + }; + let instance = Arc::new(Instance { space_store, runtimes: ctx.runtimes.clone(), @@ -86,7 +92,7 @@ impl Instance { db_write_buffer_size: ctx.config.db_write_buffer_size, space_write_buffer_size: ctx.config.space_write_buffer_size, replay_batch_size: ctx.config.replay_batch_size, - scan_batch_size: ctx.config.scan_batch_size, + iter_options, }); Ok(instance) diff --git a/analytic_engine/src/instance/read.rs b/analytic_engine/src/instance/read.rs index 999c22cb76..b7b49aee24 100644 --- a/analytic_engine/src/instance/read.rs +++ b/analytic_engine/src/instance/read.rs @@ -94,7 +94,7 @@ impl Instance { // Collect metrics. 
table_data.metrics.on_read_request_begin(); - let iter_options = IterOptions::new(self.scan_batch_size); + let iter_options = self.iter_options.clone(); let table_options = table_data.table_options(); if need_merge_sort_streams(&table_data.table_options(), &request) { @@ -104,7 +104,7 @@ impl Instance { self.build_partitioned_streams(&request, merge_iters) } else { let chain_iters = self - .build_chain_iters(table_data, &request, &table_options) + .build_chain_iters(table_data, &request, iter_options, &table_options) .await?; self.build_partitioned_streams(&request, chain_iters) } @@ -160,6 +160,8 @@ impl Instance { predicate: request.predicate.clone(), meta_cache: self.meta_cache.clone(), runtime: self.read_runtime().clone(), + background_read_parallelism: iter_options.sst_background_read_parallelism, + num_rows_per_row_group: table_options.num_rows_per_row_group, }; let time_range = request.predicate.time_range(); @@ -205,6 +207,7 @@ impl Instance { &self, table_data: &TableData, request: &ReadRequest, + iter_options: IterOptions, table_options: &TableOptions, ) -> Result> { let projected_schema = request.projected_schema.clone(); @@ -220,6 +223,8 @@ impl Instance { predicate: request.predicate.clone(), meta_cache: self.meta_cache.clone(), runtime: self.read_runtime().clone(), + background_read_parallelism: iter_options.sst_background_read_parallelism, + num_rows_per_row_group: table_options.num_rows_per_row_group, }; let time_range = request.predicate.time_range(); diff --git a/analytic_engine/src/lib.rs b/analytic_engine/src/lib.rs index c3ad3f1b84..14491758fa 100644 --- a/analytic_engine/src/lib.rs +++ b/analytic_engine/src/lib.rs @@ -73,10 +73,13 @@ pub struct Config { pub space_write_buffer_size: usize, /// The maximum size of all Write Buffers across all spaces. pub db_write_buffer_size: usize, - // End of global write buffer options. + /// End of global write buffer options. - // Batch size for scan sst + // Iterator scanning options + /// Batch size for iterator pub scan_batch_size: usize, + /// Sst background reading parallelism + pub sst_background_read_parallelism: usize, /// Wal storage config /// @@ -108,6 +111,7 @@ impl Default for Config { /// it. 
db_write_buffer_size: 0, scan_batch_size: 500, + sst_background_read_parallelism: 8, wal_storage: WalStorageConfig::RocksDB, } } diff --git a/analytic_engine/src/row_iter/mod.rs b/analytic_engine/src/row_iter/mod.rs index 28f1cf7565..009c80de87 100644 --- a/analytic_engine/src/row_iter/mod.rs +++ b/analytic_engine/src/row_iter/mod.rs @@ -28,17 +28,21 @@ const RECORD_BATCH_READ_BUF_SIZE: usize = 10; #[derive(Debug, Clone)] pub struct IterOptions { pub batch_size: usize, + pub sst_background_read_parallelism: usize, } impl IterOptions { - pub fn new(batch_size: usize) -> Self { - Self { batch_size } + pub fn new(batch_size: usize, sst_background_read_parallelism: usize) -> Self { + Self { + batch_size, + sst_background_read_parallelism, + } } } impl Default for IterOptions { fn default() -> Self { - Self::new(500) + Self::new(500, 1) } } diff --git a/analytic_engine/src/sst/factory.rs b/analytic_engine/src/sst/factory.rs index 19be49f924..9109ec66ec 100644 --- a/analytic_engine/src/sst/factory.rs +++ b/analytic_engine/src/sst/factory.rs @@ -52,6 +52,12 @@ pub struct SstReaderOptions { pub predicate: PredicateRef, pub meta_cache: Option, pub runtime: Arc, + + /// The max number of rows in one row group + pub num_rows_per_row_group: usize, + + /// The suggested parallelism while reading sst + pub background_read_parallelism: usize, } #[derive(Debug, Clone)] @@ -78,7 +84,11 @@ impl Factory for FactoryImpl { Some(Box::new(ParquetSstReader::new(path, storage, options))) } else { let reader = AsyncParquetReader::new(path, storage, options); - let reader = ThreadedReader::new(reader, options.runtime.clone()); + let reader = ThreadedReader::new( + reader, + options.runtime.clone(), + options.background_read_parallelism, + ); Some(Box::new(reader)) } } diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs index 7c59d34c04..c15ed66ccd 100644 --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@ -20,7 +20,7 @@ use common_types::{ use common_util::{runtime::Runtime, time::InstantExt}; use datafusion::datasource::file_format; use futures::{future::BoxFuture, FutureExt, Stream, StreamExt, TryFutureExt}; -use log::{debug, error, info}; +use log::{debug, error, info, warn}; use object_store::{ObjectMeta, ObjectStoreRef, Path}; use parquet::{ arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder, ProjectionMask}, @@ -56,14 +56,20 @@ pub struct Reader<'a> { predicate: PredicateRef, batch_size: usize, - /// init those fields in `init_if_necessary` + /// Init those fields in `init_if_necessary` meta_data: Option, row_projector: Option, + + /// Options for `read_parallelly` + parallelism_options: ParallelismOptions, } impl<'a> Reader<'a> { pub fn new(path: &'a Path, storage: &'a ObjectStoreRef, options: &SstReaderOptions) -> Self { let batch_size = options.read_batch_row_num; + let parallelism_options = + ParallelismOptions::new(options.read_batch_row_num, options.num_rows_per_row_group); + Self { path, storage, @@ -73,9 +79,54 @@ impl<'a> Reader<'a> { batch_size, meta_data: None, row_projector: None, + parallelism_options, } } + async fn maybe_read_parallelly( + &mut self, + read_parallelism: usize, + ) -> Result> + Send + Unpin>>> { + assert!(read_parallelism > 0); + let read_parallelism = if self.parallelism_options.enable_read_parallelly { + read_parallelism + } else { + 1 + }; + + self.init_if_necessary().await?; + let streams = 
self.fetch_record_batch_streams(read_parallelism).await?; + if streams.is_empty() { + return Ok(Vec::new()); + } + + let row_projector = self.row_projector.take().unwrap(); + let row_projector = ArrowRecordBatchProjector::from(row_projector); + + let storage_format_opts = self + .meta_data + .as_ref() + // metadata must be inited after `init_if_necessary`. + .unwrap() + .custom() + .storage_format_opts + .clone(); + + let streams: Vec<_> = streams + .into_iter() + .map(|stream| { + Box::new(RecordBatchProjector::new( + self.path.to_string(), + stream, + row_projector.clone(), + storage_format_opts.clone(), + )) as _ + }) + .collect(); + + Ok(streams) + } + fn filter_row_groups( &self, schema: SchemaRef, @@ -92,42 +143,81 @@ impl<'a> Reader<'a> { Ok(filter.filter()) } - async fn fetch_record_batch_stream(&mut self) -> Result { + async fn fetch_record_batch_streams( + &mut self, + read_parallelism: usize, + ) -> Result> { assert!(self.meta_data.is_some()); let meta_data = self.meta_data.as_ref().unwrap(); let row_projector = self.row_projector.as_ref().unwrap(); let object_store_reader = ObjectStoreReader::new(self.storage.clone(), self.path.clone(), meta_data.clone()); + + // Get target row groups. let filtered_row_groups = self.filter_row_groups( meta_data.custom().schema.to_arrow_schema_ref(), meta_data.parquet().row_groups(), &meta_data.custom().bloom_filter, )?; - debug!( - "fetch_record_batch row_groups total:{}, after filter:{}", + info!( + "Reader fetch record batches, row_groups total:{}, after filter:{}", meta_data.parquet().num_row_groups(), - filtered_row_groups.len() + filtered_row_groups.len(), + ); + + if filtered_row_groups.is_empty() { + return Ok(Vec::new()); + } + + // Partition the batches by `read_parallelism`. + let suggest_read_parallelism = read_parallelism; + let read_parallelism = std::cmp::min(filtered_row_groups.len(), suggest_read_parallelism); + // Partition by `max_row_groups_in_a_batch`. + debug!( + "Reader fetch record batches parallelly, suggest_read_parallelism:{}, read_parallelism:{}", + suggest_read_parallelism, + read_parallelism, ); + // TODO: we only support read parallelly when `batch_size` == + // `num_rows_per_row_group`, so this placing method is ok, we should + // adjust it when supporting it other situations. + let chunks_num = read_parallelism; + let chunk_size = filtered_row_groups.len() / read_parallelism + 1; + let mut filtered_row_group_chunks = vec![Vec::with_capacity(chunk_size); chunks_num]; + for (row_group_idx, row_group) in filtered_row_groups.into_iter().enumerate() { + let chunk_idx = row_group_idx % chunks_num; + filtered_row_group_chunks + .get_mut(chunk_idx) + .unwrap() + .push(row_group); + } + let proj_mask = ProjectionMask::leaves( meta_data.parquet().file_metadata().schema_descr(), row_projector.existed_source_projection().iter().copied(), ); - let builder = ParquetRecordBatchStreamBuilder::new(object_store_reader) - .await - .with_context(|| ParquetError)?; - let stream = builder - .with_batch_size(self.batch_size) - .with_row_groups(filtered_row_groups) - .with_projection(proj_mask) - .build() - .with_context(|| ParquetError)? 
- .map(|batch| batch.with_context(|| ParquetError)); + let mut streams = Vec::with_capacity(filtered_row_group_chunks.len()); + for chunk in filtered_row_group_chunks { + let object_store_reader = object_store_reader.clone(); + let builder = ParquetRecordBatchStreamBuilder::new(object_store_reader) + .await + .with_context(|| ParquetError)?; + let stream = builder + .with_batch_size(self.batch_size) + .with_row_groups(chunk) + .with_projection(proj_mask.clone()) + .build() + .with_context(|| ParquetError)? + .map(|batch| batch.with_context(|| ParquetError)); + + streams.push(Box::pin(stream) as _); + } - Ok(Box::pin(stream)) + Ok(streams) } async fn init_if_necessary(&mut self) -> Result<()> { @@ -198,12 +288,47 @@ impl<'a> Reader<'a> { } } -#[derive(Debug)] +/// Options for `read_parallely` in [Reader] +#[derive(Debug, Clone, Copy)] +struct ParallelismOptions { + /// Whether allow parallelly reading. + /// + /// NOTICE: now we only allow `read_parallelly` when + /// `read_batch_row_num` == `num_rows_per_row_group` + /// (surely, `num_rows_per_row_group` > 0). + // TODO: maybe we should support `read_parallelly` in all situations. + enable_read_parallelly: bool, + // TODO: more configs will be add. +} + +impl ParallelismOptions { + fn new(read_batch_row_num: usize, num_rows_per_row_group: usize) -> Self { + let enable_read_parallelly = if read_batch_row_num != num_rows_per_row_group { + warn!( + "Reader new parallelism options not enable, don't allow read parallelly because + read_batch_row_num != num_rows_per_row_group, + read_batch_row_num:{}, num_rows_per_row_group:{}", + read_batch_row_num, num_rows_per_row_group + ); + + false + } else { + true + }; + + Self { + enable_read_parallelly, + } + } +} + +#[derive(Debug, Clone)] struct ReaderMetrics { bytes_scanned: usize, sst_get_range_length_histogram: LocalHistogram, } +#[derive(Clone)] struct ObjectStoreReader { storage: ObjectStoreRef, path: Path, @@ -371,39 +496,56 @@ impl<'a> SstReader for Reader<'a> { async fn read( &mut self, ) -> Result> + Send + Unpin>> { - self.init_if_necessary().await?; - - let stream = self.fetch_record_batch_stream().await?; - let row_projector = self.row_projector.take().unwrap(); - let row_projector = ArrowRecordBatchProjector::from(row_projector); - - let storage_format_opts = self - .meta_data - .as_ref() - // metadata must be inited after `init_if_necessary`. - .unwrap() - .custom() - .storage_format_opts - .clone(); + let mut streams = self.maybe_read_parallelly(1).await?; + assert_eq!(streams.len(), 1); + let stream = streams.pop().expect("impossible to fetch no stream"); - Ok(Box::new(RecordBatchProjector::new( - self.path.to_string(), - stream, - row_projector, - storage_format_opts, - ))) + Ok(stream) } } struct RecordBatchReceiver { - rx: Receiver>, + rx_group: Vec>>, + cur_rx_idx: usize, } impl Stream for RecordBatchReceiver { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.as_mut().rx.poll_recv(cx) + if self.rx_group.is_empty() { + return Poll::Ready(None); + } + + let cur_rx_idx = self.cur_rx_idx; + // `cur_rx_idx` is impossible to be out-of-range, because it is got by round + // robin. 
+ let rx_group_len = self.rx_group.len(); + let cur_rx = self.rx_group.get_mut(cur_rx_idx).unwrap_or_else(|| { + panic!( + "cur_rx_idx is impossible to be out-of-range, cur_rx_idx:{}, rx_group len:{}", + cur_rx_idx, rx_group_len + ) + }); + let poll_result = cur_rx.poll_recv(cx); + + match poll_result { + Poll::Ready(result) => { + // If found `Poll::Pending`, we need to keep polling current rx + // until found `Poll::Ready` for ensuring the order of record batches, + // because batches are placed into each stream by round robin: + // +------+ +------+ +------+ + // | 1 | | 2 | | 3 | + // +------+ +------+ +------+ + // | 4 | | 5 | | 6 | + // +------+ +------+ +------+ + // | ... | | ... | | ... | + // +------+ +------+ +------+ + self.cur_rx_idx = (self.cur_rx_idx + 1) % self.rx_group.len(); + Poll::Ready(result) + } + Poll::Pending => Poll::Pending, + } } fn size_hint(&self) -> (usize, Option) { @@ -419,28 +561,36 @@ pub struct ThreadedReader<'a> { runtime: Arc, channel_cap: usize, + read_parallelism: usize, } impl<'a> ThreadedReader<'a> { - pub fn new(reader: Reader<'a>, runtime: Arc) -> Self { + pub fn new(reader: Reader<'a>, runtime: Arc, read_parallelism: usize) -> Self { + assert!( + read_parallelism > 0, + "read parallelism must be greater than 0" + ); + Self { inner: reader, runtime, channel_cap: DEFAULT_CHANNEL_CAP, + read_parallelism, } } - async fn read_record_batches(&mut self, tx: Sender>) -> Result<()> { - let mut stream = self.inner.read().await?; + fn read_record_batches_from_sub_reader( + &mut self, + mut reader: Box> + Send + Unpin>, + tx: Sender>, + ) { self.runtime.spawn(async move { - while let Some(batch) = stream.next().await { + while let Some(batch) = reader.next().await { if let Err(e) = tx.send(batch).await { error!("fail to send the fetched record batch result, err:{}", e); } } }); - - Ok(()) } } @@ -453,9 +603,173 @@ impl<'a> SstReader for ThreadedReader<'a> { async fn read( &mut self, ) -> Result> + Send + Unpin>> { - let (tx, rx) = mpsc::channel::>(self.channel_cap); - self.read_record_batches(tx).await?; + // Get underlying sst readers and channels. + let sub_readers = self + .inner + .maybe_read_parallelly(self.read_parallelism) + .await?; + if sub_readers.is_empty() { + return Ok(Box::new(RecordBatchReceiver { + rx_group: Vec::new(), + cur_rx_idx: 0, + }) as _); + } + + let read_parallelism = sub_readers.len(); + debug!( + "ThreadedReader read, suggest read_parallelism:{}, actual:{}", + self.read_parallelism, read_parallelism + ); + + let channel_cap_per_sub_reader = self.channel_cap / self.read_parallelism + 1; + let (tx_group, rx_group): (Vec<_>, Vec<_>) = (0..read_parallelism) + .into_iter() + .map(|_| mpsc::channel::>(channel_cap_per_sub_reader)) + .unzip(); + + // Start the background readings. 
+ for (sub_reader, tx) in sub_readers.into_iter().zip(tx_group.into_iter()) { + self.read_record_batches_from_sub_reader(sub_reader, tx); + } + + Ok(Box::new(RecordBatchReceiver { + rx_group, + cur_rx_idx: 0, + }) as _) + } +} + +#[cfg(test)] +mod tests { + use std::{ + pin::Pin, + task::{Context, Poll}, + time::Duration, + }; + + use futures::{Stream, StreamExt}; + use tokio::sync::mpsc::{self, Receiver, Sender}; + + use super::ParallelismOptions; + + struct MockReceivers { + rx_group: Vec>, + cur_rx_idx: usize, + } + + impl Stream for MockReceivers { + type Item = u32; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let cur_rx_idx = self.cur_rx_idx; + // `cur_rx_idx` is impossible to be out-of-range, because it is got by round + // robin. + let cur_rx = self.rx_group.get_mut(cur_rx_idx).unwrap(); + let poll_result = cur_rx.poll_recv(cx); + + match poll_result { + Poll::Ready(result) => { + self.cur_rx_idx = (self.cur_rx_idx + 1) % self.rx_group.len(); + Poll::Ready(result) + } + Poll::Pending => Poll::Pending, + } + } + + fn size_hint(&self) -> (usize, Option) { + (0, None) + } + } + + struct MockRandomSenders { + tx_group: Vec>, + test_datas: Vec>, + } + + impl MockRandomSenders { + fn start_to_send(&mut self) { + while !self.tx_group.is_empty() { + let tx = self.tx_group.pop().unwrap(); + let test_data = self.test_datas.pop().unwrap(); + tokio::spawn(async move { + for datum in test_data { + let random_millis = rand::random::() % 30; + tokio::time::sleep(Duration::from_millis(random_millis)).await; + tx.send(datum).await.unwrap(); + } + }); + } + } + } + + fn gen_test_data(amount: usize) -> Vec { + (0..amount) + .into_iter() + .map(|_| rand::random::()) + .collect() + } + + // We mock a thread model same as the one in `ThreadedReader` to check its + // validity. + // TODO: we should make the `ThreadedReader` mockable and refactor this test + // using it. + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_simulated_threaded_reader() { + let test_data = gen_test_data(123); + let expected = test_data.clone(); + let channel_cap_per_sub_reader = 10; + let reader_num = 5; + let (tx_group, rx_group): (Vec<_>, Vec<_>) = (0..reader_num) + .into_iter() + .map(|_| mpsc::channel::(channel_cap_per_sub_reader)) + .unzip(); + + // Partition datas. + let chunk_len = reader_num; + let mut test_data_chunks = vec![Vec::new(); chunk_len]; + for (idx, datum) in test_data.into_iter().enumerate() { + let chunk_idx = idx % chunk_len; + test_data_chunks.get_mut(chunk_idx).unwrap().push(datum); + } + + // Start senders. + let mut mock_senders = MockRandomSenders { + tx_group, + test_datas: test_data_chunks, + }; + mock_senders.start_to_send(); + + // Poll receivers. 
+ let mut actual = Vec::new(); + let mut mock_receivers = MockReceivers { + rx_group, + cur_rx_idx: 0, + }; + while let Some(datum) = mock_receivers.next().await { + actual.push(datum); + } + + assert_eq!(actual, expected); + } - Ok(Box::new(RecordBatchReceiver { rx })) + #[test] + fn test_parallelism_options() { + // `read_batch_row_num` < num_rows_per_row_group` + let read_batch_row_num = 2; + let num_rows_per_row_group = 4; + let options = ParallelismOptions::new(read_batch_row_num, num_rows_per_row_group); + assert!(!options.enable_read_parallelly); + + // `read_batch_row_num` > num_rows_per_row_group + let read_batch_row_num = 8; + let num_rows_per_row_group = 4; + let options = ParallelismOptions::new(read_batch_row_num, num_rows_per_row_group); + assert!(!options.enable_read_parallelly); + + // `read_batch_row_num` == num_rows_per_row_group` + let read_batch_row_num = 4; + let num_rows_per_row_group = 4; + let options = ParallelismOptions::new(read_batch_row_num, num_rows_per_row_group); + assert!(options.enable_read_parallelly); } } diff --git a/analytic_engine/src/sst/parquet/builder.rs b/analytic_engine/src/sst/parquet/builder.rs index cc5da7adc7..83dcdb4197 100644 --- a/analytic_engine/src/sst/parquet/builder.rs +++ b/analytic_engine/src/sst/parquet/builder.rs @@ -365,6 +365,8 @@ mod tests { predicate: Arc::new(Predicate::empty()), meta_cache: None, runtime: runtime.clone(), + num_rows_per_row_group: 5, + background_read_parallelism: 1, }; let mut reader: Box = if async_reader { diff --git a/analytic_engine/src/sst/parquet/reader.rs b/analytic_engine/src/sst/parquet/reader.rs index 4c24db1a0b..2ebba32279 100644 --- a/analytic_engine/src/sst/parquet/reader.rs +++ b/analytic_engine/src/sst/parquet/reader.rs @@ -143,6 +143,7 @@ impl<'a> ParquetSstReader<'a> { Ok(()) } + // This method is not used since we are in favor of `maybe_read_parallelly` now. 
fn read_record_batches(&mut self, tx: Sender>) -> Result<()> { let path = self.path.to_string(); ensure!(self.reader_builder.is_some(), ReadAgain { path }); diff --git a/benchmarks/src/merge_memtable_bench.rs b/benchmarks/src/merge_memtable_bench.rs index 5a31459cf6..59b26925f1 100644 --- a/benchmarks/src/merge_memtable_bench.rs +++ b/benchmarks/src/merge_memtable_bench.rs @@ -197,5 +197,7 @@ fn mock_sst_reader_options( predicate: Arc::new(Predicate::empty()), meta_cache: None, runtime, + background_read_parallelism: 1, + num_rows_per_row_group: 500, } } diff --git a/benchmarks/src/merge_sst_bench.rs b/benchmarks/src/merge_sst_bench.rs index d85c342aec..90e03f68c0 100644 --- a/benchmarks/src/merge_sst_bench.rs +++ b/benchmarks/src/merge_sst_bench.rs @@ -66,6 +66,8 @@ impl MergeSstBench { predicate, meta_cache: meta_cache.clone(), runtime: runtime.clone(), + background_read_parallelism: 1, + num_rows_per_row_group: config.read_batch_row_num, }; let max_projections = cmp::min(config.max_projections, schema.num_columns()); diff --git a/benchmarks/src/sst_bench.rs b/benchmarks/src/sst_bench.rs index d5f92dd838..d218d9579a 100644 --- a/benchmarks/src/sst_bench.rs +++ b/benchmarks/src/sst_bench.rs @@ -45,6 +45,8 @@ impl SstBench { predicate, meta_cache, runtime: runtime.clone(), + background_read_parallelism: 1, + num_rows_per_row_group: config.read_batch_row_num, }; let max_projections = cmp::min(config.max_projections, schema.num_columns()); diff --git a/benchmarks/src/sst_tools.rs b/benchmarks/src/sst_tools.rs index 35adfce9c0..930f6fa836 100644 --- a/benchmarks/src/sst_tools.rs +++ b/benchmarks/src/sst_tools.rs @@ -99,6 +99,8 @@ pub async fn rebuild_sst(config: RebuildSstConfig, runtime: Arc) { predicate: config.predicate.into_predicate(), meta_cache: None, runtime, + background_read_parallelism: 1, + num_rows_per_row_group: config.read_batch_row_num, }; let record_batch_stream = @@ -182,6 +184,7 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: Arc) { let schema = util::schema_from_sst(&store, &first_sst_path, &None).await; let iter_options = IterOptions { batch_size: config.read_batch_row_num, + sst_background_read_parallelism: 1, }; let request_id = RequestId::next_id(); @@ -198,6 +201,8 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: Arc) { predicate: config.predicate.into_predicate(), meta_cache: None, runtime: runtime.clone(), + background_read_parallelism: iter_options.sst_background_read_parallelism, + num_rows_per_row_group: config.read_batch_row_num, }; let sst_factory: SstFactoryRef = Arc::new(FactoryImpl::default()); diff --git a/benchmarks/src/util.rs b/benchmarks/src/util.rs index 5b3469136b..abd5bb9855 100644 --- a/benchmarks/src/util.rs +++ b/benchmarks/src/util.rs @@ -103,6 +103,8 @@ pub async fn load_sst_to_memtable( predicate: Arc::new(Predicate::empty()), meta_cache: None, runtime, + background_read_parallelism: 1, + num_rows_per_row_group: 500, }; let sst_factory = FactoryImpl; let mut sst_reader = sst_factory diff --git a/common_types/src/projected_schema.rs b/common_types/src/projected_schema.rs index d946bea488..16578cc39e 100644 --- a/common_types/src/projected_schema.rs +++ b/common_types/src/projected_schema.rs @@ -40,7 +40,7 @@ pub enum Error { pub type Result = std::result::Result; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct RowProjector { schema_with_key: RecordSchemaWithKey, source_schema: Schema, diff --git a/common_types/src/record_batch.rs b/common_types/src/record_batch.rs index 1b3d51a30a..c21f2a8b57 100644 --- 
a/common_types/src/record_batch.rs +++ b/common_types/src/record_batch.rs @@ -544,7 +544,7 @@ impl RecordBatchWithKeyBuilder { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ArrowRecordBatchProjector { row_projector: RowProjector, } diff --git a/tools/src/bin/sst-convert.rs b/tools/src/bin/sst-convert.rs index 6905f4fd99..cf4bdfcd73 100644 --- a/tools/src/bin/sst-convert.rs +++ b/tools/src/bin/sst-convert.rs @@ -79,6 +79,8 @@ async fn run(args: Args, runtime: Arc) -> Result<()> { predicate: Arc::new(Predicate::empty()), meta_cache: None, runtime, + background_read_parallelism: 1, + num_rows_per_row_group: 8192, }; let mut reader = factory .new_sst_reader(&reader_opts, &input_path, &storage)
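
Note on the scheduling pattern introduced above: the reader deals the filtered row groups round-robin into `read_parallelism` chunks, spawns one background task per chunk that pushes record batches into its own bounded channel, and the consumer polls those channels round-robin, sticking with the current receiver until it yields so that batches come back in the original row-group order. This is also why parallel reading is only enabled when `read_batch_row_num == num_rows_per_row_group`: each row group then produces exactly one batch, so the interleaving lines up. The sketch below is a minimal, self-contained illustration of that pattern, not code from the patch; it uses plain `u32` values in place of record batches, assumes the `tokio` and `futures` crates, and names such as `round_robin_chunks` and `RoundRobinReceiver` are hypothetical.

```rust
use std::{
    pin::Pin,
    task::{Context, Poll},
};

use futures::{Stream, StreamExt};
use tokio::sync::mpsc::{self, Receiver, Sender};

/// Deal items round-robin into `parallelism` chunks, mirroring how the patch
/// assigns filtered row groups to per-chunk sub-streams.
fn round_robin_chunks(items: Vec<u32>, parallelism: usize) -> Vec<Vec<u32>> {
    let mut chunks = vec![Vec::new(); parallelism];
    for (idx, item) in items.into_iter().enumerate() {
        chunks[idx % parallelism].push(item);
    }
    chunks
}

/// Merge several receivers while preserving the round-robin placement order:
/// keep polling the current receiver until it yields, then move to the next.
struct RoundRobinReceiver {
    rx_group: Vec<Receiver<u32>>,
    cur: usize,
}

impl Stream for RoundRobinReceiver {
    type Item = u32;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        let this = self.get_mut();
        if this.rx_group.is_empty() {
            return Poll::Ready(None);
        }
        match this.rx_group[this.cur].poll_recv(cx) {
            Poll::Ready(item) => {
                // Advance only after the current receiver produced a result,
                // so the global order follows the placement order above.
                this.cur = (this.cur + 1) % this.rx_group.len();
                Poll::Ready(item)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

#[tokio::main]
async fn main() {
    let parallelism = 3;
    let items: Vec<u32> = (0..10).collect();
    let expected = items.clone();

    let (tx_group, rx_group): (Vec<Sender<u32>>, Vec<Receiver<u32>>) =
        (0..parallelism).map(|_| mpsc::channel(4)).unzip();

    // One background task per chunk, standing in for the per-chunk sst streams.
    for (chunk, tx) in round_robin_chunks(items, parallelism).into_iter().zip(tx_group) {
        tokio::spawn(async move {
            for item in chunk {
                // Ignore send errors: the receiver side may have been dropped.
                let _ = tx.send(item).await;
            }
        });
    }

    // Consume in round-robin order; the stream ends once a drained channel closes.
    let mut merged = Vec::new();
    let mut receiver = RoundRobinReceiver { rx_group, cur: 0 };
    while let Some(item) = receiver.next().await {
        merged.push(item);
    }
    assert_eq!(merged, expected);
}
```

The final `assert_eq!` checks that the merged output preserves the original input order despite the senders running concurrently, which is the same property `test_simulated_threaded_reader` verifies in the patch.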