Skip to content

Commit

Permalink
feat: scan row groups in one sst file parallelly (apache#474)
Browse files Browse the repository at this point in the history
* make scanning sst file in parallel.

* rename `read_parallelism` to `background_read_parallelism` in `SstReaderOptions`.

* address CR.

* fix parallelly reading but and add tests.

* add more checks for the `read_parallism`.

* address CR.
  • Loading branch information
Rachelint authored Dec 16, 2022
1 parent 4c72a5f commit 89bee57
Show file tree
Hide file tree
Showing 19 changed files with 435 additions and 63 deletions.
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,8 @@ impl SpaceStore {
predicate: Arc::new(Predicate::empty()),
meta_cache: self.meta_cache.clone(),
runtime: runtime.clone(),
background_read_parallelism: 1,
num_rows_per_row_group: table_options.num_rows_per_row_group,
};
let mut builder = MergeBuilder::new(MergeConfig {
request_id,
Expand Down
7 changes: 4 additions & 3 deletions analytic_engine/src/instance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use wal::manager::WalManagerRef;
use crate::{
compaction::scheduler::CompactionSchedulerRef,
meta::ManifestRef,
row_iter::IterOptions,
space::{SpaceId, SpaceRef},
sst::{factory::FactoryRef as SstFactoryRef, file::FilePurger, meta_cache::MetaCacheRef},
table::data::TableDataRef,
Expand Down Expand Up @@ -170,10 +171,10 @@ pub struct Instance {
pub(crate) db_write_buffer_size: usize,
/// Space write buffer size
pub(crate) space_write_buffer_size: usize,
/// replay wal batch size
/// Replay wal batch size
pub(crate) replay_batch_size: usize,
/// batch size for scan sst
pub(crate) scan_batch_size: usize,
/// Options for scanning sst
pub(crate) iter_options: IterOptions,
}

impl Instance {
Expand Down
8 changes: 7 additions & 1 deletion analytic_engine/src/instance/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::{
},
meta::{meta_data::TableManifestData, ManifestRef},
payload::{ReadPayload, WalDecoder},
row_iter::IterOptions,
space::{Space, SpaceId, SpaceRef},
sst::{factory::FactoryRef as SstFactoryRef, file::FilePurger},
table::data::{TableData, TableDataRef},
Expand Down Expand Up @@ -72,6 +73,11 @@ impl Instance {
WalSynchronizer::new(WalSynchronizerConfig::default(), wal_manager);
wal_synchronizer.start(&bg_runtime).await;

let iter_options = IterOptions {
batch_size: ctx.config.scan_batch_size,
sst_background_read_parallelism: ctx.config.sst_background_read_parallelism,
};

let instance = Arc::new(Instance {
space_store,
runtimes: ctx.runtimes.clone(),
Expand All @@ -86,7 +92,7 @@ impl Instance {
db_write_buffer_size: ctx.config.db_write_buffer_size,
space_write_buffer_size: ctx.config.space_write_buffer_size,
replay_batch_size: ctx.config.replay_batch_size,
scan_batch_size: ctx.config.scan_batch_size,
iter_options,
});

Ok(instance)
Expand Down
9 changes: 7 additions & 2 deletions analytic_engine/src/instance/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl Instance {
// Collect metrics.
table_data.metrics.on_read_request_begin();

let iter_options = IterOptions::new(self.scan_batch_size);
let iter_options = self.iter_options.clone();
let table_options = table_data.table_options();

if need_merge_sort_streams(&table_data.table_options(), &request) {
Expand All @@ -104,7 +104,7 @@ impl Instance {
self.build_partitioned_streams(&request, merge_iters)
} else {
let chain_iters = self
.build_chain_iters(table_data, &request, &table_options)
.build_chain_iters(table_data, &request, iter_options, &table_options)
.await?;
self.build_partitioned_streams(&request, chain_iters)
}
Expand Down Expand Up @@ -160,6 +160,8 @@ impl Instance {
predicate: request.predicate.clone(),
meta_cache: self.meta_cache.clone(),
runtime: self.read_runtime().clone(),
background_read_parallelism: iter_options.sst_background_read_parallelism,
num_rows_per_row_group: table_options.num_rows_per_row_group,
};

let time_range = request.predicate.time_range();
Expand Down Expand Up @@ -205,6 +207,7 @@ impl Instance {
&self,
table_data: &TableData,
request: &ReadRequest,
iter_options: IterOptions,
table_options: &TableOptions,
) -> Result<Vec<ChainIterator>> {
let projected_schema = request.projected_schema.clone();
Expand All @@ -220,6 +223,8 @@ impl Instance {
predicate: request.predicate.clone(),
meta_cache: self.meta_cache.clone(),
runtime: self.read_runtime().clone(),
background_read_parallelism: iter_options.sst_background_read_parallelism,
num_rows_per_row_group: table_options.num_rows_per_row_group,
};

let time_range = request.predicate.time_range();
Expand Down
8 changes: 6 additions & 2 deletions analytic_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,13 @@ pub struct Config {
pub space_write_buffer_size: usize,
/// The maximum size of all Write Buffers across all spaces.
pub db_write_buffer_size: usize,
// End of global write buffer options.
/// End of global write buffer options.

// Batch size for scan sst
// Iterator scanning options
/// Batch size for iterator
pub scan_batch_size: usize,
/// Sst background reading parallelism
pub sst_background_read_parallelism: usize,

/// Wal storage config
///
Expand Down Expand Up @@ -108,6 +111,7 @@ impl Default for Config {
/// it.
db_write_buffer_size: 0,
scan_batch_size: 500,
sst_background_read_parallelism: 8,
wal_storage: WalStorageConfig::RocksDB,
}
}
Expand Down
10 changes: 7 additions & 3 deletions analytic_engine/src/row_iter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,21 @@ const RECORD_BATCH_READ_BUF_SIZE: usize = 10;
#[derive(Debug, Clone)]
pub struct IterOptions {
pub batch_size: usize,
pub sst_background_read_parallelism: usize,
}

impl IterOptions {
pub fn new(batch_size: usize) -> Self {
Self { batch_size }
pub fn new(batch_size: usize, sst_background_read_parallelism: usize) -> Self {
Self {
batch_size,
sst_background_read_parallelism,
}
}
}

impl Default for IterOptions {
fn default() -> Self {
Self::new(500)
Self::new(500, 1)
}
}

Expand Down
12 changes: 11 additions & 1 deletion analytic_engine/src/sst/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ pub struct SstReaderOptions {
pub predicate: PredicateRef,
pub meta_cache: Option<MetaCacheRef>,
pub runtime: Arc<Runtime>,

/// The max number of rows in one row group
pub num_rows_per_row_group: usize,

/// The suggested parallelism while reading sst
pub background_read_parallelism: usize,
}

#[derive(Debug, Clone)]
Expand All @@ -78,7 +84,11 @@ impl Factory for FactoryImpl {
Some(Box::new(ParquetSstReader::new(path, storage, options)))
} else {
let reader = AsyncParquetReader::new(path, storage, options);
let reader = ThreadedReader::new(reader, options.runtime.clone());
let reader = ThreadedReader::new(
reader,
options.runtime.clone(),
options.background_read_parallelism,
);
Some(Box::new(reader))
}
}
Expand Down
Loading

0 comments on commit 89bee57

Please sign in to comment.