feat: configurable record batches in flight (apache#759)
* feat: make max_record_batches_in_flight configurable

* chore: make scan_batch_size work only for iterators

* chore: use separate scan options for compaction

* chore: fix clippy warnings

* chore: refactor: implement Stream for a batch of streams

* chore: stop poll after input stream is exhausted in sst writer
ShiKaiWi authored Mar 24, 2023
1 parent bcaf189 commit c54d931
Showing 20 changed files with 293 additions and 294 deletions.
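
Taken together, the diffs below split the old iterator options into two pieces: a ScanOptions struct that travels with every SST read (queries and compaction alike) and carries the new max_record_batches_in_flight knob, and a slimmer IterOptions that only holds the iterator batch size, so scan_batch_size no longer leaks into the SST reader layer. The sketch below reconstructs their approximate shape from the fields this commit uses; it is not the authoritative definition in sst::factory.

```rust
/// Approximate shape of the scan options introduced in this commit,
/// reconstructed from the fields the diff reads and writes; the real
/// definition may carry more fields and derives.
#[derive(Debug, Clone)]
pub struct ScanOptions {
    /// Number of background sub-streams that prefetch record batches.
    pub background_read_parallelism: usize,
    /// Cap on record batches buffered ("in flight") across those sub-streams.
    pub max_record_batches_in_flight: usize,
}

/// After this commit, `scan_batch_size` only affects row iterators, so the
/// iterator options shrink to a single field.
#[derive(Debug, Clone)]
pub struct IterOptions {
    /// Number of rows fetched per batch from the iterator.
    pub batch_size: usize,
}
```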
11 changes: 8 additions & 3 deletions analytic_engine/src/compaction/scheduler.rs
```diff
@@ -40,7 +40,7 @@ use crate::{
     instance::{
         flush_compaction::TableFlushOptions, write_worker::CompactionNotifier, Instance, SpaceStore,
     },
-    sst::factory::SstWriteOptions,
+    sst::factory::{ScanOptions, SstWriteOptions},
     table::data::TableDataRef,
     TableOptions,
 };
@@ -289,6 +289,7 @@ impl SchedulerImpl {
         runtime: Arc<Runtime>,
         config: SchedulerConfig,
         write_sst_max_buffer_size: usize,
+        scan_options: ScanOptions,
     ) -> Self {
         let (tx, rx) = mpsc::channel(config.schedule_channel_len);
         let running = Arc::new(AtomicBool::new(true));
@@ -303,6 +304,7 @@
             max_ongoing_tasks: config.max_ongoing_tasks,
             max_unflushed_duration: config.max_unflushed_duration.0,
             write_sst_max_buffer_size,
+            scan_options,
             limit: Arc::new(OngoingTaskLimit {
                 ongoing_tasks: AtomicUsize::new(0),
                 request_buf: RwLock::new(RequestQueue::default()),
@@ -371,6 +373,7 @@ struct ScheduleWorker {
     picker_manager: PickerManager,
     max_ongoing_tasks: usize,
     write_sst_max_buffer_size: usize,
+    scan_options: ScanOptions,
     limit: Arc<OngoingTaskLimit>,
     running: Arc<AtomicBool>,
     memory_limit: MemoryLimit,
@@ -473,6 +476,7 @@ impl ScheduleWorker {
             compression: table_data.table_options().compression,
             max_buffer_size: self.write_sst_max_buffer_size,
         };
+        let scan_options = self.scan_options.clone();

         // Do actual costly compact job in background.
         self.runtime.spawn(async move {
@@ -481,11 +485,12 @@

             let res = space_store
                 .compact_table(
-                    runtime,
-                    &table_data,
                     request_id,
+                    &table_data,
                     &compaction_task,
+                    scan_options,
                     &sst_write_options,
+                    runtime,
                 )
                 .await;

```
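
The worker clones its ScanOptions before handing the compaction job to the runtime because the spawned async move future must own everything it captures. A minimal stand-alone sketch of that clone-before-spawn pattern (the types and job body here are illustrative, not the engine's actual code):

```rust
#[derive(Debug, Clone)]
struct ScanOptions {
    background_read_parallelism: usize,
    max_record_batches_in_flight: usize,
}

struct Worker {
    scan_options: ScanOptions,
}

impl Worker {
    fn schedule(&self, runtime: &tokio::runtime::Runtime) {
        // Clone the cheap options struct so the spawned 'static future owns it.
        let scan_options = self.scan_options.clone();
        runtime.spawn(async move {
            // A real job would build SstReadOptions from `scan_options` and run
            // the compaction; here we only touch a field to show ownership.
            let _ = scan_options.max_record_batches_in_flight;
        });
    }
}

fn main() {
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let worker = Worker {
        scan_options: ScanOptions {
            background_read_parallelism: 1,
            max_record_batches_in_flight: 64,
        },
    };
    worker.schedule(&runtime);
}
```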
38 changes: 22 additions & 16 deletions analytic_engine/src/instance/flush_compaction.rs
```diff
@@ -49,7 +49,7 @@ use crate::{
     },
     space::SpaceAndTable,
     sst::{
-        factory::{self, ReadFrequency, SstReadOptions, SstWriteOptions},
+        factory::{self, ReadFrequency, ScanOptions, SstReadOptions, SstWriteOptions},
         file::FileMeta,
         meta_data::SstMetaReader,
         writer::{MetaData, RecordBatchStream},
@@ -726,11 +726,12 @@ impl Instance {
 impl SpaceStore {
     pub(crate) async fn compact_table(
         &self,
-        runtime: Arc<Runtime>,
-        table_data: &TableData,
         request_id: RequestId,
+        table_data: &TableData,
         task: &CompactionTask,
+        scan_options: ScanOptions,
         sst_write_options: &SstWriteOptions,
+        runtime: Arc<Runtime>,
     ) -> Result<()> {
         debug!(
             "Begin compact table, table_name:{}, id:{}, task:{:?}",
@@ -764,11 +765,12 @@ impl SpaceStore {

         for input in &task.compaction_inputs {
             self.compact_input_files(
-                runtime.clone(),
-                table_data,
                 request_id,
+                table_data,
                 input,
+                scan_options.clone(),
                 sst_write_options,
+                runtime.clone(),
                 &mut edit_meta,
             )
             .await?;
@@ -793,13 +795,15 @@
         Ok(())
     }

+    #[allow(clippy::too_many_arguments)]
     pub(crate) async fn compact_input_files(
         &self,
-        runtime: Arc<Runtime>,
-        table_data: &TableData,
         request_id: RequestId,
+        table_data: &TableData,
         input: &CompactionInputFiles,
+        scan_options: ScanOptions,
         sst_write_options: &SstWriteOptions,
+        runtime: Arc<Runtime>,
         edit_meta: &mut VersionEditMeta,
     ) -> Result<()> {
         debug!(
@@ -839,17 +843,18 @@ impl SpaceStore {
         let table_options = table_data.table_options();
         let projected_schema = ProjectedSchema::no_projection(schema.clone());
         let sst_read_options = SstReadOptions {
-            read_batch_row_num: table_options.num_rows_per_row_group,
             reverse: false,
+            num_rows_per_row_group: table_options.num_rows_per_row_group,
             frequency: ReadFrequency::Once,
             projected_schema: projected_schema.clone(),
             predicate: Arc::new(Predicate::empty()),
             meta_cache: self.meta_cache.clone(),
+            scan_options,
             runtime: runtime.clone(),
-            background_read_parallelism: 1,
-            num_rows_per_row_group: table_options.num_rows_per_row_group,
         };
-        let iter_options = IterOptions::default();
+        let iter_options = IterOptions {
+            batch_size: table_options.num_rows_per_row_group,
+        };
         let merge_iter = {
             let space_id = table_data.space_id;
             let table_id = table_data.id;
@@ -881,12 +886,13 @@ impl SpaceStore {
         };

         let record_batch_stream = if table_options.need_dedup() {
-            row_iter::record_batch_with_key_iter_to_stream(
-                DedupIterator::new(request_id, merge_iter, iter_options),
-                &runtime,
-            )
+            row_iter::record_batch_with_key_iter_to_stream(DedupIterator::new(
+                request_id,
+                merge_iter,
+                iter_options,
+            ))
         } else {
-            row_iter::record_batch_with_key_iter_to_stream(merge_iter, &runtime)
+            row_iter::record_batch_with_key_iter_to_stream(merge_iter)
         };

         let sst_meta = {
```
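
Both call sites above drop the &runtime argument from record_batch_with_key_iter_to_stream, in line with the "implement Stream for a batch of streams" bullet: instead of spawning a forwarding task, the iterator is wrapped in a Stream that the SST writer polls lazily and stops polling once it is exhausted. A rough, illustrative sketch of that adapter pattern (the names here are stand-ins, not the engine's API):

```rust
use futures::stream::{self, Stream, StreamExt};

/// Stand-in for a record batch.
#[derive(Debug)]
struct RecordBatch(Vec<u32>);

/// Illustrative adapter: wrap an ordinary iterator of batches in a `Stream`
/// that yields items only when polled, so no runtime handle or forwarding
/// task is needed.
fn batches_to_stream<I>(iter: I) -> impl Stream<Item = RecordBatch>
where
    I: Iterator<Item = RecordBatch>,
{
    stream::iter(iter)
}

#[tokio::main]
async fn main() {
    let batches = vec![RecordBatch(vec![1, 2]), RecordBatch(vec![3])];
    let mut batch_stream = batches_to_stream(batches.into_iter());
    // The SST writer side polls until the stream is exhausted, then stops.
    while let Some(batch) = batch_stream.next().await {
        println!("{batch:?}");
    }
}
```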
5 changes: 3 additions & 2 deletions analytic_engine/src/instance/mod.rs
```diff
@@ -36,7 +36,7 @@ use crate::{
     row_iter::IterOptions,
     space::{SpaceId, SpaceRef},
     sst::{
-        factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef},
+        factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef, ScanOptions},
         file::FilePurger,
         meta_data::cache::MetaCacheRef,
     },
@@ -175,7 +175,8 @@ pub struct Instance {
     /// Max bytes per write batch
     pub(crate) max_bytes_per_write_batch: Option<usize>,
     /// Options for scanning sst
-    pub(crate) iter_options: IterOptions,
+    pub(crate) scan_options: ScanOptions,
+    pub(crate) iter_options: Option<IterOptions>,
     pub(crate) remote_engine: Option<RemoteEngineRef>,
 }
```
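
Instance now stores scan_options plus an Option<IterOptions>: when scan_batch_size is absent from the config, no IterOptions is built at all. One plausible way a read path could resolve the effective batch size in that case, mirroring what the compaction path above does, is to fall back to the table's num_rows_per_row_group; the helper below is a hedged illustration, not the engine's code:

```rust
#[derive(Debug, Clone)]
struct IterOptions {
    batch_size: usize,
}

/// Illustrative helper: use the configured batch size if `scan_batch_size`
/// was set, otherwise fall back to the table's rows-per-row-group.
fn effective_iter_options(
    configured: Option<IterOptions>,
    num_rows_per_row_group: usize,
) -> IterOptions {
    configured.unwrap_or(IterOptions {
        batch_size: num_rows_per_row_group,
    })
}

fn main() {
    // Config left scan_batch_size unset; 8192 rows per row group is an
    // arbitrary illustrative value, not a project default.
    let opts = effective_iter_options(None, 8192);
    println!("effective batch size: {}", opts.batch_size);
}
```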
20 changes: 16 additions & 4 deletions analytic_engine/src/instance/open.rs
```diff
@@ -38,12 +38,14 @@ use crate::{
     row_iter::IterOptions,
     space::{Space, SpaceContext, SpaceId, SpaceRef},
     sst::{
-        factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef},
+        factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef, ScanOptions},
         file::FilePurger,
     },
     table::data::{TableData, TableDataRef},
 };

+const MAX_RECORD_BATCHES_IN_FLIGHT_WHEN_COMPACTION_READ: usize = 64;
+
 impl Instance {
     /// Open a new instance
     pub async fn open(
@@ -65,20 +67,29 @@

         let scheduler_config = ctx.config.compaction_config.clone();
         let bg_runtime = ctx.runtimes.bg_runtime.clone();
+        let scan_options_for_compaction = ScanOptions {
+            background_read_parallelism: 1,
+            max_record_batches_in_flight: MAX_RECORD_BATCHES_IN_FLIGHT_WHEN_COMPACTION_READ,
+        };
         let compaction_scheduler = Arc::new(SchedulerImpl::new(
             space_store.clone(),
             bg_runtime.clone(),
             scheduler_config,
             ctx.config.write_sst_max_buffer_size.as_bytes() as usize,
+            scan_options_for_compaction,
         ));

         let file_purger = FilePurger::start(&bg_runtime, store_picker.default_store().clone());

-        let iter_options = IterOptions {
-            batch_size: ctx.config.scan_batch_size,
-            sst_background_read_parallelism: ctx.config.sst_background_read_parallelism,
+        let scan_options = ScanOptions {
+            background_read_parallelism: ctx.config.sst_background_read_parallelism,
+            max_record_batches_in_flight: ctx.config.scan_max_record_batches_in_flight,
         };

+        let iter_options = ctx
+            .config
+            .scan_batch_size
+            .map(|batch_size| IterOptions { batch_size });
         let instance = Arc::new(Instance {
             space_store,
             runtimes: ctx.runtimes.clone(),
@@ -99,6 +110,7 @@
                 .max_bytes_per_write_batch
                 .map(|v| v.as_bytes() as usize),
             iter_options,
+            scan_options,
             remote_engine: remote_engine_ref,
         });

```
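
open.rs wires up two sets of scan options: compaction reads are pinned to background_read_parallelism: 1 with at most 64 record batches in flight, while normal scans take both knobs from the config (sst_background_read_parallelism, scan_max_record_batches_in_flight). Conceptually the in-flight cap is backpressure: no matter how fast the background readers run, at most max_record_batches_in_flight batches sit buffered before the consumer drains them. A self-contained sketch of that idea with a bounded channel (illustrative only, not the engine's implementation):

```rust
use tokio::sync::mpsc;

/// Stand-in for a record batch.
#[derive(Debug)]
struct RecordBatch(u64);

#[tokio::main]
async fn main() {
    let max_record_batches_in_flight = 64;
    // A bounded channel caps how many batches the producer can buffer ahead of
    // the consumer; `send` waits once the buffer is full, providing backpressure.
    let (tx, mut rx) = mpsc::channel::<RecordBatch>(max_record_batches_in_flight);

    let producer = tokio::spawn(async move {
        for i in 0..1_000u64 {
            if tx.send(RecordBatch(i)).await.is_err() {
                break; // consumer dropped, stop producing
            }
        }
    });

    let mut consumed = 0u64;
    while let Some(_batch) = rx.recv().await {
        consumed += 1;
    }
    producer.await.unwrap();
    assert_eq!(consumed, 1_000);
}
```

With the compaction setting above, at most 64 batches should be buffered ahead of the SST writer for a compaction read, regardless of how fast the inputs are decoded.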
The remaining 16 changed files are not shown above.
