From dca25d3993bd7827be6addf7e82b7b37f6500f31 Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Thu, 23 Mar 2023 17:37:48 +0800 Subject: [PATCH 1/6] feat: make max_record_batches_in_flight configurable --- .../src/instance/flush_compaction.rs | 24 ++-- analytic_engine/src/instance/mod.rs | 5 +- analytic_engine/src/instance/open.rs | 11 +- analytic_engine/src/instance/read.rs | 17 ++- analytic_engine/src/lib.rs | 6 +- analytic_engine/src/row_iter/dedup.rs | 3 +- analytic_engine/src/row_iter/merge.rs | 4 +- analytic_engine/src/row_iter/mod.rs | 69 ++--------- analytic_engine/src/sst/factory.rs | 28 +++-- .../src/sst/parquet/async_reader.rs | 115 +++++------------- analytic_engine/src/sst/parquet/writer.rs | 10 +- benchmarks/src/config.rs | 4 +- benchmarks/src/merge_memtable_bench.rs | 16 ++- benchmarks/src/merge_sst_bench.rs | 15 ++- benchmarks/src/parquet_bench.rs | 2 +- benchmarks/src/sst_bench.rs | 12 +- benchmarks/src/sst_tools.rs | 29 +++-- benchmarks/src/util.rs | 12 +- tools/src/bin/sst-convert.rs | 10 +- 19 files changed, 161 insertions(+), 231 deletions(-) diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index a164991fcc..3950cee888 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -49,7 +49,7 @@ use crate::{ }, space::SpaceAndTable, sst::{ - factory::{self, ReadFrequency, SstReadOptions, SstWriteOptions}, + factory::{self, ReadFrequency, ScanOptions, SstReadOptions, SstWriteOptions}, file::FileMeta, meta_data::SstMetaReader, writer::{MetaData, RecordBatchStream}, @@ -838,18 +838,21 @@ impl SpaceStore { let schema = table_data.schema(); let table_options = table_data.table_options(); let projected_schema = ProjectedSchema::no_projection(schema.clone()); + // TODO: make the scan_options configurable for compaction. 
+ let scan_options = ScanOptions::default(); let sst_read_options = SstReadOptions { - read_batch_row_num: table_options.num_rows_per_row_group, reverse: false, + num_rows_per_row_group: table_options.num_rows_per_row_group, frequency: ReadFrequency::Once, projected_schema: projected_schema.clone(), predicate: Arc::new(Predicate::empty()), meta_cache: self.meta_cache.clone(), + scan_options, runtime: runtime.clone(), - background_read_parallelism: 1, - num_rows_per_row_group: table_options.num_rows_per_row_group, }; - let iter_options = IterOptions::default(); + let iter_options = IterOptions { + batch_size: table_options.num_rows_per_row_group, + }; let merge_iter = { let space_id = table_data.space_id; let table_id = table_data.id; @@ -881,12 +884,13 @@ impl SpaceStore { }; let record_batch_stream = if table_options.need_dedup() { - row_iter::record_batch_with_key_iter_to_stream( - DedupIterator::new(request_id, merge_iter, iter_options), - &runtime, - ) + row_iter::record_batch_with_key_iter_to_stream(DedupIterator::new( + request_id, + merge_iter, + iter_options, + )) } else { - row_iter::record_batch_with_key_iter_to_stream(merge_iter, &runtime) + row_iter::record_batch_with_key_iter_to_stream(merge_iter) }; let sst_meta = { diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs index d82550b0a2..0b3465fc78 100644 --- a/analytic_engine/src/instance/mod.rs +++ b/analytic_engine/src/instance/mod.rs @@ -33,10 +33,9 @@ use wal::manager::{WalLocation, WalManagerRef}; use crate::{ compaction::scheduler::CompactionSchedulerRef, manifest::ManifestRef, - row_iter::IterOptions, space::{SpaceId, SpaceRef}, sst::{ - factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef}, + factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef, ScanOptions}, file::FilePurger, meta_data::cache::MetaCacheRef, }, @@ -173,7 +172,7 @@ pub struct Instance { /// Write sst max buffer size pub(crate) write_sst_max_buffer_size: usize, /// Options for scanning sst - pub(crate) iter_options: IterOptions, + pub(crate) scan_options: ScanOptions, pub(crate) remote_engine: Option, } diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs index 2f8a03a8bf..37f0db513f 100644 --- a/analytic_engine/src/instance/open.rs +++ b/analytic_engine/src/instance/open.rs @@ -35,10 +35,9 @@ use crate::{ }, manifest::{meta_data::TableManifestData, LoadRequest, ManifestRef}, payload::{ReadPayload, WalDecoder}, - row_iter::IterOptions, space::{Space, SpaceContext, SpaceId, SpaceRef}, sst::{ - factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef}, + factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef, ScanOptions}, file::FilePurger, }, table::data::{TableData, TableDataRef}, @@ -74,9 +73,9 @@ impl Instance { let file_purger = FilePurger::start(&bg_runtime, store_picker.default_store().clone()); - let iter_options = IterOptions { - batch_size: ctx.config.scan_batch_size, - sst_background_read_parallelism: ctx.config.sst_background_read_parallelism, + let scan_options = ScanOptions { + background_read_parallelism: ctx.config.sst_background_read_parallelism, + max_record_batches_in_flight: ctx.config.scan_max_record_batches_in_flight, }; let instance = Arc::new(Instance { @@ -94,7 +93,7 @@ impl Instance { space_write_buffer_size: ctx.config.space_write_buffer_size, replay_batch_size: ctx.config.replay_batch_size, write_sst_max_buffer_size: ctx.config.write_sst_max_buffer_size.as_bytes() as usize, - iter_options, + scan_options, remote_engine: remote_engine_ref, 
}); diff --git a/analytic_engine/src/instance/read.rs b/analytic_engine/src/instance/read.rs index a7f04f0589..24600774d3 100644 --- a/analytic_engine/src/instance/read.rs +++ b/analytic_engine/src/instance/read.rs @@ -94,8 +94,6 @@ impl Instance { ); let table_data = space_table.table_data(); - - let iter_options = self.iter_options.clone(); let table_options = table_data.table_options(); // Collect metrics. @@ -108,12 +106,12 @@ impl Instance { if need_merge_sort { let merge_iters = self - .build_merge_iters(table_data, &request, iter_options, &table_options) + .build_merge_iters(table_data, &request, &table_options) .await?; self.build_partitioned_streams(&request, merge_iters) } else { let chain_iters = self - .build_chain_iters(table_data, &request, iter_options, &table_options) + .build_chain_iters(table_data, &request, &table_options) .await?; self.build_partitioned_streams(&request, chain_iters) } @@ -155,27 +153,28 @@ impl Instance { &self, table_data: &TableData, request: &ReadRequest, - iter_options: IterOptions, table_options: &TableOptions, ) -> Result>> { // Current visible sequence let sequence = table_data.last_sequence(); let projected_schema = request.projected_schema.clone(); let sst_read_options = SstReadOptions { - read_batch_row_num: table_options.num_rows_per_row_group, reverse: request.order.is_in_desc_order(), frequency: ReadFrequency::Frequent, projected_schema: projected_schema.clone(), predicate: request.predicate.clone(), meta_cache: self.meta_cache.clone(), runtime: self.read_runtime().clone(), - background_read_parallelism: iter_options.sst_background_read_parallelism, num_rows_per_row_group: table_options.num_rows_per_row_group, + scan_options: self.scan_options.clone(), }; let time_range = request.predicate.time_range(); let version = table_data.current_version(); let read_views = self.partition_ssts_and_memtables(time_range, version, table_options); + let iter_options = IterOptions { + batch_size: table_options.num_rows_per_row_group, + }; let mut iters = Vec::with_capacity(read_views.len()); for (idx, read_view) in read_views.into_iter().enumerate() { @@ -226,7 +225,6 @@ impl Instance { &self, table_data: &TableData, request: &ReadRequest, - iter_options: IterOptions, table_options: &TableOptions, ) -> Result> { let projected_schema = request.projected_schema.clone(); @@ -234,7 +232,6 @@ impl Instance { assert!(request.order.is_out_of_order()); let sst_read_options = SstReadOptions { - read_batch_row_num: table_options.num_rows_per_row_group, // no need to read in order so just read in asc order by default. reverse: false, frequency: ReadFrequency::Frequent, @@ -242,8 +239,8 @@ impl Instance { predicate: request.predicate.clone(), meta_cache: self.meta_cache.clone(), runtime: self.read_runtime().clone(), - background_read_parallelism: iter_options.sst_background_read_parallelism, num_rows_per_row_group: table_options.num_rows_per_row_group, + scan_options: self.scan_options.clone(), }; let time_range = request.predicate.time_range(); diff --git a/analytic_engine/src/lib.rs b/analytic_engine/src/lib.rs index 2d41cde485..8902ba5ce2 100644 --- a/analytic_engine/src/lib.rs +++ b/analytic_engine/src/lib.rs @@ -71,8 +71,10 @@ pub struct Config { /// End of global write buffer options. // Iterator scanning options - /// Batch size for iterator + /// Batch size for iterator. 
pub scan_batch_size: usize, + /// Max record batches in flight when scan + pub scan_max_record_batches_in_flight: usize, /// Sst background reading parallelism pub sst_background_read_parallelism: usize, /// Max buffer size for writing sst @@ -108,8 +110,10 @@ impl Default for Config { /// Zero means disabling this param, give a positive value to enable /// it. db_write_buffer_size: 0, + #[allow(deprecated)] scan_batch_size: 500, sst_background_read_parallelism: 8, + scan_max_record_batches_in_flight: 1024, write_sst_max_buffer_size: ReadableSize::mb(10), wal: WalStorageConfig::RocksDB(Box::default()), remote_engine_client: remote_engine_client::config::Config::default(), diff --git a/analytic_engine/src/row_iter/dedup.rs b/analytic_engine/src/row_iter/dedup.rs index bb3d616037..241efca465 100644 --- a/analytic_engine/src/row_iter/dedup.rs +++ b/analytic_engine/src/row_iter/dedup.rs @@ -229,7 +229,8 @@ mod tests { ], ); - let mut iter = DedupIterator::new(RequestId::next_id(), iter, IterOptions::default()); + let mut iter = + DedupIterator::new(RequestId::next_id(), iter, IterOptions { batch_size: 500 }); check_iterator( &mut iter, vec![ diff --git a/analytic_engine/src/row_iter/merge.rs b/analytic_engine/src/row_iter/merge.rs index 5a4f6e7bb7..b9dd964af6 100644 --- a/analytic_engine/src/row_iter/merge.rs +++ b/analytic_engine/src/row_iter/merge.rs @@ -910,7 +910,7 @@ mod tests { schema.to_record_schema_with_key(), streams, Vec::new(), - IterOptions::default(), + IterOptions { batch_size: 500 }, false, Metrics::new(1, 1, None), ); @@ -963,7 +963,7 @@ mod tests { schema.to_record_schema_with_key(), streams, Vec::new(), - IterOptions::default(), + IterOptions { batch_size: 500 }, true, Metrics::new(1, 1, None), ); diff --git a/analytic_engine/src/row_iter/mod.rs b/analytic_engine/src/row_iter/mod.rs index 48b4010cd5..4f287e3e04 100644 --- a/analytic_engine/src/row_iter/mod.rs +++ b/analytic_engine/src/row_iter/mod.rs @@ -2,19 +2,11 @@ //! Iterators for row. -use std::{ - pin::Pin, - task::{Context, Poll}, -}; - use async_trait::async_trait; use common_types::{record_batch::RecordBatchWithKey, schema::RecordSchemaWithKey}; -use common_util::{error::BoxError, runtime::Runtime}; -use futures::stream::Stream; -use log::{debug, error}; -use tokio::sync::mpsc::{self, Receiver}; +use common_util::error::BoxError; -use crate::sst::writer::{RecordBatchStream, RecordBatchStreamItem}; +use crate::sst::writer::RecordBatchStream; pub mod chain; pub mod dedup; @@ -23,27 +15,9 @@ pub mod record_batch_stream; #[cfg(test)] pub mod tests; -const RECORD_BATCH_READ_BUF_SIZE: usize = 10; - #[derive(Debug, Clone)] pub struct IterOptions { pub batch_size: usize, - pub sst_background_read_parallelism: usize, -} - -impl IterOptions { - pub fn new(batch_size: usize, sst_background_read_parallelism: usize) -> Self { - Self { - batch_size, - sst_background_read_parallelism, - } - } -} - -impl Default for IterOptions { - fn default() -> Self { - Self::new(500, 1) - } } /// The iterator for reading RecordBatch from a table. @@ -59,39 +33,12 @@ pub trait RecordBatchWithKeyIterator: Send { async fn next_batch(&mut self) -> std::result::Result, Self::Error>; } -struct ReceiverStream { - rx: Receiver, -} - -impl Stream for ReceiverStream { - type Item = RecordBatchStreamItem; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.get_mut(); - Pin::new(&mut this.rx).poll_recv(cx) - } -} - -// TODO(yingwen): This is a hack way to convert an async trait to stream. 
-pub fn record_batch_with_key_iter_to_stream( - mut iter: I, - runtime: &Runtime, +pub fn record_batch_with_key_iter_to_stream( + iter: I, ) -> RecordBatchStream { - let (tx, rx) = mpsc::channel(RECORD_BATCH_READ_BUF_SIZE); - runtime.spawn(async move { - while let Some(record_batch) = iter.next_batch().await.transpose() { - let record_batch = record_batch.box_err(); - - debug!( - "compact table send next record batch, batch:{:?}", - record_batch - ); - if tx.send(record_batch).await.is_err() { - error!("Failed to send record batch from the merge iterator"); - break; - } - } + let record_batch_stream = futures::stream::unfold(iter, |mut iter| async { + let item = iter.next_batch().await.box_err().transpose(); + item.map(|item| (item, iter)) }); - - Box::new(ReceiverStream { rx }) + Box::new(Box::pin(record_batch_stream)) } diff --git a/analytic_engine/src/sst/factory.rs b/analytic_engine/src/sst/factory.rs index 7872e4cf69..891876331d 100644 --- a/analytic_engine/src/sst/factory.rs +++ b/analytic_engine/src/sst/factory.rs @@ -94,19 +94,32 @@ pub struct SstReadHint { pub file_format: Option, } +#[derive(Debug, Clone)] +pub struct ScanOptions { + /// The suggested parallelism while reading sst + pub background_read_parallelism: usize, + /// The max record batches in flight + pub max_record_batches_in_flight: usize, +} + +impl Default for ScanOptions { + fn default() -> Self { + Self { + background_read_parallelism: 1, + max_record_batches_in_flight: 64, + } + } +} + #[derive(Debug, Clone)] pub struct SstReadOptions { - pub read_batch_row_num: usize, pub reverse: bool, pub frequency: ReadFrequency, + pub num_rows_per_row_group: usize, pub projected_schema: ProjectedSchema, pub predicate: PredicateRef, pub meta_cache: Option, - - /// The max number of rows in one row group - pub num_rows_per_row_group: usize, - /// The suggested parallelism while reading sst - pub background_read_parallelism: usize, + pub scan_options: ScanOptions, pub runtime: Arc, } @@ -152,7 +165,8 @@ impl Factory for FactoryImpl { let reader = ThreadedReader::new( reader, options.runtime.clone(), - options.background_read_parallelism, + options.scan_options.background_read_parallelism, + options.scan_options.max_record_batches_in_flight, ); Ok(Box::new(reader)) } diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs index 3a22cb5504..d113e2e7c3 100644 --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@ -23,7 +23,7 @@ use common_util::{ time::InstantExt, }; use futures::{future::BoxFuture, FutureExt, Stream, StreamExt, TryFutureExt}; -use log::{debug, error, info, warn}; +use log::{debug, error, info}; use object_store::{ObjectStoreRef, Path}; use parquet::{ arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder, ProjectionMask}, @@ -61,19 +61,17 @@ pub struct Reader<'a> { store: &'a ObjectStoreRef, /// The hint for the sst file size. file_size_hint: Option, + num_rows_per_row_group: usize, projected_schema: ProjectedSchema, meta_cache: Option, predicate: PredicateRef, /// Current frequency decides the cache policy. 
frequency: ReadFrequency, - batch_size: usize, - /// Init those fields in `init_if_necessary` meta_data: Option, row_projector: Option, /// Options for `read_parallelly` - parallelism_options: ParallelismOptions, metrics: Metrics, } @@ -97,9 +95,6 @@ impl<'a> Reader<'a> { store_picker: &'a ObjectStorePickerRef, metrics_collector: Option, ) -> Self { - let batch_size = options.read_batch_row_num; - let parallelism_options = - ParallelismOptions::new(options.read_batch_row_num, options.num_rows_per_row_group); let store = store_picker.pick_by_freq(options.frequency); let metrics = Metrics { @@ -111,14 +106,13 @@ impl<'a> Reader<'a> { path, store, file_size_hint, + num_rows_per_row_group: options.num_rows_per_row_group, projected_schema: options.projected_schema.clone(), meta_cache: options.meta_cache.clone(), predicate: options.predicate.clone(), frequency: options.frequency, - batch_size, meta_data: None, row_projector: None, - parallelism_options, metrics, } } @@ -128,11 +122,6 @@ impl<'a> Reader<'a> { read_parallelism: usize, ) -> Result> + Send + Unpin>>> { assert!(read_parallelism > 0); - let read_parallelism = if self.parallelism_options.enable_read_parallelly { - read_parallelism - } else { - 1 - }; self.init_if_necessary().await?; let streams = self.fetch_record_batch_streams(read_parallelism).await?; @@ -140,8 +129,10 @@ impl<'a> Reader<'a> { return Ok(Vec::new()); } - let row_projector = self.row_projector.take().unwrap(); - let row_projector = ArrowRecordBatchProjector::from(row_projector); + let row_projector = { + let row_projector = self.row_projector.take().unwrap(); + ArrowRecordBatchProjector::from(row_projector) + }; let sst_meta_data = self .meta_data @@ -187,9 +178,15 @@ impl<'a> Reader<'a> { Ok(pruner.prune()) } + /// The final parallelism is ensured in the range: [1, num_row_groups]. + #[inline] + fn decide_read_parallelism(suggested: usize, num_row_groups: usize) -> usize { + suggested.min(num_row_groups).max(1) + } + async fn fetch_record_batch_streams( &mut self, - read_parallelism: usize, + suggested_parallelism: usize, ) -> Result> { assert!(self.meta_data.is_some()); @@ -215,18 +212,18 @@ impl<'a> Reader<'a> { } // Partition the batches by `read_parallelism`. - let suggest_read_parallelism = read_parallelism; - let read_parallelism = std::cmp::min(target_row_groups.len(), suggest_read_parallelism); + let parallelism = + Self::decide_read_parallelism(suggested_parallelism, target_row_groups.len()); // TODO: we only support read parallelly when `batch_size` == // `num_rows_per_row_group`, so this placing method is ok, we should // adjust it when supporting it other situations. 
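+        // A hypothetical worked example of `decide_read_parallelism` above:
+        // with 10 target row groups and a suggested parallelism of 4 it
+        // returns min(4, 10).max(1) = 4; with 3 row groups and a suggestion
+        // of 8 it degrades to 3. The result never exceeds the row group
+        // count and never drops below 1.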
- let chunks_num = read_parallelism; - let chunk_size = target_row_groups.len() / read_parallelism + 1; - self.metrics.parallelism = read_parallelism; + let chunks_num = parallelism; + let chunk_size = target_row_groups.len() / parallelism; + self.metrics.parallelism = parallelism; info!( "Reader fetch record batches parallelly, parallelism suggest:{}, real:{}, chunk_size:{}", - suggest_read_parallelism, read_parallelism, chunk_size + suggested_parallelism, parallelism, chunk_size ); let mut target_row_group_chunks = vec![Vec::with_capacity(chunk_size); chunks_num]; @@ -248,7 +245,7 @@ impl<'a> Reader<'a> { .await .with_context(|| ParquetError)?; let stream = builder - .with_batch_size(self.batch_size) + .with_batch_size(self.num_rows_per_row_group) .with_row_groups(chunk) .with_projection(proj_mask.clone()) .build() @@ -361,40 +358,6 @@ impl<'a> Reader<'a> { } } -/// Options for `read_parallelly` in [Reader] -#[derive(Debug, Clone, Copy)] -struct ParallelismOptions { - /// Whether allow parallelly reading. - /// - /// NOTICE: now we only allow `read_parallelly` when - /// `read_batch_row_num` == `num_rows_per_row_group` - /// (surely, `num_rows_per_row_group` > 0). - // TODO: maybe we should support `read_parallelly` in all situations. - enable_read_parallelly: bool, - // TODO: more configs will be add. -} - -impl ParallelismOptions { - fn new(read_batch_row_num: usize, num_rows_per_row_group: usize) -> Self { - let enable_read_parallelly = if read_batch_row_num != num_rows_per_row_group { - warn!( - "Reader new parallelism options not enable, don't allow read parallelly because - read_batch_row_num != num_rows_per_row_group, - read_batch_row_num:{}, num_rows_per_row_group:{}", - read_batch_row_num, num_rows_per_row_group - ); - - false - } else { - true - }; - - Self { - enable_read_parallelly, - } - } -} - #[derive(Clone, Debug)] struct ObjectReaderMetrics { bytes_scanned: usize, @@ -639,8 +602,6 @@ impl Stream for RecordBatchReceiver { } } -const DEFAULT_CHANNEL_CAP: usize = 1024; - /// Spawn a new thread to read record_batches pub struct ThreadedReader<'a> { inner: Reader<'a>, @@ -651,7 +612,12 @@ pub struct ThreadedReader<'a> { } impl<'a> ThreadedReader<'a> { - pub fn new(reader: Reader<'a>, runtime: Arc, read_parallelism: usize) -> Self { + pub fn new( + reader: Reader<'a>, + runtime: Arc, + read_parallelism: usize, + channel_cap: usize, + ) -> Self { assert!( read_parallelism > 0, "read parallelism must be greater than 0" @@ -660,7 +626,7 @@ impl<'a> ThreadedReader<'a> { Self { inner: reader, runtime, - channel_cap: DEFAULT_CHANNEL_CAP, + channel_cap, read_parallelism, } } @@ -708,7 +674,7 @@ impl<'a> SstReader for ThreadedReader<'a> { self.read_parallelism, read_parallelism ); - let channel_cap_per_sub_reader = self.channel_cap / self.read_parallelism + 1; + let channel_cap_per_sub_reader = self.channel_cap / sub_readers.len(); let (tx_group, rx_group): (Vec<_>, Vec<_>) = (0..read_parallelism) .map(|_| mpsc::channel::>(channel_cap_per_sub_reader)) .unzip(); @@ -738,8 +704,6 @@ mod tests { use futures::{Stream, StreamExt}; use tokio::sync::mpsc::{self, Receiver, Sender}; - use super::ParallelismOptions; - struct MockReceivers { rx_group: Vec>, cur_rx_idx: usize, @@ -835,25 +799,4 @@ mod tests { assert_eq!(actual, expected); } - - #[test] - fn test_parallelism_options() { - // `read_batch_row_num` < num_rows_per_row_group` - let read_batch_row_num = 2; - let num_rows_per_row_group = 4; - let options = ParallelismOptions::new(read_batch_row_num, num_rows_per_row_group); - 
assert!(!options.enable_read_parallelly); - - // `read_batch_row_num` > num_rows_per_row_group - let read_batch_row_num = 8; - let num_rows_per_row_group = 4; - let options = ParallelismOptions::new(read_batch_row_num, num_rows_per_row_group); - assert!(!options.enable_read_parallelly); - - // `read_batch_row_num` == num_rows_per_row_group` - let read_batch_row_num = 4; - let num_rows_per_row_group = 4; - let options = ParallelismOptions::new(read_batch_row_num, num_rows_per_row_group); - assert!(options.enable_read_parallelly); - } } diff --git a/analytic_engine/src/sst/parquet/writer.rs b/analytic_engine/src/sst/parquet/writer.rs index ce2359be00..4e3d8169ce 100644 --- a/analytic_engine/src/sst/parquet/writer.rs +++ b/analytic_engine/src/sst/parquet/writer.rs @@ -333,7 +333,9 @@ mod tests { use crate::{ row_iter::tests::build_record_batch_with_key, sst::{ - factory::{Factory, FactoryImpl, ReadFrequency, SstReadOptions, SstWriteOptions}, + factory::{ + Factory, FactoryImpl, ReadFrequency, ScanOptions, SstReadOptions, SstWriteOptions, + }, parquet::AsyncParquetReader, reader::{tests::check_stream, SstReader}, }, @@ -411,17 +413,17 @@ mod tests { assert_eq!(15, sst_info.row_num); + let scan_options = ScanOptions::default(); // read sst back to test let sst_read_options = SstReadOptions { - read_batch_row_num: 5, reverse: false, frequency: ReadFrequency::Frequent, + num_rows_per_row_group: 5, projected_schema, predicate: Arc::new(Predicate::empty()), meta_cache: None, + scan_options, runtime: runtime.clone(), - num_rows_per_row_group: 5, - background_read_parallelism: 1, }; let mut reader: Box = { diff --git a/benchmarks/src/config.rs b/benchmarks/src/config.rs index 8091d17b33..5d65a3b1cd 100644 --- a/benchmarks/src/config.rs +++ b/benchmarks/src/config.rs @@ -50,7 +50,7 @@ pub struct SstBenchConfig { /// Max number of projection columns. pub max_projections: usize, - pub read_batch_row_num: usize, + pub num_rows_per_row_group: usize, pub predicate: BenchPredicate, pub sst_meta_cache_cap: Option, pub sst_data_cache_cap: Option, @@ -70,7 +70,7 @@ pub struct MergeSstBenchConfig { /// Max number of projection columns. pub max_projections: usize, - pub read_batch_row_num: usize, + pub num_rows_per_row_group: usize, pub predicate: BenchPredicate, } diff --git a/benchmarks/src/merge_memtable_bench.rs b/benchmarks/src/merge_memtable_bench.rs index 47b378a08b..4974877d1e 100644 --- a/benchmarks/src/merge_memtable_bench.rs +++ b/benchmarks/src/merge_memtable_bench.rs @@ -18,7 +18,7 @@ use analytic_engine::{ sst::{ factory::{ FactoryImpl, FactoryRef as SstFactoryRef, ObjectStorePickerRef, ReadFrequency, - SstReadOptions, + ScanOptions, SstReadOptions, }, meta_data::cache::MetaCacheRef, }, @@ -129,14 +129,15 @@ impl MergeMemTableBench { self.dedup = dedup; } - // TODO(xikai): add benchmark for merge in reverse order. 
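+    // Note: `IterOptions::batch_size` below is pinned to
+    // `num_rows_per_row_group` from the sst read options, mirroring how
+    // `instance/read.rs` now derives the iterator batch size.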
pub fn run_bench(&self) { let space_id = self.space_id; let table_id = self.table_id; let sequence = u64::MAX; - let iter_options = IterOptions::default(); let projected_schema = self.projected_schema.clone(); let sst_factory: SstFactoryRef = Arc::new(FactoryImpl::default()); + let iter_options = IterOptions { + batch_size: self.sst_read_options.num_rows_per_row_group, + }; let request_id = RequestId::next_id(); let store_picker: ObjectStorePickerRef = Arc::new(self.store.clone()); @@ -195,15 +196,18 @@ fn mock_sst_read_options( projected_schema: ProjectedSchema, runtime: Arc, ) -> SstReadOptions { + let scan_options = ScanOptions { + background_read_parallelism: 1, + max_record_batches_in_flight: 1024, + }; SstReadOptions { - read_batch_row_num: 500, reverse: false, frequency: ReadFrequency::Frequent, + num_rows_per_row_group: 500, projected_schema, predicate: Arc::new(Predicate::empty()), meta_cache: None, + scan_options, runtime, - background_read_parallelism: 1, - num_rows_per_row_group: 500, } } diff --git a/benchmarks/src/merge_sst_bench.rs b/benchmarks/src/merge_sst_bench.rs index 8f65879b9b..334288c911 100644 --- a/benchmarks/src/merge_sst_bench.rs +++ b/benchmarks/src/merge_sst_bench.rs @@ -16,7 +16,7 @@ use analytic_engine::{ sst::{ factory::{ FactoryImpl, FactoryRef as SstFactoryRef, ObjectStorePickerRef, ReadFrequency, - SstReadOptions, + ScanOptions, SstReadOptions, }, file::{FileHandle, FilePurgeQueue, Request}, meta_data::cache::MetaCacheRef, @@ -61,16 +61,19 @@ impl MergeSstBench { let predicate = config.predicate.into_predicate(); let projected_schema = ProjectedSchema::no_projection(schema.clone()); + let scan_options = ScanOptions { + background_read_parallelism: 1, + max_record_batches_in_flight: 1024, + }; let sst_read_options = SstReadOptions { - read_batch_row_num: config.read_batch_row_num, reverse: false, frequency: ReadFrequency::Frequent, + num_rows_per_row_group: config.num_rows_per_row_group, projected_schema, predicate, meta_cache: meta_cache.clone(), + scan_options, runtime: runtime.clone(), - background_read_parallelism: 1, - num_rows_per_row_group: config.read_batch_row_num, }; let max_projections = cmp::min(config.max_projections, schema.num_columns()); @@ -117,9 +120,11 @@ impl MergeSstBench { let space_id = self.space_id; let table_id = self.table_id; let sequence = u64::MAX; - let iter_options = IterOptions::default(); let projected_schema = self.sst_read_options.projected_schema.clone(); let sst_factory: SstFactoryRef = Arc::new(FactoryImpl::default()); + let iter_options = IterOptions { + batch_size: self.sst_read_options.num_rows_per_row_group, + }; let request_id = RequestId::next_id(); let store_picker: ObjectStorePickerRef = Arc::new(self.store.clone()); diff --git a/benchmarks/src/parquet_bench.rs b/benchmarks/src/parquet_bench.rs index edbf5d82ae..a39b659838 100644 --- a/benchmarks/src/parquet_bench.rs +++ b/benchmarks/src/parquet_bench.rs @@ -48,7 +48,7 @@ impl ParquetBench { _predicate: config.predicate.into_predicate(), runtime: Arc::new(runtime), is_async: config.is_async, - batch_size: config.read_batch_row_num, + batch_size: config.num_rows_per_row_group, } } diff --git a/benchmarks/src/sst_bench.rs b/benchmarks/src/sst_bench.rs index 9cddf5a602..c57af0721b 100644 --- a/benchmarks/src/sst_bench.rs +++ b/benchmarks/src/sst_bench.rs @@ -6,7 +6,8 @@ use std::{cmp, sync::Arc, time::Instant}; use analytic_engine::sst::{ factory::{ - Factory, FactoryImpl, ObjectStorePickerRef, ReadFrequency, SstReadHint, SstReadOptions, + Factory, FactoryImpl, 
ObjectStorePickerRef, ReadFrequency, ScanOptions, SstReadHint, + SstReadOptions, }, meta_data::cache::{MetaCache, MetaCacheRef}, }; @@ -39,16 +40,19 @@ impl SstBench { let schema = runtime.block_on(util::schema_from_sst(&store, &sst_path, &meta_cache)); let predicate = config.predicate.into_predicate(); let projected_schema = ProjectedSchema::no_projection(schema.clone()); + let scan_options = ScanOptions { + background_read_parallelism: 1, + max_record_batches_in_flight: 1024, + }; let sst_read_options = SstReadOptions { - read_batch_row_num: config.read_batch_row_num, reverse: config.reverse, frequency: ReadFrequency::Frequent, + num_rows_per_row_group: config.num_rows_per_row_group, projected_schema, predicate, meta_cache, + scan_options, runtime: runtime.clone(), - background_read_parallelism: 1, - num_rows_per_row_group: config.read_batch_row_num, }; let max_projections = cmp::min(config.max_projections, schema.num_columns()); diff --git a/benchmarks/src/sst_tools.rs b/benchmarks/src/sst_tools.rs index 13a86bab38..f02c9c79a6 100644 --- a/benchmarks/src/sst_tools.rs +++ b/benchmarks/src/sst_tools.rs @@ -15,7 +15,7 @@ use analytic_engine::{ sst::{ factory::{ Factory, FactoryImpl, FactoryRef as SstFactoryRef, ObjectStorePickerRef, ReadFrequency, - SstReadHint, SstReadOptions, SstWriteOptions, + ScanOptions, SstReadHint, SstReadOptions, SstWriteOptions, }, file::FilePurgeQueue, manager::FileId, @@ -78,7 +78,6 @@ async fn create_sst_from_stream(config: SstConfig, record_batch_stream: RecordBa pub struct RebuildSstConfig { store_path: String, input_file_name: String, - read_batch_row_num: usize, predicate: BenchPredicate, // Output sst config: @@ -96,16 +95,19 @@ pub async fn rebuild_sst(config: RebuildSstConfig, runtime: Arc) { let sst_meta = util::meta_from_sst(&store, &input_path, &None).await; let projected_schema = ProjectedSchema::no_projection(sst_meta.schema.clone()); + let scan_options = ScanOptions { + background_read_parallelism: 1, + max_record_batches_in_flight: 1024, + }; let sst_read_options = SstReadOptions { - read_batch_row_num: config.read_batch_row_num, reverse: false, frequency: ReadFrequency::Once, + num_rows_per_row_group: config.num_rows_per_row_group, projected_schema, predicate: config.predicate.into_predicate(), meta_cache: None, + scan_options, runtime, - background_read_parallelism: 1, - num_rows_per_row_group: config.read_batch_row_num, }; let record_batch_stream = @@ -154,7 +156,6 @@ pub struct MergeSstConfig { table_id: TableId, sst_file_ids: Vec, dedup: bool, - read_batch_row_num: usize, predicate: BenchPredicate, // Output sst config: @@ -196,8 +197,11 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: Arc) { let first_sst_path = sst_util::new_sst_file_path(space_id, table_id, config.sst_file_ids[0]); let schema = util::schema_from_sst(&store, &first_sst_path, &None).await; let iter_options = IterOptions { - batch_size: config.read_batch_row_num, - sst_background_read_parallelism: 1, + batch_size: config.num_rows_per_row_group, + }; + let scan_options = ScanOptions { + background_read_parallelism: 1, + max_record_batches_in_flight: 1024, }; let request_id = RequestId::next_id(); @@ -205,15 +209,14 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: Arc) { let store_picker: ObjectStorePickerRef = Arc::new(store); let projected_schema = ProjectedSchema::no_projection(schema.clone()); let sst_read_options = SstReadOptions { - read_batch_row_num: config.read_batch_row_num, reverse: false, frequency: ReadFrequency::Once, + num_rows_per_row_group: 
config.num_rows_per_row_group,
         projected_schema: projected_schema.clone(),
         predicate: config.predicate.into_predicate(),
         meta_cache: None,
+        scan_options,
         runtime: runtime.clone(),
-        background_read_parallelism: iter_options.sst_background_read_parallelism,
-        num_rows_per_row_group: config.read_batch_row_num,
     };
     let iter = {
         let space_id = config.space_id;
     let record_batch_stream = if config.dedup {
         let iter = DedupIterator::new(request_id, iter, iter_options);
-        row_iter::record_batch_with_key_iter_to_stream(iter, &runtime)
+        row_iter::record_batch_with_key_iter_to_stream(iter)
     } else {
-        row_iter::record_batch_with_key_iter_to_stream(iter, &runtime)
+        row_iter::record_batch_with_key_iter_to_stream(iter)
     };
 
     let sst_meta = {
diff --git a/benchmarks/src/util.rs b/benchmarks/src/util.rs
index 7e0875b9ff..9d3d44c50c 100644
--- a/benchmarks/src/util.rs
+++ b/benchmarks/src/util.rs
 use analytic_engine::{
     space::SpaceId,
     sst::{
         factory::{
-            Factory, FactoryImpl, ObjectStorePickerRef, ReadFrequency, SstReadHint, SstReadOptions,
+            Factory, FactoryImpl, ObjectStorePickerRef, ReadFrequency, ScanOptions, SstReadHint,
+            SstReadOptions,
         },
         file::{FileHandle, FileMeta, FilePurgeQueue},
         manager::FileId,
 pub async fn load_sst_to_memtable(
     memtable: &MemTableRef,
     runtime: Arc<Runtime>,
 ) {
+    let scan_options = ScanOptions {
+        background_read_parallelism: 1,
+        max_record_batches_in_flight: 1024,
+    };
     let sst_read_options = SstReadOptions {
-        read_batch_row_num: 500,
         reverse: false,
         frequency: ReadFrequency::Frequent,
+        num_rows_per_row_group: 8192,
         projected_schema: ProjectedSchema::no_projection(schema.clone()),
         predicate: Arc::new(Predicate::empty()),
         meta_cache: None,
+        scan_options,
         runtime,
-        background_read_parallelism: 1,
-        num_rows_per_row_group: 500,
     };
     let sst_factory = FactoryImpl;
     let store_picker: ObjectStorePickerRef = Arc::new(store.clone());
diff --git a/tools/src/bin/sst-convert.rs b/tools/src/bin/sst-convert.rs
index 730116d029..1ef8b0ba2b 100644
--- a/tools/src/bin/sst-convert.rs
+++ b/tools/src/bin/sst-convert.rs
 use analytic_engine::{
     sst::factory::{
-        Factory, FactoryImpl, ObjectStorePickerRef, ReadFrequency, SstReadHint, SstReadOptions,
-        SstWriteOptions,
+        Factory, FactoryImpl, ObjectStorePickerRef, ReadFrequency, ScanOptions, SstReadHint,
+        SstReadOptions, SstWriteOptions,
     },
     table_options::{Compression, StorageFormatHint},
 };
 async fn run(args: Args, runtime: Arc<Runtime>) -> Result<()> {
     let input_path = Path::from(args.input);
     let sst_meta = sst_util::meta_from_sst(&store, &input_path).await;
     let factory = FactoryImpl;
+    let scan_options = ScanOptions::default();
     let reader_opts = SstReadOptions {
-        read_batch_row_num: 8192,
         reverse: false,
         frequency: ReadFrequency::Once,
+        num_rows_per_row_group: 8192,
         projected_schema: ProjectedSchema::no_projection(sst_meta.schema.clone()),
         predicate: Arc::new(Predicate::empty()),
         meta_cache: None,
+        scan_options,
         runtime,
-        background_read_parallelism: 1,
-        num_rows_per_row_group: 8192,
     };
     let store_picker: ObjectStorePickerRef = Arc::new(store);
     let mut reader = factory

From 5f03a1b770a80ea22953757b581da5b48572c3bb Mon Sep 17 00:00:00 2001
From: "xikai.wxk"
Date: Thu, 23 Mar 2023 18:02:18 +0800
Subject: [PATCH 2/6] chore: make scan_batch_size work only for iterators

---
 analytic_engine/src/instance/mod.rs  | 2 ++
 analytic_engine/src/instance/open.rs | 6 ++++++
analytic_engine/src/instance/read.rs | 10 +++++++--- analytic_engine/src/lib.rs | 8 +++++--- 4 files changed, 20 insertions(+), 6 deletions(-) diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs index 0b3465fc78..f0d59c44cd 100644 --- a/analytic_engine/src/instance/mod.rs +++ b/analytic_engine/src/instance/mod.rs @@ -33,6 +33,7 @@ use wal::manager::{WalLocation, WalManagerRef}; use crate::{ compaction::scheduler::CompactionSchedulerRef, manifest::ManifestRef, + row_iter::IterOptions, space::{SpaceId, SpaceRef}, sst::{ factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef, ScanOptions}, @@ -173,6 +174,7 @@ pub struct Instance { pub(crate) write_sst_max_buffer_size: usize, /// Options for scanning sst pub(crate) scan_options: ScanOptions, + pub(crate) iter_options: Option, pub(crate) remote_engine: Option, } diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs index 37f0db513f..15928f087d 100644 --- a/analytic_engine/src/instance/open.rs +++ b/analytic_engine/src/instance/open.rs @@ -35,6 +35,7 @@ use crate::{ }, manifest::{meta_data::TableManifestData, LoadRequest, ManifestRef}, payload::{ReadPayload, WalDecoder}, + row_iter::IterOptions, space::{Space, SpaceContext, SpaceId, SpaceRef}, sst::{ factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef, ScanOptions}, @@ -78,6 +79,10 @@ impl Instance { max_record_batches_in_flight: ctx.config.scan_max_record_batches_in_flight, }; + let iter_options = ctx + .config + .scan_batch_size + .map(|batch_size| IterOptions { batch_size }); let instance = Arc::new(Instance { space_store, runtimes: ctx.runtimes.clone(), @@ -93,6 +98,7 @@ impl Instance { space_write_buffer_size: ctx.config.space_write_buffer_size, replay_batch_size: ctx.config.replay_batch_size, write_sst_max_buffer_size: ctx.config.write_sst_max_buffer_size.as_bytes() as usize, + iter_options, scan_options, remote_engine: remote_engine_ref, }); diff --git a/analytic_engine/src/instance/read.rs b/analytic_engine/src/instance/read.rs index 24600774d3..a44850c34a 100644 --- a/analytic_engine/src/instance/read.rs +++ b/analytic_engine/src/instance/read.rs @@ -172,9 +172,7 @@ impl Instance { let time_range = request.predicate.time_range(); let version = table_data.current_version(); let read_views = self.partition_ssts_and_memtables(time_range, version, table_options); - let iter_options = IterOptions { - batch_size: table_options.num_rows_per_row_group, - }; + let iter_options = self.make_iter_options(table_options.num_rows_per_row_group); let mut iters = Vec::with_capacity(read_views.len()); for (idx, read_view) in read_views.into_iter().enumerate() { @@ -328,6 +326,12 @@ impl Instance { read_view_by_time.into_values().collect() } + + fn make_iter_options(&self, num_rows_per_row_group: usize) -> IterOptions { + self.iter_options.clone().unwrap_or(IterOptions { + batch_size: num_rows_per_row_group, + }) + } } // TODO(xikai): this is a hack way to implement SendableRecordBatchStream for diff --git a/analytic_engine/src/lib.rs b/analytic_engine/src/lib.rs index 8902ba5ce2..11e8d5c3b2 100644 --- a/analytic_engine/src/lib.rs +++ b/analytic_engine/src/lib.rs @@ -72,7 +72,10 @@ pub struct Config { // Iterator scanning options /// Batch size for iterator. - pub scan_batch_size: usize, + /// + /// The `num_rows_per_row_group` in `table options` will be used if this is + /// not set. 
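+    ///
+    /// For example (a hypothetical configuration): leaving this as `None`
+    /// makes an iterator over a table with `num_rows_per_row_group = 8192`
+    /// emit batches of 8192 rows, while `Some(500)` restores the previous
+    /// fixed batch size of 500.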
+ pub scan_batch_size: Option, /// Max record batches in flight when scan pub scan_max_record_batches_in_flight: usize, /// Sst background reading parallelism @@ -110,8 +113,7 @@ impl Default for Config { /// Zero means disabling this param, give a positive value to enable /// it. db_write_buffer_size: 0, - #[allow(deprecated)] - scan_batch_size: 500, + scan_batch_size: None, sst_background_read_parallelism: 8, scan_max_record_batches_in_flight: 1024, write_sst_max_buffer_size: ReadableSize::mb(10), From d43f28f55568f6d6e6d8d462d6d53a363926d240 Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Thu, 23 Mar 2023 18:20:05 +0800 Subject: [PATCH 3/6] chore: use separate scan options for compaction --- analytic_engine/src/compaction/scheduler.rs | 6 +++++- analytic_engine/src/instance/flush_compaction.rs | 7 ++++--- analytic_engine/src/instance/open.rs | 7 +++++++ 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/analytic_engine/src/compaction/scheduler.rs b/analytic_engine/src/compaction/scheduler.rs index 5e929e0b9f..14b4d0d5bd 100644 --- a/analytic_engine/src/compaction/scheduler.rs +++ b/analytic_engine/src/compaction/scheduler.rs @@ -40,7 +40,7 @@ use crate::{ instance::{ flush_compaction::TableFlushOptions, write_worker::CompactionNotifier, Instance, SpaceStore, }, - sst::factory::SstWriteOptions, + sst::factory::{ScanOptions, SstWriteOptions}, table::data::TableDataRef, TableOptions, }; @@ -289,6 +289,7 @@ impl SchedulerImpl { runtime: Arc, config: SchedulerConfig, write_sst_max_buffer_size: usize, + scan_options: ScanOptions, ) -> Self { let (tx, rx) = mpsc::channel(config.schedule_channel_len); let running = Arc::new(AtomicBool::new(true)); @@ -303,6 +304,7 @@ impl SchedulerImpl { max_ongoing_tasks: config.max_ongoing_tasks, max_unflushed_duration: config.max_unflushed_duration.0, write_sst_max_buffer_size, + scan_options, limit: Arc::new(OngoingTaskLimit { ongoing_tasks: AtomicUsize::new(0), request_buf: RwLock::new(RequestQueue::default()), @@ -371,6 +373,7 @@ struct ScheduleWorker { picker_manager: PickerManager, max_ongoing_tasks: usize, write_sst_max_buffer_size: usize, + scan_options: ScanOptions, limit: Arc, running: Arc, memory_limit: MemoryLimit, @@ -485,6 +488,7 @@ impl ScheduleWorker { &table_data, request_id, &compaction_task, + &self.scan_options, &sst_write_options, ) .await; diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index 3950cee888..aa44ffa389 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -730,6 +730,7 @@ impl SpaceStore { table_data: &TableData, request_id: RequestId, task: &CompactionTask, + scan_options: &ScanOptions, sst_write_options: &SstWriteOptions, ) -> Result<()> { debug!( @@ -768,6 +769,7 @@ impl SpaceStore { table_data, request_id, input, + scan_options, sst_write_options, &mut edit_meta, ) @@ -799,6 +801,7 @@ impl SpaceStore { table_data: &TableData, request_id: RequestId, input: &CompactionInputFiles, + scan_options: &ScanOptions, sst_write_options: &SstWriteOptions, edit_meta: &mut VersionEditMeta, ) -> Result<()> { @@ -838,8 +841,6 @@ impl SpaceStore { let schema = table_data.schema(); let table_options = table_data.table_options(); let projected_schema = ProjectedSchema::no_projection(schema.clone()); - // TODO: make the scan_options configurable for compaction. 
- let scan_options = ScanOptions::default(); let sst_read_options = SstReadOptions { reverse: false, num_rows_per_row_group: table_options.num_rows_per_row_group, @@ -847,7 +848,7 @@ impl SpaceStore { projected_schema: projected_schema.clone(), predicate: Arc::new(Predicate::empty()), meta_cache: self.meta_cache.clone(), - scan_options, + scan_options: scan_options.clone(), runtime: runtime.clone(), }; let iter_options = IterOptions { diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs index 15928f087d..ae93668454 100644 --- a/analytic_engine/src/instance/open.rs +++ b/analytic_engine/src/instance/open.rs @@ -44,6 +44,8 @@ use crate::{ table::data::{TableData, TableDataRef}, }; +const MAX_RECORD_BATCHES_IN_FLIGHT_WHEN_COMPACTION_READ: usize = 64; + impl Instance { /// Open a new instance pub async fn open( @@ -65,11 +67,16 @@ impl Instance { let scheduler_config = ctx.config.compaction_config.clone(); let bg_runtime = ctx.runtimes.bg_runtime.clone(); + let scan_options_for_compaction = ScanOptions { + background_read_parallelism: 1, + max_record_batches_in_flight: MAX_RECORD_BATCHES_IN_FLIGHT_WHEN_COMPACTION_READ, + }; let compaction_scheduler = Arc::new(SchedulerImpl::new( space_store.clone(), bg_runtime.clone(), scheduler_config, ctx.config.write_sst_max_buffer_size.as_bytes() as usize, + scan_options_for_compaction, )); let file_purger = FilePurger::start(&bg_runtime, store_picker.default_store().clone()); From da03cb321a097771cc7f9626d9588fdb4a64a400 Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Thu, 23 Mar 2023 18:26:18 +0800 Subject: [PATCH 4/6] chore: fix clippy warnings --- analytic_engine/src/compaction/scheduler.rs | 7 ++++--- .../src/instance/flush_compaction.rs | 21 ++++++++++--------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/analytic_engine/src/compaction/scheduler.rs b/analytic_engine/src/compaction/scheduler.rs index 14b4d0d5bd..68ad1ecf2d 100644 --- a/analytic_engine/src/compaction/scheduler.rs +++ b/analytic_engine/src/compaction/scheduler.rs @@ -476,6 +476,7 @@ impl ScheduleWorker { compression: table_data.table_options().compression, max_buffer_size: self.write_sst_max_buffer_size, }; + let scan_options = self.scan_options.clone(); // Do actual costly compact job in background. 
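+        // The spawned compaction future must be 'static, so the inputs it
+        // needs (e.g. `scan_options` above) are cloned into owned values
+        // before the `spawn` instead of borrowing from `self`.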
self.runtime.spawn(async move {
             let res = space_store
                 .compact_table(
-                    runtime,
-                    &table_data,
                     request_id,
+                    &table_data,
                     &compaction_task,
-                    &self.scan_options,
+                    scan_options,
                     &sst_write_options,
+                    runtime,
                 )
                 .await;
diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs
index aa44ffa389..9085af01c4 100644
--- a/analytic_engine/src/instance/flush_compaction.rs
+++ b/analytic_engine/src/instance/flush_compaction.rs
 impl SpaceStore {
     pub(crate) async fn compact_table(
         &self,
-        runtime: Arc<Runtime>,
-        table_data: &TableData,
         request_id: RequestId,
+        table_data: &TableData,
         task: &CompactionTask,
-        scan_options: &ScanOptions,
+        scan_options: ScanOptions,
         sst_write_options: &SstWriteOptions,
+        runtime: Arc<Runtime>,
     ) -> Result<()> {
         debug!(
             "Begin compact table, table_name:{}, id:{}, task:{:?}",
         for input in &task.compaction_inputs {
             self.compact_input_files(
-                runtime.clone(),
-                table_data,
                 request_id,
+                table_data,
                 input,
-                scan_options,
+                scan_options.clone(),
                 sst_write_options,
+                runtime.clone(),
                 &mut edit_meta,
             )
             .await?;
+    #[allow(clippy::too_many_arguments)]
     pub(crate) async fn compact_input_files(
         &self,
-        runtime: Arc<Runtime>,
-        table_data: &TableData,
         request_id: RequestId,
+        table_data: &TableData,
         input: &CompactionInputFiles,
-        scan_options: &ScanOptions,
+        scan_options: ScanOptions,
         sst_write_options: &SstWriteOptions,
+        runtime: Arc<Runtime>,
         edit_meta: &mut VersionEditMeta,
     ) -> Result<()> {
         debug!(
             projected_schema: projected_schema.clone(),
             predicate: Arc::new(Predicate::empty()),
             meta_cache: self.meta_cache.clone(),
-            scan_options: scan_options.clone(),
+            scan_options,
             runtime: runtime.clone(),
         };
         let iter_options = IterOptions {

From d46717c968276cab50c083e8ce5ab0dee6752d98 Mon Sep 17 00:00:00 2001
From: "xikai.wxk"
Date: Thu, 23 Mar 2023 21:15:46 +0800
Subject: [PATCH 5/6] chore: refactor stream implementation for batch of streams

---
 analytic_engine/src/instance/read.rs | 139 ++++++++++++++++-----------
 1 file changed, 85 insertions(+), 54 deletions(-)

diff --git a/analytic_engine/src/instance/read.rs b/analytic_engine/src/instance/read.rs
index a44850c34a..6e1b4d9cd3 100644
--- a/analytic_engine/src/instance/read.rs
+++ b/analytic_engine/src/instance/read.rs
 use common_types::{
-    projected_schema::ProjectedSchema, record_batch::RecordBatch, schema::RecordSchema,
+    projected_schema::ProjectedSchema,
+    record_batch::{RecordBatch, RecordBatchWithKey},
+    schema::RecordSchema,
     time::TimeRange,
 };
-use common_util::{define_result, error::BoxError, runtime::Runtime};
+use common_util::{define_result, error::BoxError};
 use futures::stream::Stream;
-use log::{debug, error, trace};
+use log::debug;
 use snafu::{ResultExt, Snafu};
 use table_engine::{
     stream::{
     },
     table::ReadRequest,
 };
-use tokio::sync::mpsc::{self, Receiver};
 use trace_metric::Metric;
 
 use crate::{
 define_result!(Error);
 
-const RECORD_BATCH_READ_BUF_SIZE: usize = 1000;
 const MERGE_SORT_METRIC_NAME: &str = "do_merge_sort";
 const ITER_NUM_METRIC_NAME: &str = "iter_num";
 const MERGE_ITER_METRICS_COLLECTOR_NAME_PREFIX: &str = "merge_iter";
         let mut streams = Vec::with_capacity(read_parallelism);
         for iters in splitted_iters {
-            let stream =
-                iters_to_stream(iters, self.read_runtime(), &request.projected_schema);
+            let stream = iters_to_stream(iters, request.projected_schema.clone());
             streams.push(stream);
         }
 
-// TODO(xikai): this is a hack way to implement SendableRecordBatchStream for
-// MergeIterator.
-fn iters_to_stream<T>(
-    collection: T,
-    runtime: &Runtime,
-    schema: &ProjectedSchema,
-) -> SendableRecordBatchStream
-where
-    T: IntoIterator + Send + 'static,
-    T::Item: RecordBatchWithKeyIterator,
-    T::IntoIter: Send,
-{
-    let (tx, rx) = mpsc::channel(RECORD_BATCH_READ_BUF_SIZE);
-    let projected_schema = schema.clone();
-
-    runtime.spawn(async move {
-        for mut iter in collection {
-            while let Some(record_batch) = iter.next_batch().await.transpose() {
-                let record_batch = record_batch.box_err().context(ErrWithSource {
-                    msg: "read record batch",
-                });
-
-                // Apply the projection to RecordBatchWithKey and gets the final RecordBatch.
-                let record_batch = record_batch.and_then(|batch_with_key| {
-                    // TODO(yingwen): Try to use projector to do this, which precompute row
-                    // indexes to project.
-                    batch_with_key
-                        .try_project(&projected_schema)
-                        .box_err()
-                        .context(ErrWithSource {
-                            msg: "project record batch",
-                        })
-                });
-
-                trace!("send next record batch:{:?}", record_batch);
-                if tx.send(record_batch).await.is_err() {
-                    error!("Failed to send record batch from the merge iterator");
-                    break;
-                }
-            }
-        }
+struct StreamStateOnMultiIters<I> {
+    iters: Vec<I>,
+    curr_iter_idx: usize,
+    projected_schema: ProjectedSchema,
+}
+
+impl<I: RecordBatchWithKeyIterator + 'static> StreamStateOnMultiIters<I> {
+    fn is_exhausted(&self) -> bool {
+        self.curr_iter_idx >= self.iters.len()
+    }
+
+    fn advance(&mut self) {
+        self.curr_iter_idx += 1;
+    }
+
+    fn curr_iter_mut(&mut self) -> &mut I {
+        &mut self.iters[self.curr_iter_idx]
+    }
+
+    async fn fetch_next_batch(
+        &mut self,
+    ) -> Option<std::result::Result<RecordBatchWithKey, I::Error>> {
+        loop {
+            if self.is_exhausted() {
+                return None;
+            }
+
+            let iter = self.curr_iter_mut();
+            if let Some(v) = iter.next_batch().await.transpose() {
+                return Some(v);
+            }
+
+            self.advance();
+        }
+    }
+}
+
+fn iters_to_stream<I: RecordBatchWithKeyIterator + 'static>(
+    iters: Vec<I>,
+    projected_schema: ProjectedSchema,
+) -> SendableRecordBatchStream {
+    let state = StreamStateOnMultiIters {
+        projected_schema: projected_schema.clone(),
+        iters,
+        curr_iter_idx: 0,
+    };
+
+    let record_batch_stream = futures::stream::unfold(state, |mut state| async move {
+        let projected_schema = state.projected_schema.clone();
+        state
+            .fetch_next_batch()
+            .await
+            .map(|record_batch| {
+                record_batch
+                    .box_err()
+                    .context(ErrWithSource {
+                        msg: "read record batch",
+                    })
+                    .and_then(|batch_with_key| {
+                        // TODO(yingwen): Try to use projector to do this, which pre-compute row
+                        // indexes to project.
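+                        // `try_project` converts the RecordBatchWithKey into
+                        // the final RecordBatch containing only the columns
+                        // requested by the projected schema.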
+ batch_with_key + .try_project(&projected_schema) + .box_err() + .context(ErrWithSource { + msg: "project record batch", + }) + }) + }) + .map(|record_batch| (record_batch, state)) }); - Box::pin(ChannelledRecordBatchStream { - schema: schema.to_record_schema(), - rx, - }) + let record_schema = projected_schema.to_record_schema(); + let stream_with_schema = RecordBatchStreamWithSchema { + schema: record_schema, + inner_stream: Box::pin(Box::pin(record_batch_stream)), + }; + Box::pin(stream_with_schema) } -pub struct ChannelledRecordBatchStream { +pub struct RecordBatchStreamWithSchema { schema: RecordSchema, - rx: Receiver>, + inner_stream: Pin> + Send + Unpin>>, } -impl Stream for ChannelledRecordBatchStream { +impl Stream for RecordBatchStreamWithSchema { type Item = stream::Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); - Pin::new(&mut this.rx).poll_recv(cx) + this.inner_stream.as_mut().poll_next(cx) } } -impl RecordBatchStream for ChannelledRecordBatchStream { +impl RecordBatchStream for RecordBatchStreamWithSchema { fn schema(&self) -> &RecordSchema { &self.schema } From 6585f8cd061837bded3fd465cb169de11cda40cc Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Fri, 24 Mar 2023 11:31:39 +0800 Subject: [PATCH 6/6] chore: stop poll after input stream is exhausted in sst writer --- analytic_engine/src/sst/parquet/writer.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/analytic_engine/src/sst/parquet/writer.rs b/analytic_engine/src/sst/parquet/writer.rs index 4e3d8169ce..898542daaf 100644 --- a/analytic_engine/src/sst/parquet/writer.rs +++ b/analytic_engine/src/sst/parquet/writer.rs @@ -66,6 +66,7 @@ struct RecordBatchGroupWriter { request_id: RequestId, hybrid_encoding: bool, input: RecordBatchStream, + input_exhausted: bool, meta_data: MetaData, num_rows_per_row_group: usize, max_buffer_size: usize, @@ -110,6 +111,10 @@ impl RecordBatchGroupWriter { continue; } + if self.input_exhausted { + break; + } + // Previous record batch has been exhausted, and let's fetch next record batch. match self.input.next().await { Some(v) => { @@ -124,7 +129,10 @@ impl RecordBatchGroupWriter { // fill `curr_row_group`. prev_record_batch.replace(v); } - None => break, + None => { + self.input_exhausted = true; + break; + } }; } @@ -271,6 +279,7 @@ impl<'a> SstWriter for ParquetSstWriter<'a> { hybrid_encoding: self.hybrid_encoding, request_id, input, + input_exhausted: false, num_rows_per_row_group: self.num_rows_per_row_group, max_buffer_size: self.max_buffer_size, compression: self.compression, @@ -521,6 +530,7 @@ mod tests { request_id: RequestId::next_id(), hybrid_encoding: false, input: record_batch_stream, + input_exhausted: false, num_rows_per_row_group, compression: Compression::UNCOMPRESSED, meta_data: MetaData {