diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index 3fc436bba0..1d613297d6 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -898,6 +898,7 @@ impl SpaceStore { predicate: Arc::new(Predicate::empty()), meta_cache: self.meta_cache.clone(), runtime: runtime.clone(), + read_parallelism: 1, }; let mut builder = MergeBuilder::new(MergeConfig { request_id, diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs index 4602792317..195dd208d9 100644 --- a/analytic_engine/src/instance/mod.rs +++ b/analytic_engine/src/instance/mod.rs @@ -33,6 +33,7 @@ use wal::manager::WalManagerRef; use crate::{ compaction::scheduler::CompactionSchedulerRef, meta::ManifestRef, + row_iter::IterOptions, space::{SpaceId, SpaceRef}, sst::{factory::FactoryRef as SstFactoryRef, file::FilePurger, meta_cache::MetaCacheRef}, table::data::TableDataRef, @@ -170,10 +171,10 @@ pub struct Instance { pub(crate) db_write_buffer_size: usize, /// Space write buffer size pub(crate) space_write_buffer_size: usize, - /// replay wal batch size + /// Replay wal batch size pub(crate) replay_batch_size: usize, - /// batch size for scan sst - pub(crate) scan_batch_size: usize, + /// Options for scanning sst + pub(crate) iter_options: IterOptions, } impl Instance { diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs index 2a310a08ad..fd19c5abab 100644 --- a/analytic_engine/src/instance/open.rs +++ b/analytic_engine/src/instance/open.rs @@ -34,6 +34,7 @@ use crate::{ }, meta::{meta_data::TableManifestData, ManifestRef}, payload::{ReadPayload, WalDecoder}, + row_iter::IterOptions, space::{Space, SpaceId, SpaceRef}, sst::{factory::FactoryRef as SstFactoryRef, file::FilePurger}, table::data::{TableData, TableDataRef}, @@ -72,6 +73,11 @@ impl Instance { WalSynchronizer::new(WalSynchronizerConfig::default(), wal_manager); wal_synchronizer.start(&bg_runtime).await; + let iter_options = IterOptions { + batch_size: ctx.config.scan_batch_size, + sst_background_read_parallelism: ctx.config.sst_background_read_parallelism, + }; + let instance = Arc::new(Instance { space_store, runtimes: ctx.runtimes.clone(), @@ -86,7 +92,7 @@ impl Instance { db_write_buffer_size: ctx.config.db_write_buffer_size, space_write_buffer_size: ctx.config.space_write_buffer_size, replay_batch_size: ctx.config.replay_batch_size, - scan_batch_size: ctx.config.scan_batch_size, + iter_options, }); Ok(instance) diff --git a/analytic_engine/src/instance/read.rs b/analytic_engine/src/instance/read.rs index 999c22cb76..13436635ea 100644 --- a/analytic_engine/src/instance/read.rs +++ b/analytic_engine/src/instance/read.rs @@ -94,7 +94,7 @@ impl Instance { // Collect metrics. table_data.metrics.on_read_request_begin(); - let iter_options = IterOptions::new(self.scan_batch_size); + let iter_options = self.iter_options.clone(); let table_options = table_data.table_options(); if need_merge_sort_streams(&table_data.table_options(), &request) { @@ -104,7 +104,7 @@ impl Instance { self.build_partitioned_streams(&request, merge_iters) } else { let chain_iters = self - .build_chain_iters(table_data, &request, &table_options) + .build_chain_iters(table_data, &request, iter_options, &table_options) .await?; self.build_partitioned_streams(&request, chain_iters) } @@ -160,6 +160,7 @@ impl Instance { predicate: request.predicate.clone(), meta_cache: self.meta_cache.clone(), runtime: self.read_runtime().clone(), + read_parallelism: iter_options.sst_background_read_parallelism, }; let time_range = request.predicate.time_range(); @@ -205,6 +206,7 @@ impl Instance { &self, table_data: &TableData, request: &ReadRequest, + iter_options: IterOptions, table_options: &TableOptions, ) -> Result> { let projected_schema = request.projected_schema.clone(); @@ -220,6 +222,7 @@ impl Instance { predicate: request.predicate.clone(), meta_cache: self.meta_cache.clone(), runtime: self.read_runtime().clone(), + read_parallelism: iter_options.sst_background_read_parallelism, }; let time_range = request.predicate.time_range(); diff --git a/analytic_engine/src/lib.rs b/analytic_engine/src/lib.rs index c3ad3f1b84..7067e8bc9d 100644 --- a/analytic_engine/src/lib.rs +++ b/analytic_engine/src/lib.rs @@ -73,10 +73,13 @@ pub struct Config { pub space_write_buffer_size: usize, /// The maximum size of all Write Buffers across all spaces. pub db_write_buffer_size: usize, - // End of global write buffer options. + /// End of global write buffer options. - // Batch size for scan sst + // Iterator scanning options + /// Batch size for iterator pub scan_batch_size: usize, + /// Sst background reading parallelism + pub sst_background_read_parallelism: usize, /// Wal storage config /// @@ -108,6 +111,7 @@ impl Default for Config { /// it. db_write_buffer_size: 0, scan_batch_size: 500, + sst_background_read_parallelism: 1, wal_storage: WalStorageConfig::RocksDB, } } diff --git a/analytic_engine/src/row_iter/mod.rs b/analytic_engine/src/row_iter/mod.rs index 28f1cf7565..009c80de87 100644 --- a/analytic_engine/src/row_iter/mod.rs +++ b/analytic_engine/src/row_iter/mod.rs @@ -28,17 +28,21 @@ const RECORD_BATCH_READ_BUF_SIZE: usize = 10; #[derive(Debug, Clone)] pub struct IterOptions { pub batch_size: usize, + pub sst_background_read_parallelism: usize, } impl IterOptions { - pub fn new(batch_size: usize) -> Self { - Self { batch_size } + pub fn new(batch_size: usize, sst_background_read_parallelism: usize) -> Self { + Self { + batch_size, + sst_background_read_parallelism, + } } } impl Default for IterOptions { fn default() -> Self { - Self::new(500) + Self::new(500, 1) } } diff --git a/analytic_engine/src/sst/factory.rs b/analytic_engine/src/sst/factory.rs index 19be49f924..42c6125797 100644 --- a/analytic_engine/src/sst/factory.rs +++ b/analytic_engine/src/sst/factory.rs @@ -52,6 +52,9 @@ pub struct SstReaderOptions { pub predicate: PredicateRef, pub meta_cache: Option, pub runtime: Arc, + + /// The parallelism while reading sst + pub read_parallelism: usize, } #[derive(Debug, Clone)] @@ -78,7 +81,11 @@ impl Factory for FactoryImpl { Some(Box::new(ParquetSstReader::new(path, storage, options))) } else { let reader = AsyncParquetReader::new(path, storage, options); - let reader = ThreadedReader::new(reader, options.runtime.clone()); + let reader = ThreadedReader::new( + reader, + options.runtime.clone(), + options.read_parallelism, + ); Some(Box::new(reader)) } } diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs index 7c59d34c04..6e993d2194 100644 --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@ -56,7 +56,7 @@ pub struct Reader<'a> { predicate: PredicateRef, batch_size: usize, - /// init those fields in `init_if_necessary` + /// Init those fields in `init_if_necessary` meta_data: Option, row_projector: Option, } @@ -76,6 +76,40 @@ impl<'a> Reader<'a> { } } + async fn read_parallelly( + &mut self, + read_parallelism: usize, + ) -> Result> + Send + Unpin>>> { + self.init_if_necessary().await?; + + let streams = self.fetch_record_batch_streams(read_parallelism).await?; + let row_projector = self.row_projector.take().unwrap(); + let row_projector = ArrowRecordBatchProjector::from(row_projector); + + let storage_format_opts = self + .meta_data + .as_ref() + // metadata must be inited after `init_if_necessary`. + .unwrap() + .custom() + .storage_format_opts + .clone(); + + let streams: Vec<_> = streams + .into_iter() + .map(|stream| { + Box::new(RecordBatchProjector::new( + self.path.to_string(), + stream, + row_projector.clone(), + storage_format_opts.clone(), + )) as _ + }) + .collect(); + + Ok(streams) + } + fn filter_row_groups( &self, schema: SchemaRef, @@ -92,7 +126,10 @@ impl<'a> Reader<'a> { Ok(filter.filter()) } - async fn fetch_record_batch_stream(&mut self) -> Result { + async fn fetch_record_batch_streams( + &mut self, + read_parallelism: usize, + ) -> Result> { assert!(self.meta_data.is_some()); let meta_data = self.meta_data.as_ref().unwrap(); @@ -106,28 +143,49 @@ impl<'a> Reader<'a> { )?; debug!( - "fetch_record_batch row_groups total:{}, after filter:{}", + "fetch_record_batch row_groups total:{}, after filter:{}, row_group_num_per_reader:{}", meta_data.parquet().num_row_groups(), - filtered_row_groups.len() + filtered_row_groups.len(), + read_parallelism, ); + // TODO: now `batch_size` in pulling is equal to `num_rows_per_row_group` in + // `TableOption`, so this partition way is reasonable, but it makes no + // sense without above assumption. + let chunks_num = read_parallelism; + let chunk_size = filtered_row_groups.len() / read_parallelism + 1; + let mut filtered_row_group_chunks = vec![Vec::with_capacity(chunk_size); read_parallelism]; + for (row_group_idx, row_group) in filtered_row_groups.into_iter().enumerate() { + let chunk_idx = row_group_idx % chunks_num; + filtered_row_group_chunks + .get_mut(chunk_idx) + .unwrap() + .push(row_group); + } + let proj_mask = ProjectionMask::leaves( meta_data.parquet().file_metadata().schema_descr(), row_projector.existed_source_projection().iter().copied(), ); - let builder = ParquetRecordBatchStreamBuilder::new(object_store_reader) - .await - .with_context(|| ParquetError)?; - let stream = builder - .with_batch_size(self.batch_size) - .with_row_groups(filtered_row_groups) - .with_projection(proj_mask) - .build() - .with_context(|| ParquetError)? - .map(|batch| batch.with_context(|| ParquetError)); - - Ok(Box::pin(stream)) + let mut streams = Vec::with_capacity(filtered_row_group_chunks.len()); + for chunk in filtered_row_group_chunks { + let object_store_reader = object_store_reader.clone(); + let builder = ParquetRecordBatchStreamBuilder::new(object_store_reader) + .await + .with_context(|| ParquetError)?; + let stream = builder + .with_batch_size(self.batch_size) + .with_row_groups(chunk) + .with_projection(proj_mask.clone()) + .build() + .with_context(|| ParquetError)? + .map(|batch| batch.with_context(|| ParquetError)); + + streams.push(Box::pin(stream) as _); + } + + Ok(streams) } async fn init_if_necessary(&mut self) -> Result<()> { @@ -198,12 +256,13 @@ impl<'a> Reader<'a> { } } -#[derive(Debug)] +#[derive(Debug, Clone)] struct ReaderMetrics { bytes_scanned: usize, sst_get_range_length_histogram: LocalHistogram, } +#[derive(Clone)] struct ObjectStoreReader { storage: ObjectStoreRef, path: Path, @@ -371,39 +430,31 @@ impl<'a> SstReader for Reader<'a> { async fn read( &mut self, ) -> Result> + Send + Unpin>> { - self.init_if_necessary().await?; - - let stream = self.fetch_record_batch_stream().await?; - let row_projector = self.row_projector.take().unwrap(); - let row_projector = ArrowRecordBatchProjector::from(row_projector); - - let storage_format_opts = self - .meta_data - .as_ref() - // metadata must be inited after `init_if_necessary`. - .unwrap() - .custom() - .storage_format_opts - .clone(); - - Ok(Box::new(RecordBatchProjector::new( - self.path.to_string(), - stream, - row_projector, - storage_format_opts, - ))) + let mut streams = self.read_parallelly(1).await?; + assert_eq!(streams.len(), 1); + let stream = streams.pop().expect("impossible to fetch no stream"); + + Ok(stream) } } struct RecordBatchReceiver { - rx: Receiver>, + rx_group: Vec>>, + cur_rx_idx: usize, } impl Stream for RecordBatchReceiver { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.as_mut().rx.poll_recv(cx) + let cur_rx_idx = self.cur_rx_idx; + // `cur_rx_idx` is impossible to be out-of-range, because it is got by round + // robin. + let cur_rx = self.rx_group.get_mut(cur_rx_idx).unwrap(); + let poll_result = cur_rx.poll_recv(cx); + + self.cur_rx_idx = (self.cur_rx_idx + 1) % self.rx_group.len(); + poll_result } fn size_hint(&self) -> (usize, Option) { @@ -419,28 +470,36 @@ pub struct ThreadedReader<'a> { runtime: Arc, channel_cap: usize, + read_parallelism: usize, } impl<'a> ThreadedReader<'a> { - pub fn new(reader: Reader<'a>, runtime: Arc) -> Self { + pub fn new(reader: Reader<'a>, runtime: Arc, read_parallelism: usize) -> Self { + assert!( + read_parallelism > 0, + "read parallelism must be greater than 0" + ); + Self { inner: reader, runtime, channel_cap: DEFAULT_CHANNEL_CAP, + read_parallelism, } } - async fn read_record_batches(&mut self, tx: Sender>) -> Result<()> { - let mut stream = self.inner.read().await?; + fn read_record_batches_from_sub_reader( + &mut self, + mut reader: Box> + Send + Unpin>, + tx: Sender>, + ) { self.runtime.spawn(async move { - while let Some(batch) = stream.next().await { + while let Some(batch) = reader.next().await { if let Err(e) = tx.send(batch).await { error!("fail to send the fetched record batch result, err:{}", e); } } }); - - Ok(()) } } @@ -453,9 +512,23 @@ impl<'a> SstReader for ThreadedReader<'a> { async fn read( &mut self, ) -> Result> + Send + Unpin>> { - let (tx, rx) = mpsc::channel::>(self.channel_cap); - self.read_record_batches(tx).await?; + let channel_cap_per_sub_reader = self.channel_cap / self.read_parallelism + 1; + let (tx_group, rx_group): (Vec<_>, Vec<_>) = (0..self.read_parallelism) + .into_iter() + .map(|_| mpsc::channel::>(channel_cap_per_sub_reader)) + .unzip(); + + // Get underlying sst readers. + let sub_readers = self.inner.read_parallelly(self.read_parallelism).await?; + + // Start the background readings. + for (sub_reader, tx) in sub_readers.into_iter().zip(tx_group.into_iter()) { + self.read_record_batches_from_sub_reader(sub_reader, tx); + } - Ok(Box::new(RecordBatchReceiver { rx })) + Ok(Box::new(RecordBatchReceiver { + rx_group, + cur_rx_idx: 0, + }) as _) } } diff --git a/analytic_engine/src/sst/parquet/builder.rs b/analytic_engine/src/sst/parquet/builder.rs index 256b9aa97e..76c452f229 100644 --- a/analytic_engine/src/sst/parquet/builder.rs +++ b/analytic_engine/src/sst/parquet/builder.rs @@ -372,6 +372,7 @@ mod tests { predicate: Arc::new(Predicate::empty()), meta_cache: None, runtime: runtime.clone(), + read_parallelism: 1, }; let mut reader: Box = if async_reader { diff --git a/benchmarks/src/merge_memtable_bench.rs b/benchmarks/src/merge_memtable_bench.rs index 5a31459cf6..e9bd1d0bdb 100644 --- a/benchmarks/src/merge_memtable_bench.rs +++ b/benchmarks/src/merge_memtable_bench.rs @@ -197,5 +197,6 @@ fn mock_sst_reader_options( predicate: Arc::new(Predicate::empty()), meta_cache: None, runtime, + read_parallelism: 1, } } diff --git a/benchmarks/src/merge_sst_bench.rs b/benchmarks/src/merge_sst_bench.rs index d85c342aec..2e06e3c59e 100644 --- a/benchmarks/src/merge_sst_bench.rs +++ b/benchmarks/src/merge_sst_bench.rs @@ -66,6 +66,7 @@ impl MergeSstBench { predicate, meta_cache: meta_cache.clone(), runtime: runtime.clone(), + read_parallelism: 1, }; let max_projections = cmp::min(config.max_projections, schema.num_columns()); diff --git a/benchmarks/src/sst_bench.rs b/benchmarks/src/sst_bench.rs index d5f92dd838..0e5ddd1c7a 100644 --- a/benchmarks/src/sst_bench.rs +++ b/benchmarks/src/sst_bench.rs @@ -45,6 +45,7 @@ impl SstBench { predicate, meta_cache, runtime: runtime.clone(), + read_parallelism: 1, }; let max_projections = cmp::min(config.max_projections, schema.num_columns()); diff --git a/benchmarks/src/sst_tools.rs b/benchmarks/src/sst_tools.rs index 35adfce9c0..1a5710c853 100644 --- a/benchmarks/src/sst_tools.rs +++ b/benchmarks/src/sst_tools.rs @@ -99,6 +99,7 @@ pub async fn rebuild_sst(config: RebuildSstConfig, runtime: Arc) { predicate: config.predicate.into_predicate(), meta_cache: None, runtime, + read_parallelism: 1, }; let record_batch_stream = @@ -182,6 +183,7 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: Arc) { let schema = util::schema_from_sst(&store, &first_sst_path, &None).await; let iter_options = IterOptions { batch_size: config.read_batch_row_num, + sst_background_read_parallelism: 1, }; let request_id = RequestId::next_id(); @@ -198,6 +200,7 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: Arc) { predicate: config.predicate.into_predicate(), meta_cache: None, runtime: runtime.clone(), + read_parallelism: iter_options.sst_background_read_parallelism, }; let sst_factory: SstFactoryRef = Arc::new(FactoryImpl::default()); diff --git a/benchmarks/src/util.rs b/benchmarks/src/util.rs index 5b3469136b..715be7eed0 100644 --- a/benchmarks/src/util.rs +++ b/benchmarks/src/util.rs @@ -103,6 +103,7 @@ pub async fn load_sst_to_memtable( predicate: Arc::new(Predicate::empty()), meta_cache: None, runtime, + read_parallelism: 1, }; let sst_factory = FactoryImpl; let mut sst_reader = sst_factory diff --git a/common_types/src/projected_schema.rs b/common_types/src/projected_schema.rs index d946bea488..16578cc39e 100644 --- a/common_types/src/projected_schema.rs +++ b/common_types/src/projected_schema.rs @@ -40,7 +40,7 @@ pub enum Error { pub type Result = std::result::Result; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct RowProjector { schema_with_key: RecordSchemaWithKey, source_schema: Schema, diff --git a/common_types/src/record_batch.rs b/common_types/src/record_batch.rs index 1b3d51a30a..c21f2a8b57 100644 --- a/common_types/src/record_batch.rs +++ b/common_types/src/record_batch.rs @@ -544,7 +544,7 @@ impl RecordBatchWithKeyBuilder { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ArrowRecordBatchProjector { row_projector: RowProjector, } diff --git a/tools/src/bin/sst-convert.rs b/tools/src/bin/sst-convert.rs index 6905f4fd99..b6a4d25d50 100644 --- a/tools/src/bin/sst-convert.rs +++ b/tools/src/bin/sst-convert.rs @@ -79,6 +79,7 @@ async fn run(args: Args, runtime: Arc) -> Result<()> { predicate: Arc::new(Predicate::empty()), meta_cache: None, runtime, + read_parallelism: 1, }; let mut reader = factory .new_sst_reader(&reader_opts, &input_path, &storage)