Skip to content

Commit

Permalink
Revert "fix: add page index for metadata (apache#1000)"
Browse files Browse the repository at this point in the history
This reverts commit 41fe63a.
  • Loading branch information
ShiKaiWi committed Jun 26, 2023
1 parent 03d9aa4 commit 04c38b8
Show file tree
Hide file tree
Showing 25 changed files with 613 additions and 444 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

108 changes: 72 additions & 36 deletions analytic_engine/src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,13 +318,26 @@ pub struct ExpiredFiles {

#[derive(Default, Clone)]
pub struct CompactionTask {
pub compaction_inputs: Vec<CompactionInputFiles>,
pub expired: Vec<ExpiredFiles>,
inputs: Vec<CompactionInputFiles>,
expired: Vec<ExpiredFiles>,
}

impl Drop for CompactionTask {
fn drop(&mut self) {
// When a CompactionTask is dropped, it means
// 1. the task finished successfully, or
// 2. the task is cancelled for some reason, like memory limit
//
// In case 2, we need to mark files as not compacted in order for them to be
// scheduled again. In case 1, the files will be moved out of level controller,
// so it doesn't care what the flag is, so it's safe to set false here.
self.mark_files_being_compacted(false);
}
}

impl CompactionTask {
pub fn mark_files_being_compacted(&self, being_compacted: bool) {
for input in &self.compaction_inputs {
fn mark_files_being_compacted(&self, being_compacted: bool) {
for input in &self.inputs {
for file in &input.files {
file.set_being_compacted(being_compacted);
}
Expand All @@ -337,29 +350,76 @@ impl CompactionTask {
}

// Estimate the size of the total input files.
#[inline]
pub fn estimated_total_input_file_size(&self) -> usize {
let total_input_size: u64 = self
.compaction_inputs
.inputs
.iter()
.map(|v| v.files.iter().map(|f| f.size()).sum::<u64>())
.sum();

total_input_size as usize
}

#[inline]
pub fn num_compact_files(&self) -> usize {
self.compaction_inputs.iter().map(|v| v.files.len()).sum()
self.inputs.iter().map(|v| v.files.len()).sum()
}

pub fn num_expired_files(&self) -> usize {
self.expired.iter().map(|v| v.files.len()).sum()
#[inline]
pub fn is_empty(&self) -> bool {
self.is_input_empty() && self.expired.is_empty()
}

#[inline]
pub fn is_input_empty(&self) -> bool {
self.inputs.is_empty()
}

#[inline]
pub fn expired(&self) -> &[ExpiredFiles] {
&self.expired
}

#[inline]
pub fn inputs(&self) -> &[CompactionInputFiles] {
&self.inputs
}
}

pub struct CompactionTaskBuilder {
expired: Vec<ExpiredFiles>,
inputs: Vec<CompactionInputFiles>,
}

impl CompactionTaskBuilder {
pub fn with_expired(expired: Vec<ExpiredFiles>) -> Self {
Self {
expired,
inputs: Vec::new(),
}
}

pub fn add_inputs(&mut self, files: CompactionInputFiles) {
self.inputs.push(files);
}

pub fn build(self) -> CompactionTask {
let task = CompactionTask {
expired: self.expired,
inputs: self.inputs,
};

task.mark_files_being_compacted(true);

task
}
}

impl fmt::Debug for CompactionTask {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("CompactionTask")
.field("inputs", &self.compaction_inputs)
.field("inputs", &self.inputs)
.field(
"expired",
&self
Expand All @@ -380,36 +440,12 @@ impl fmt::Debug for CompactionTask {
}
}

pub struct PickerManager {
default_picker: CompactionPickerRef,
time_window_picker: CompactionPickerRef,
size_tiered_picker: CompactionPickerRef,
}

impl Default for PickerManager {
fn default() -> Self {
let size_tiered_picker = Arc::new(CommonCompactionPicker::new(
CompactionStrategy::SizeTiered(SizeTieredCompactionOptions::default()),
));
let time_window_picker = Arc::new(CommonCompactionPicker::new(
CompactionStrategy::TimeWindow(TimeWindowCompactionOptions::default()),
));

Self {
default_picker: time_window_picker.clone(),
size_tiered_picker,
time_window_picker,
}
}
}
#[derive(Default)]
pub struct PickerManager;

impl PickerManager {
pub fn get_picker(&self, strategy: CompactionStrategy) -> CompactionPickerRef {
match strategy {
CompactionStrategy::Default => self.default_picker.clone(),
CompactionStrategy::SizeTiered(_) => self.size_tiered_picker.clone(),
CompactionStrategy::TimeWindow(_) => self.time_window_picker.clone(),
}
Arc::new(CommonCompactionPicker::new(strategy))
}
}

Expand Down
66 changes: 32 additions & 34 deletions analytic_engine/src/compaction/picker.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Compaction picker.

Expand All @@ -15,8 +15,8 @@ use snafu::Snafu;

use crate::{
compaction::{
CompactionInputFiles, CompactionStrategy, CompactionTask, SizeTieredCompactionOptions,
TimeWindowCompactionOptions,
CompactionInputFiles, CompactionStrategy, CompactionTask, CompactionTaskBuilder,
SizeTieredCompactionOptions, TimeWindowCompactionOptions,
},
sst::{
file::{FileHandle, Level},
Expand Down Expand Up @@ -60,7 +60,7 @@ pub trait CompactionPicker {
fn pick_compaction(
&self,
ctx: PickerContext,
levels_controller: &LevelsController,
levels_controller: &mut LevelsController,
) -> Result<CompactionTask>;
}

Expand All @@ -86,10 +86,10 @@ pub struct CommonCompactionPicker {
impl CommonCompactionPicker {
pub fn new(strategy: CompactionStrategy) -> Self {
let level_picker: LevelPickerRef = match strategy {
CompactionStrategy::SizeTiered(_) | CompactionStrategy::Default => {
Arc::new(SizeTieredPicker::default())
CompactionStrategy::SizeTiered(_) => Arc::new(SizeTieredPicker::default()),
CompactionStrategy::TimeWindow(_) | CompactionStrategy::Default => {
Arc::new(TimeWindowPicker::default())
}
CompactionStrategy::TimeWindow(_) => Arc::new(TimeWindowPicker::default()),
};
Self { level_picker }
}
Expand Down Expand Up @@ -123,13 +123,11 @@ impl CompactionPicker for CommonCompactionPicker {
fn pick_compaction(
&self,
ctx: PickerContext,
levels_controller: &LevelsController,
levels_controller: &mut LevelsController,
) -> Result<CompactionTask> {
let expire_time = ctx.ttl.map(Timestamp::expire_time);
let mut compaction_task = CompactionTask {
expired: levels_controller.expired_ssts(expire_time),
..Default::default()
};
let mut builder =
CompactionTaskBuilder::with_expired(levels_controller.expired_ssts(expire_time));

if let Some(input_files) =
self.pick_compact_candidates(&ctx, levels_controller, expire_time)
Expand All @@ -139,10 +137,10 @@ impl CompactionPicker for CommonCompactionPicker {
ctx.strategy, input_files
);

compaction_task.compaction_inputs = vec![input_files];
builder.add_inputs(input_files);
}

Ok(compaction_task)
Ok(builder.build())
}
}

Expand Down Expand Up @@ -734,39 +732,39 @@ mod tests {
};
let now = Timestamp::now();
{
let lc = build_old_bucket_case(now.as_i64());
let task = twp.pick_compaction(ctx.clone(), &lc).unwrap();
assert_eq!(task.compaction_inputs[0].files.len(), 2);
assert_eq!(task.compaction_inputs[0].files[0].id(), 0);
assert_eq!(task.compaction_inputs[0].files[1].id(), 1);
let mut lc = build_old_bucket_case(now.as_i64());
let task = twp.pick_compaction(ctx.clone(), &mut lc).unwrap();
assert_eq!(task.inputs[0].files.len(), 2);
assert_eq!(task.inputs[0].files[0].id(), 0);
assert_eq!(task.inputs[0].files[1].id(), 1);
assert_eq!(task.expired[0].files.len(), 1);
assert_eq!(task.expired[0].files[0].id(), 3);
}

{
let lc = build_newest_bucket_case(now.as_i64());
let task = twp.pick_compaction(ctx.clone(), &lc).unwrap();
assert_eq!(task.compaction_inputs[0].files.len(), 4);
assert_eq!(task.compaction_inputs[0].files[0].id(), 2);
assert_eq!(task.compaction_inputs[0].files[1].id(), 3);
assert_eq!(task.compaction_inputs[0].files[2].id(), 4);
assert_eq!(task.compaction_inputs[0].files[3].id(), 5);
let mut lc = build_newest_bucket_case(now.as_i64());
let task = twp.pick_compaction(ctx.clone(), &mut lc).unwrap();
assert_eq!(task.inputs[0].files.len(), 4);
assert_eq!(task.inputs[0].files[0].id(), 2);
assert_eq!(task.inputs[0].files[1].id(), 3);
assert_eq!(task.inputs[0].files[2].id(), 4);
assert_eq!(task.inputs[0].files[3].id(), 5);
}

{
let lc = build_newest_bucket_no_match_case(now.as_i64());
let task = twp.pick_compaction(ctx.clone(), &lc).unwrap();
assert_eq!(task.compaction_inputs.len(), 0);
let mut lc = build_newest_bucket_no_match_case(now.as_i64());
let task = twp.pick_compaction(ctx.clone(), &mut lc).unwrap();
assert_eq!(task.inputs.len(), 0);
}

// If ttl is None, then no file is expired.
ctx.ttl = None;
{
let lc = build_old_bucket_case(now.as_i64());
let task = twp.pick_compaction(ctx, &lc).unwrap();
assert_eq!(task.compaction_inputs[0].files.len(), 2);
assert_eq!(task.compaction_inputs[0].files[0].id(), 0);
assert_eq!(task.compaction_inputs[0].files[1].id(), 1);
let mut lc = build_old_bucket_case(now.as_i64());
let task = twp.pick_compaction(ctx, &mut lc).unwrap();
assert_eq!(task.inputs[0].files.len(), 2);
assert_eq!(task.inputs[0].files[0].id(), 0);
assert_eq!(task.inputs[0].files[1].id(), 1);
assert!(task.expired[0].files.is_empty());
}
}
Expand Down
21 changes: 12 additions & 9 deletions analytic_engine/src/compaction/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ impl OngoingTaskLimit {

if dropped > 0 {
warn!(
"Too many compaction pending tasks, limit: {}, dropped {} older tasks.",
"Too many compaction pending tasks, limit:{}, dropped:{}.",
self.max_pending_compaction_tasks, dropped,
);
}
Expand Down Expand Up @@ -462,10 +462,7 @@ impl ScheduleWorker {
waiter_notifier: WaiterNotifier,
token: MemoryUsageToken,
) {
// Mark files being in compaction.
compaction_task.mark_files_being_compacted(true);

let keep_scheduling_compaction = !compaction_task.compaction_inputs.is_empty();
let keep_scheduling_compaction = !compaction_task.is_input_empty();

let runtime = self.runtime.clone();
let space_store = self.space_store.clone();
Expand Down Expand Up @@ -503,9 +500,6 @@ impl ScheduleWorker {
.await;

if let Err(e) = &res {
// Compaction is failed, we need to unset the compaction mark.
compaction_task.mark_files_being_compacted(false);

error!(
"Failed to compact table, table_name:{}, table_id:{}, request_id:{}, err:{}",
table_data.name, table_data.id, request_id, e
Expand Down Expand Up @@ -656,7 +650,16 @@ impl ScheduleWorker {
self.max_unflushed_duration,
);

let mut serial_exec = table_data.serial_exec.lock().await;
let mut serial_exec = if let Some(v) = table_data.acquire_serial_exec_ctx().await {
v
} else {
warn!(
"Table is closed, ignore this periodical flush, table:{}",
table_data.name
);
continue;
};

let flush_scheduler = serial_exec.flush_scheduler();
// Instance flush the table asynchronously.
if let Err(e) = flusher
Expand Down
16 changes: 10 additions & 6 deletions analytic_engine/src/instance/close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
//! Close table logic of instance

use log::{info, warn};
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use table_engine::engine::CloseTableRequest;

use crate::{
instance::{
engine::{DoManifestSnapshot, FlushTable, Result},
engine::{DoManifestSnapshot, FlushTable, OperateClosedTable, Result},
flush_compaction::{Flusher, TableFlushOptions},
},
manifest::{ManifestRef, SnapshotRequest},
Expand Down Expand Up @@ -37,8 +37,11 @@ impl Closer {

// Flush table.
let opts = TableFlushOptions::default();
let mut serial_exec = table_data.serial_exec.lock().await;
let flush_scheduler = serial_exec.flush_scheduler();
let mut serial_exec_ctx = table_data
.acquire_serial_exec_ctx()
.await
.context(OperateClosedTable)?;
let flush_scheduler = serial_exec_ctx.flush_scheduler();

self.flusher
.do_flush(flush_scheduler, &table_data, opts)
Expand Down Expand Up @@ -67,9 +70,10 @@ impl Closer {
let removed_table = self.space.remove_table(&request.table_name);
assert!(removed_table.is_some());

serial_exec_ctx.invalidate();
info!(
"table:{}-{} has been removed from the space_id:{}",
table_data.name, table_data.id, self.space.id
"table:{} has been removed from the space_id:{}, table_id:{}",
table_data.name, self.space.id, table_data.id,
);
Ok(())
}
Expand Down
Loading

0 comments on commit 04c38b8

Please sign in to comment.