feat: configurable record batches in flight #759

Merged
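This PR makes the number of record batches kept in flight during SST scans configurable. The diff threads a new ScanOptions struct (background_read_parallelism plus max_record_batches_in_flight) from the engine config through the compaction scheduler and the flush/compaction read path, makes the iterator batch size optional, and pins compaction reads to a single background reader with a 64-batch in-flight cap. The sketch below is not code from the PR; it only restates, with an illustrative Config struct and made-up default values, how the two scan paths are wired after the change.

```rust
// Minimal, self-contained sketch (not code from this PR). Struct and field
// names mirror the diff; Config, the numeric values, and main() are
// illustrative stand-ins for the engine's real configuration.

#[derive(Clone, Debug)]
struct ScanOptions {
    background_read_parallelism: usize,
    max_record_batches_in_flight: usize,
}

#[derive(Debug)]
struct IterOptions {
    batch_size: usize,
}

struct Config {
    sst_background_read_parallelism: usize,
    scan_max_record_batches_in_flight: usize,
    // Now optional: when unset, iterators fall back to per-table settings.
    scan_batch_size: Option<usize>,
}

// Mirrors the constant introduced in instance/open.rs.
const MAX_RECORD_BATCHES_IN_FLIGHT_WHEN_COMPACTION_READ: usize = 64;

fn main() {
    let config = Config {
        sst_background_read_parallelism: 8,
        scan_max_record_batches_in_flight: 1024,
        scan_batch_size: None,
    };

    // Compaction reads: single background reader, fixed in-flight cap.
    let compaction_scan = ScanOptions {
        background_read_parallelism: 1,
        max_record_batches_in_flight: MAX_RECORD_BATCHES_IN_FLIGHT_WHEN_COMPACTION_READ,
    };

    // Query reads: both knobs come from the user config.
    let query_scan = ScanOptions {
        background_read_parallelism: config.sst_background_read_parallelism,
        max_record_batches_in_flight: config.scan_max_record_batches_in_flight,
    };

    // Iterator batch size is optional; None lets the engine fall back to the
    // table's num_rows_per_row_group.
    let iter_options = config.scan_batch_size.map(|batch_size| IterOptions { batch_size });

    println!("{compaction_scan:?} {query_scan:?} {iter_options:?}");
}
```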
11 changes: 8 additions & 3 deletions analytic_engine/src/compaction/scheduler.rs
@@ -40,7 +40,7 @@ use crate::{
instance::{
flush_compaction::TableFlushOptions, write_worker::CompactionNotifier, Instance, SpaceStore,
},
sst::factory::SstWriteOptions,
sst::factory::{ScanOptions, SstWriteOptions},
table::data::TableDataRef,
TableOptions,
};
@@ -289,6 +289,7 @@ impl SchedulerImpl {
runtime: Arc<Runtime>,
config: SchedulerConfig,
write_sst_max_buffer_size: usize,
scan_options: ScanOptions,
) -> Self {
let (tx, rx) = mpsc::channel(config.schedule_channel_len);
let running = Arc::new(AtomicBool::new(true));
@@ -303,6 +304,7 @@
max_ongoing_tasks: config.max_ongoing_tasks,
max_unflushed_duration: config.max_unflushed_duration.0,
write_sst_max_buffer_size,
scan_options,
limit: Arc::new(OngoingTaskLimit {
ongoing_tasks: AtomicUsize::new(0),
request_buf: RwLock::new(RequestQueue::default()),
@@ -371,6 +373,7 @@ struct ScheduleWorker {
picker_manager: PickerManager,
max_ongoing_tasks: usize,
write_sst_max_buffer_size: usize,
scan_options: ScanOptions,
limit: Arc<OngoingTaskLimit>,
running: Arc<AtomicBool>,
memory_limit: MemoryLimit,
@@ -473,6 +476,7 @@ impl ScheduleWorker {
compression: table_data.table_options().compression,
max_buffer_size: self.write_sst_max_buffer_size,
};
let scan_options = self.scan_options.clone();

// Do actual costly compact job in background.
self.runtime.spawn(async move {
@@ -481,11 +485,12 @@

let res = space_store
.compact_table(
runtime,
&table_data,
request_id,
&table_data,
&compaction_task,
scan_options,
&sst_write_options,
runtime,
)
.await;

38 changes: 22 additions & 16 deletions analytic_engine/src/instance/flush_compaction.rs
@@ -49,7 +49,7 @@ use crate::{
},
space::SpaceAndTable,
sst::{
factory::{self, ReadFrequency, SstReadOptions, SstWriteOptions},
factory::{self, ReadFrequency, ScanOptions, SstReadOptions, SstWriteOptions},
file::FileMeta,
meta_data::SstMetaReader,
writer::{MetaData, RecordBatchStream},
@@ -726,11 +726,12 @@ impl Instance {
impl SpaceStore {
pub(crate) async fn compact_table(
&self,
runtime: Arc<Runtime>,
table_data: &TableData,
request_id: RequestId,
table_data: &TableData,
task: &CompactionTask,
scan_options: ScanOptions,
sst_write_options: &SstWriteOptions,
runtime: Arc<Runtime>,
) -> Result<()> {
debug!(
"Begin compact table, table_name:{}, id:{}, task:{:?}",
@@ -764,11 +765,12 @@ impl SpaceStore {

for input in &task.compaction_inputs {
self.compact_input_files(
runtime.clone(),
table_data,
request_id,
table_data,
input,
scan_options.clone(),
sst_write_options,
runtime.clone(),
&mut edit_meta,
)
.await?;
@@ -793,13 +795,15 @@
Ok(())
}

#[allow(clippy::too_many_arguments)]
pub(crate) async fn compact_input_files(
&self,
runtime: Arc<Runtime>,
table_data: &TableData,
request_id: RequestId,
table_data: &TableData,
input: &CompactionInputFiles,
scan_options: ScanOptions,
sst_write_options: &SstWriteOptions,
runtime: Arc<Runtime>,
edit_meta: &mut VersionEditMeta,
) -> Result<()> {
debug!(
@@ -839,17 +843,18 @@ impl SpaceStore {
let table_options = table_data.table_options();
let projected_schema = ProjectedSchema::no_projection(schema.clone());
let sst_read_options = SstReadOptions {
read_batch_row_num: table_options.num_rows_per_row_group,
reverse: false,
num_rows_per_row_group: table_options.num_rows_per_row_group,
frequency: ReadFrequency::Once,
projected_schema: projected_schema.clone(),
predicate: Arc::new(Predicate::empty()),
meta_cache: self.meta_cache.clone(),
scan_options,
runtime: runtime.clone(),
background_read_parallelism: 1,
num_rows_per_row_group: table_options.num_rows_per_row_group,
};
let iter_options = IterOptions::default();
let iter_options = IterOptions {
batch_size: table_options.num_rows_per_row_group,
};
let merge_iter = {
let space_id = table_data.space_id;
let table_id = table_data.id;
@@ -881,12 +886,13 @@
};

let record_batch_stream = if table_options.need_dedup() {
row_iter::record_batch_with_key_iter_to_stream(
DedupIterator::new(request_id, merge_iter, iter_options),
&runtime,
)
row_iter::record_batch_with_key_iter_to_stream(DedupIterator::new(
request_id,
merge_iter,
iter_options,
))
} else {
row_iter::record_batch_with_key_iter_to_stream(merge_iter, &runtime)
row_iter::record_batch_with_key_iter_to_stream(merge_iter)
};

let sst_meta = {
5 changes: 3 additions & 2 deletions analytic_engine/src/instance/mod.rs
@@ -36,7 +36,7 @@ use crate::{
row_iter::IterOptions,
space::{SpaceId, SpaceRef},
sst::{
factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef},
factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef, ScanOptions},
file::FilePurger,
meta_data::cache::MetaCacheRef,
},
@@ -173,7 +173,8 @@ pub struct Instance {
/// Write sst max buffer size
pub(crate) write_sst_max_buffer_size: usize,
/// Options for scanning sst
pub(crate) iter_options: IterOptions,
pub(crate) scan_options: ScanOptions,
pub(crate) iter_options: Option<IterOptions>,
pub(crate) remote_engine: Option<RemoteEngineRef>,
}

20 changes: 16 additions & 4 deletions analytic_engine/src/instance/open.rs
@@ -38,12 +38,14 @@ use crate::{
row_iter::IterOptions,
space::{Space, SpaceContext, SpaceId, SpaceRef},
sst::{
factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef},
factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef, ScanOptions},
file::FilePurger,
},
table::data::{TableData, TableDataRef},
};

const MAX_RECORD_BATCHES_IN_FLIGHT_WHEN_COMPACTION_READ: usize = 64;

impl Instance {
/// Open a new instance
pub async fn open(
@@ -65,20 +67,29 @@

let scheduler_config = ctx.config.compaction_config.clone();
let bg_runtime = ctx.runtimes.bg_runtime.clone();
let scan_options_for_compaction = ScanOptions {
background_read_parallelism: 1,
max_record_batches_in_flight: MAX_RECORD_BATCHES_IN_FLIGHT_WHEN_COMPACTION_READ,
};
let compaction_scheduler = Arc::new(SchedulerImpl::new(
space_store.clone(),
bg_runtime.clone(),
scheduler_config,
ctx.config.write_sst_max_buffer_size.as_bytes() as usize,
scan_options_for_compaction,
));

let file_purger = FilePurger::start(&bg_runtime, store_picker.default_store().clone());

let iter_options = IterOptions {
batch_size: ctx.config.scan_batch_size,
sst_background_read_parallelism: ctx.config.sst_background_read_parallelism,
let scan_options = ScanOptions {
background_read_parallelism: ctx.config.sst_background_read_parallelism,
max_record_batches_in_flight: ctx.config.scan_max_record_batches_in_flight,
};

let iter_options = ctx
.config
.scan_batch_size
.map(|batch_size| IterOptions { batch_size });
let instance = Arc::new(Instance {
space_store,
runtimes: ctx.runtimes.clone(),
@@ -95,6 +106,7 @@
replay_batch_size: ctx.config.replay_batch_size,
write_sst_max_buffer_size: ctx.config.write_sst_max_buffer_size.as_bytes() as usize,
iter_options,
scan_options,
remote_engine: remote_engine_ref,
});
