Skip to content

Commit

Permalink
fix: add page index for metadata (apache#1000)
Browse files Browse the repository at this point in the history
## Rationale
Currently there is no page index in the `meta_data`, so we need to build the
page index if we want to use row selection.

## Detailed Changes
* build page index for `meta_data`
* add some debug logs

## Test Plan

---------

Co-authored-by: Jiacai Liu <dev@liujiacai.net>
Co-authored-by: WEI Xikai <ShiKaiWi@users.noreply.github.com>
  • Loading branch information
3 people authored Jun 20, 2023
1 parent 85eb0b7 commit 41fe63a
Show file tree
Hide file tree
Showing 25 changed files with 441 additions and 610 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

108 changes: 36 additions & 72 deletions analytic_engine/src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,26 +318,13 @@ pub struct ExpiredFiles {

#[derive(Default, Clone)]
pub struct CompactionTask {
inputs: Vec<CompactionInputFiles>,
expired: Vec<ExpiredFiles>,
}

impl Drop for CompactionTask {
fn drop(&mut self) {
// When a CompactionTask is dropped, it means
// 1. the task finished successfully, or
// 2. the task is cancelled for some reason, like memory limit
//
// In case 2, we need to mark files as not compacted in order for them to be
// scheduled again. In case 1, the files will be moved out of level controller,
// so it doesn't care what the flag is, so it's safe to set false here.
self.mark_files_being_compacted(false);
}
pub compaction_inputs: Vec<CompactionInputFiles>,
pub expired: Vec<ExpiredFiles>,
}

impl CompactionTask {
fn mark_files_being_compacted(&self, being_compacted: bool) {
for input in &self.inputs {
pub fn mark_files_being_compacted(&self, being_compacted: bool) {
for input in &self.compaction_inputs {
for file in &input.files {
file.set_being_compacted(being_compacted);
}
Expand All @@ -350,76 +337,29 @@ impl CompactionTask {
}

// Estimate the size of the total input files.
#[inline]
pub fn estimated_total_input_file_size(&self) -> usize {
let total_input_size: u64 = self
.inputs
.compaction_inputs
.iter()
.map(|v| v.files.iter().map(|f| f.size()).sum::<u64>())
.sum();

total_input_size as usize
}

#[inline]
pub fn num_compact_files(&self) -> usize {
self.inputs.iter().map(|v| v.files.len()).sum()
self.compaction_inputs.iter().map(|v| v.files.len()).sum()
}

#[inline]
pub fn is_empty(&self) -> bool {
self.is_input_empty() && self.expired.is_empty()
}

#[inline]
pub fn is_input_empty(&self) -> bool {
self.inputs.is_empty()
}

#[inline]
pub fn expired(&self) -> &[ExpiredFiles] {
&self.expired
}

#[inline]
pub fn inputs(&self) -> &[CompactionInputFiles] {
&self.inputs
}
}

pub struct CompactionTaskBuilder {
expired: Vec<ExpiredFiles>,
inputs: Vec<CompactionInputFiles>,
}

impl CompactionTaskBuilder {
pub fn with_expired(expired: Vec<ExpiredFiles>) -> Self {
Self {
expired,
inputs: Vec::new(),
}
}

pub fn add_inputs(&mut self, files: CompactionInputFiles) {
self.inputs.push(files);
}

pub fn build(self) -> CompactionTask {
let task = CompactionTask {
expired: self.expired,
inputs: self.inputs,
};

task.mark_files_being_compacted(true);

task
pub fn num_expired_files(&self) -> usize {
self.expired.iter().map(|v| v.files.len()).sum()
}
}

impl fmt::Debug for CompactionTask {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("CompactionTask")
.field("inputs", &self.inputs)
.field("inputs", &self.compaction_inputs)
.field(
"expired",
&self
Expand All @@ -440,12 +380,36 @@ impl fmt::Debug for CompactionTask {
}
}

#[derive(Default)]
pub struct PickerManager;
/// Holds pre-built compaction pickers, one field per strategy slot
/// (default, time-window, size-tiered).
pub struct PickerManager {
// Picker stored for the default slot.
default_picker: CompactionPickerRef,
// Picker stored for the time-window slot.
time_window_picker: CompactionPickerRef,
// Picker stored for the size-tiered slot.
size_tiered_picker: CompactionPickerRef,
}

impl Default for PickerManager {
/// Builds a manager with one size-tiered picker and one time-window picker,
/// each constructed from the `Default` value of its options type.
///
/// The `default_picker` field is a clone of the time-window picker's `Arc`,
/// so the default slot and the time-window slot refer to the same picker.
fn default() -> Self {
// Size-tiered picker built from default `SizeTieredCompactionOptions`.
let size_tiered_picker = Arc::new(CommonCompactionPicker::new(
CompactionStrategy::SizeTiered(SizeTieredCompactionOptions::default()),
));
// Time-window picker built from default `TimeWindowCompactionOptions`.
let time_window_picker = Arc::new(CommonCompactionPicker::new(
CompactionStrategy::TimeWindow(TimeWindowCompactionOptions::default()),
));

Self {
// Clone the `Arc` before `time_window_picker` is moved into its own field below.
default_picker: time_window_picker.clone(),
size_tiered_picker,
time_window_picker,
}
}
}

impl PickerManager {
pub fn get_picker(&self, strategy: CompactionStrategy) -> CompactionPickerRef {
Arc::new(CommonCompactionPicker::new(strategy))
match strategy {
CompactionStrategy::Default => self.default_picker.clone(),
CompactionStrategy::SizeTiered(_) => self.size_tiered_picker.clone(),
CompactionStrategy::TimeWindow(_) => self.time_window_picker.clone(),
}
}
}

Expand Down
64 changes: 33 additions & 31 deletions analytic_engine/src/compaction/picker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use snafu::Snafu;

use crate::{
compaction::{
CompactionInputFiles, CompactionStrategy, CompactionTask, CompactionTaskBuilder,
SizeTieredCompactionOptions, TimeWindowCompactionOptions,
CompactionInputFiles, CompactionStrategy, CompactionTask, SizeTieredCompactionOptions,
TimeWindowCompactionOptions,
},
sst::{
file::{FileHandle, Level},
Expand Down Expand Up @@ -60,7 +60,7 @@ pub trait CompactionPicker {
fn pick_compaction(
&self,
ctx: PickerContext,
levels_controller: &mut LevelsController,
levels_controller: &LevelsController,
) -> Result<CompactionTask>;
}

Expand All @@ -86,10 +86,10 @@ pub struct CommonCompactionPicker {
impl CommonCompactionPicker {
pub fn new(strategy: CompactionStrategy) -> Self {
let level_picker: LevelPickerRef = match strategy {
CompactionStrategy::SizeTiered(_) => Arc::new(SizeTieredPicker::default()),
CompactionStrategy::TimeWindow(_) | CompactionStrategy::Default => {
Arc::new(TimeWindowPicker::default())
CompactionStrategy::SizeTiered(_) | CompactionStrategy::Default => {
Arc::new(SizeTieredPicker::default())
}
CompactionStrategy::TimeWindow(_) => Arc::new(TimeWindowPicker::default()),
};
Self { level_picker }
}
Expand Down Expand Up @@ -123,11 +123,13 @@ impl CompactionPicker for CommonCompactionPicker {
fn pick_compaction(
&self,
ctx: PickerContext,
levels_controller: &mut LevelsController,
levels_controller: &LevelsController,
) -> Result<CompactionTask> {
let expire_time = ctx.ttl.map(Timestamp::expire_time);
let mut builder =
CompactionTaskBuilder::with_expired(levels_controller.expired_ssts(expire_time));
let mut compaction_task = CompactionTask {
expired: levels_controller.expired_ssts(expire_time),
..Default::default()
};

if let Some(input_files) =
self.pick_compact_candidates(&ctx, levels_controller, expire_time)
Expand All @@ -137,10 +139,10 @@ impl CompactionPicker for CommonCompactionPicker {
ctx.strategy, input_files
);

builder.add_inputs(input_files);
compaction_task.compaction_inputs = vec![input_files];
}

Ok(builder.build())
Ok(compaction_task)
}
}

Expand Down Expand Up @@ -732,39 +734,39 @@ mod tests {
};
let now = Timestamp::now();
{
let mut lc = build_old_bucket_case(now.as_i64());
let task = twp.pick_compaction(ctx.clone(), &mut lc).unwrap();
assert_eq!(task.inputs[0].files.len(), 2);
assert_eq!(task.inputs[0].files[0].id(), 0);
assert_eq!(task.inputs[0].files[1].id(), 1);
let lc = build_old_bucket_case(now.as_i64());
let task = twp.pick_compaction(ctx.clone(), &lc).unwrap();
assert_eq!(task.compaction_inputs[0].files.len(), 2);
assert_eq!(task.compaction_inputs[0].files[0].id(), 0);
assert_eq!(task.compaction_inputs[0].files[1].id(), 1);
assert_eq!(task.expired[0].files.len(), 1);
assert_eq!(task.expired[0].files[0].id(), 3);
}

{
let mut lc = build_newest_bucket_case(now.as_i64());
let task = twp.pick_compaction(ctx.clone(), &mut lc).unwrap();
assert_eq!(task.inputs[0].files.len(), 4);
assert_eq!(task.inputs[0].files[0].id(), 2);
assert_eq!(task.inputs[0].files[1].id(), 3);
assert_eq!(task.inputs[0].files[2].id(), 4);
assert_eq!(task.inputs[0].files[3].id(), 5);
let lc = build_newest_bucket_case(now.as_i64());
let task = twp.pick_compaction(ctx.clone(), &lc).unwrap();
assert_eq!(task.compaction_inputs[0].files.len(), 4);
assert_eq!(task.compaction_inputs[0].files[0].id(), 2);
assert_eq!(task.compaction_inputs[0].files[1].id(), 3);
assert_eq!(task.compaction_inputs[0].files[2].id(), 4);
assert_eq!(task.compaction_inputs[0].files[3].id(), 5);
}

{
let mut lc = build_newest_bucket_no_match_case(now.as_i64());
let task = twp.pick_compaction(ctx.clone(), &mut lc).unwrap();
assert_eq!(task.inputs.len(), 0);
let lc = build_newest_bucket_no_match_case(now.as_i64());
let task = twp.pick_compaction(ctx.clone(), &lc).unwrap();
assert_eq!(task.compaction_inputs.len(), 0);
}

// If ttl is None, then no file is expired.
ctx.ttl = None;
{
let mut lc = build_old_bucket_case(now.as_i64());
let task = twp.pick_compaction(ctx, &mut lc).unwrap();
assert_eq!(task.inputs[0].files.len(), 2);
assert_eq!(task.inputs[0].files[0].id(), 0);
assert_eq!(task.inputs[0].files[1].id(), 1);
let lc = build_old_bucket_case(now.as_i64());
let task = twp.pick_compaction(ctx, &lc).unwrap();
assert_eq!(task.compaction_inputs[0].files.len(), 2);
assert_eq!(task.compaction_inputs[0].files[0].id(), 0);
assert_eq!(task.compaction_inputs[0].files[1].id(), 1);
assert!(task.expired[0].files.is_empty());
}
}
Expand Down
21 changes: 9 additions & 12 deletions analytic_engine/src/compaction/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ impl OngoingTaskLimit {

if dropped > 0 {
warn!(
"Too many compaction pending tasks, limit:{}, dropped:{}.",
"Too many compaction pending tasks, limit: {}, dropped {} older tasks.",
self.max_pending_compaction_tasks, dropped,
);
}
Expand Down Expand Up @@ -462,7 +462,10 @@ impl ScheduleWorker {
waiter_notifier: WaiterNotifier,
token: MemoryUsageToken,
) {
let keep_scheduling_compaction = !compaction_task.is_input_empty();
// Mark the files as being compacted.
compaction_task.mark_files_being_compacted(true);

let keep_scheduling_compaction = !compaction_task.compaction_inputs.is_empty();

let runtime = self.runtime.clone();
let space_store = self.space_store.clone();
Expand Down Expand Up @@ -500,6 +503,9 @@ impl ScheduleWorker {
.await;

if let Err(e) = &res {
// Compaction failed, so we need to unset the compaction mark.
compaction_task.mark_files_being_compacted(false);

error!(
"Failed to compact table, table_name:{}, table_id:{}, request_id:{}, err:{}",
table_data.name, table_data.id, request_id, e
Expand Down Expand Up @@ -650,16 +656,7 @@ impl ScheduleWorker {
self.max_unflushed_duration,
);

let mut serial_exec = if let Some(v) = table_data.acquire_serial_exec_ctx().await {
v
} else {
warn!(
"Table is closed, ignore this periodical flush, table:{}",
table_data.name
);
continue;
};

let mut serial_exec = table_data.serial_exec.lock().await;
let flush_scheduler = serial_exec.flush_scheduler();
// Instance flush the table asynchronously.
if let Err(e) = flusher
Expand Down
16 changes: 6 additions & 10 deletions analytic_engine/src/instance/close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
//! Close table logic of instance

use log::{info, warn};
use snafu::{OptionExt, ResultExt};
use snafu::ResultExt;
use table_engine::engine::CloseTableRequest;

use crate::{
instance::{
engine::{DoManifestSnapshot, FlushTable, OperateClosedTable, Result},
engine::{DoManifestSnapshot, FlushTable, Result},
flush_compaction::{Flusher, TableFlushOptions},
},
manifest::{ManifestRef, SnapshotRequest},
Expand Down Expand Up @@ -37,11 +37,8 @@ impl Closer {

// Flush table.
let opts = TableFlushOptions::default();
let mut serial_exec_ctx = table_data
.acquire_serial_exec_ctx()
.await
.context(OperateClosedTable)?;
let flush_scheduler = serial_exec_ctx.flush_scheduler();
let mut serial_exec = table_data.serial_exec.lock().await;
let flush_scheduler = serial_exec.flush_scheduler();

self.flusher
.do_flush(flush_scheduler, &table_data, opts)
Expand Down Expand Up @@ -70,10 +67,9 @@ impl Closer {
let removed_table = self.space.remove_table(&request.table_name);
assert!(removed_table.is_some());

serial_exec_ctx.invalidate();
info!(
"table:{} has been removed from the space_id:{}, table_id:{}",
table_data.name, self.space.id, table_data.id,
"table:{}-{} has been removed from the space_id:{}",
table_data.name, table_data.id, self.space.id
);
Ok(())
}
Expand Down
Loading

0 comments on commit 41fe63a

Please sign in to comment.