feat: scan row groups in one sst file in parallel #474

Merged
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -131,6 +131,7 @@ meta_client = { workspace = true }
 query_engine = { workspace = true }
 server = { workspace = true }
 signal-hook = "0.3"
+sort = "0.8.5"
 table_engine = { workspace = true }
 tracing_util = { workspace = true }

1 change: 1 addition & 0 deletions analytic_engine/Cargo.toml
@@ -51,4 +51,5 @@ wal = { workspace = true }
 common_types = { workspace = true, features = ["test"] }
 common_util = { workspace = true, features = ["test"] }
 env_logger = { workspace = true }
+rand = { workspace = true }
 wal = { workspace = true, features = ["test"] }
2 changes: 2 additions & 0 deletions analytic_engine/src/instance/flush_compaction.rs
@@ -918,6 +918,8 @@ impl SpaceStore {
             predicate: Arc::new(Predicate::empty()),
             meta_cache: self.meta_cache.clone(),
             runtime: runtime.clone(),
+            background_read_parallelism: 1,
+            num_rows_per_row_group: table_options.num_rows_per_row_group,
         };
         let mut builder = MergeBuilder::new(MergeConfig {
             request_id,
7 changes: 4 additions & 3 deletions analytic_engine/src/instance/mod.rs
@@ -33,6 +33,7 @@ use wal::manager::WalManagerRef;
 use crate::{
     compaction::scheduler::CompactionSchedulerRef,
     meta::ManifestRef,
+    row_iter::IterOptions,
     space::{SpaceId, SpaceRef},
     sst::{factory::FactoryRef as SstFactoryRef, file::FilePurger, meta_cache::MetaCacheRef},
     table::data::TableDataRef,
@@ -170,10 +171,10 @@ pub struct Instance {
     pub(crate) db_write_buffer_size: usize,
     /// Space write buffer size
     pub(crate) space_write_buffer_size: usize,
-    /// replay wal batch size
+    /// Replay wal batch size
     pub(crate) replay_batch_size: usize,
-    /// batch size for scan sst
-    pub(crate) scan_batch_size: usize,
+    /// Options for scanning sst
+    pub(crate) iter_options: IterOptions,
 }
 
 impl Instance {
8 changes: 7 additions & 1 deletion analytic_engine/src/instance/open.rs
@@ -34,6 +34,7 @@ use crate::{
     },
     meta::{meta_data::TableManifestData, ManifestRef},
     payload::{ReadPayload, WalDecoder},
+    row_iter::IterOptions,
     space::{Space, SpaceId, SpaceRef},
     sst::{factory::FactoryRef as SstFactoryRef, file::FilePurger},
     table::data::{TableData, TableDataRef},
@@ -72,6 +73,11 @@ impl Instance {
             WalSynchronizer::new(WalSynchronizerConfig::default(), wal_manager);
         wal_synchronizer.start(&bg_runtime).await;
 
+        let iter_options = IterOptions {
+            batch_size: ctx.config.scan_batch_size,
+            sst_background_read_parallelism: ctx.config.sst_background_read_parallelism,
+        };
+
         let instance = Arc::new(Instance {
             space_store,
             runtimes: ctx.runtimes.clone(),
@@ -86,7 +92,7 @@
             db_write_buffer_size: ctx.config.db_write_buffer_size,
             space_write_buffer_size: ctx.config.space_write_buffer_size,
             replay_batch_size: ctx.config.replay_batch_size,
-            scan_batch_size: ctx.config.scan_batch_size,
+            iter_options,
         });
 
         Ok(instance)
9 changes: 7 additions & 2 deletions analytic_engine/src/instance/read.rs
@@ -94,7 +94,7 @@ impl Instance {
         // Collect metrics.
         table_data.metrics.on_read_request_begin();
 
-        let iter_options = IterOptions::new(self.scan_batch_size);
+        let iter_options = self.iter_options.clone();
         let table_options = table_data.table_options();
 
         if need_merge_sort_streams(&table_data.table_options(), &request) {
@@ -104,7 +104,7 @@
             self.build_partitioned_streams(&request, merge_iters)
         } else {
             let chain_iters = self
-                .build_chain_iters(table_data, &request, &table_options)
+                .build_chain_iters(table_data, &request, iter_options, &table_options)
                 .await?;
             self.build_partitioned_streams(&request, chain_iters)
         }
@@ -160,6 +160,8 @@
             predicate: request.predicate.clone(),
             meta_cache: self.meta_cache.clone(),
             runtime: self.read_runtime().clone(),
+            background_read_parallelism: iter_options.sst_background_read_parallelism,
+            num_rows_per_row_group: table_options.num_rows_per_row_group,
         };
 
         let time_range = request.predicate.time_range();
@@ -205,6 +207,7 @@
         &self,
         table_data: &TableData,
         request: &ReadRequest,
+        iter_options: IterOptions,
         table_options: &TableOptions,
     ) -> Result<Vec<ChainIterator>> {
         let projected_schema = request.projected_schema.clone();
@@ -220,6 +223,8 @@
             predicate: request.predicate.clone(),
             meta_cache: self.meta_cache.clone(),
             runtime: self.read_runtime().clone(),
+            background_read_parallelism: iter_options.sst_background_read_parallelism,
+            num_rows_per_row_group: table_options.num_rows_per_row_group,
         };
 
         let time_range = request.predicate.time_range();
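
The two options threaded into SstReaderOptions here work together: num_rows_per_row_group fixes how many row groups an SST file contains, and background_read_parallelism suggests how many of them to scan concurrently. As a back-of-envelope sketch (the concrete numbers below are illustrative, not taken from this PR):

    // Illustrative arithmetic only: estimate how many row groups each
    // background task ends up scanning for a hypothetical SST file.
    let total_rows: usize = 1_000_000;
    let num_rows_per_row_group: usize = 8_192;
    let parallelism: usize = 8;

    let row_groups = (total_rows + num_rows_per_row_group - 1) / num_rows_per_row_group;
    let per_task = (row_groups + parallelism - 1) / parallelism;
    assert_eq!((row_groups, per_task), (123, 16));
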
8 changes: 6 additions & 2 deletions analytic_engine/src/lib.rs
@@ -73,10 +73,13 @@ pub struct Config {
     pub space_write_buffer_size: usize,
     /// The maximum size of all Write Buffers across all spaces.
     pub db_write_buffer_size: usize,
-    // End of global write buffer options.
+    /// End of global write buffer options.
 
-    // Batch size for scan sst
+    // Iterator scanning options
+    /// Batch size for iterator
     pub scan_batch_size: usize,
+    /// Sst background reading parallelism
+    pub sst_background_read_parallelism: usize,
 
     /// Wal storage config
     ///
@@ -108,6 +111,7 @@ impl Default for Config {
             /// it.
             db_write_buffer_size: 0,
             scan_batch_size: 500,
+            sst_background_read_parallelism: 8,
             wal_storage: WalStorageConfig::RocksDB,
         }
     }
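
For code that constructs the engine Config directly, the new knob composes with struct-update syntax. A hedged sketch, assuming the fields are publicly constructible as shown above:

    // Hypothetical override: double the default SST scan parallelism while
    // keeping every other option at its default value.
    let config = Config {
        sst_background_read_parallelism: 16,
        ..Config::default()
    };
    assert_eq!(config.scan_batch_size, 500);
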
10 changes: 7 additions & 3 deletions analytic_engine/src/row_iter/mod.rs
@@ -28,17 +28,21 @@ const RECORD_BATCH_READ_BUF_SIZE: usize = 10;
 #[derive(Debug, Clone)]
 pub struct IterOptions {
     pub batch_size: usize,
+    pub sst_background_read_parallelism: usize,
 }
 
 impl IterOptions {
-    pub fn new(batch_size: usize) -> Self {
-        Self { batch_size }
+    pub fn new(batch_size: usize, sst_background_read_parallelism: usize) -> Self {
+        Self {
+            batch_size,
+            sst_background_read_parallelism,
+        }
     }
 }
 
 impl Default for IterOptions {
     fn default() -> Self {
-        Self::new(500)
+        Self::new(500, 1)
     }
 }
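
Existing call sites keep working through Default, which pins the old single-reader behavior (parallelism of 1). A small usage sketch with illustrative values:

    // Batches of 500 rows, up to 4 background readers per SST file;
    // IterOptions::default() is equivalent to IterOptions::new(500, 1).
    let opts = IterOptions::new(500, 4);
    assert_eq!(opts.batch_size, 500);
    assert_eq!(opts.sst_background_read_parallelism, 4);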

12 changes: 11 additions & 1 deletion analytic_engine/src/sst/factory.rs
@@ -52,6 +52,12 @@ pub struct SstReaderOptions {
     pub predicate: PredicateRef,
     pub meta_cache: Option<MetaCacheRef>,
     pub runtime: Arc<Runtime>,
+
+    /// The max number of rows in one row group
+    pub num_rows_per_row_group: usize,
+
+    /// The suggested parallelism while reading sst
+    pub background_read_parallelism: usize,
 }
 
 #[derive(Debug, Clone)]
@@ -78,7 +84,11 @@ impl Factory for FactoryImpl {
             Some(Box::new(ParquetSstReader::new(path, storage, options)))
         } else {
             let reader = AsyncParquetReader::new(path, storage, options);
-            let reader = ThreadedReader::new(reader, options.runtime.clone());
+            let reader = ThreadedReader::new(
+                reader,
+                options.runtime.clone(),
+                options.background_read_parallelism,
+            );
             Some(Box::new(reader))
         }
     }
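
ThreadedReader's internals are not part of this diff, but the technique it names is simple to sketch: split the file's row groups across background_read_parallelism tasks on the runtime and funnel their batches through one channel. Below is a minimal stand-alone sketch of that idea; RecordBatch, partition_row_groups, and spawn_parallel_scan are hypothetical names for illustration, not the crate's API.

    use tokio::{runtime::Runtime, sync::mpsc};

    /// Stand-in for a decoded batch; the real reader yields record-batch streams.
    type RecordBatch = Vec<u64>;

    /// Evenly partition row-group indices into at most `parallelism` chunks so
    /// each background task scans a disjoint subset of the file.
    fn partition_row_groups(num_row_groups: usize, parallelism: usize) -> Vec<Vec<usize>> {
        let chunk_count = parallelism.clamp(1, num_row_groups.max(1));
        let mut chunks = vec![Vec::new(); chunk_count];
        for rg in 0..num_row_groups {
            // Round-robin keeps chunk sizes within one row group of each other.
            chunks[rg % chunk_count].push(rg);
        }
        chunks
    }

    /// Spawn one reader task per chunk; every task pushes its batches into a
    /// shared channel, so the consumer sees a single merged stream.
    fn spawn_parallel_scan(
        runtime: &Runtime,
        row_group_chunks: Vec<Vec<usize>>,
    ) -> mpsc::Receiver<RecordBatch> {
        let (tx, rx) = mpsc::channel(row_group_chunks.len().max(1));
        for chunk in row_group_chunks {
            let tx = tx.clone();
            runtime.spawn(async move {
                for rg in chunk {
                    // A real implementation would decode row group `rg` from the
                    // parquet file here; a tiny fake batch stands in for that.
                    if tx.send(vec![rg as u64]).await.is_err() {
                        break; // Receiver dropped; stop early.
                    }
                }
            });
        }
        rx
    }

    fn main() {
        let runtime = Runtime::new().unwrap();
        // 123 row groups spread over 8 background tasks, matching the earlier
        // arithmetic sketch.
        let mut rx = spawn_parallel_scan(&runtime, partition_row_groups(123, 8));
        let mut batches = 0;
        while rx.blocking_recv().is_some() {
            batches += 1;
        }
        assert_eq!(batches, 123);
    }

The real ThreadedReader presumably wraps an SstReader and yields an ordered stream rather than a bare channel, but the partition-then-merge shape above is the essence of scanning one file's row groups in parallel.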