feat: support memory usage limit on background compaction (apache#476)

* feat: support memory usage limit on background compaction

* chore: add unit test

* config: add memory limit options for the compaction scheduler
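
Illustrative only, not part of this commit: with the memory_limit field added to SchedulerConfig in the diff below, a caller could cap the compaction scheduler's memory budget while keeping the other defaults. The construction site is hypothetical; SchedulerConfig and ReadableSize::gb are the items touched by this change.

// Sketch: override only the new knob; the remaining fields keep the values
// from the Default impl shown below (memory_limit itself defaults to 4 GB).
let config = SchedulerConfig {
    memory_limit: ReadableSize::gb(2),
    ..SchedulerConfig::default()
};
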
ShiKaiWi authored Dec 13, 2022
1 parent bd5edcc commit 9aaa10b
Showing 3 changed files with 249 additions and 40 deletions.
11 changes: 11 additions & 0 deletions analytic_engine/src/compaction/mod.rs
@@ -329,6 +329,17 @@ impl CompactionTask {
}
}
}

    // Estimate the total size of the input files.
pub fn estimate_total_input_size(&self) -> usize {
let total_input_size: u64 = self
.compaction_inputs
.iter()
.map(|v| v.files.iter().map(|f| f.size()).sum::<u64>())
.sum();

total_input_size as usize
}
}

pub struct PickerManager {
272 changes: 233 additions & 39 deletions analytic_engine/src/compaction/scheduler.rs
@@ -15,7 +15,7 @@ use std::{
use async_trait::async_trait;
use common_types::{request_id::RequestId, time::Timestamp};
use common_util::{
config::ReadableDuration,
config::{ReadableDuration, ReadableSize},
define_result,
runtime::{JoinHandle, Runtime},
time::DurationExt,
@@ -26,7 +26,7 @@ use snafu::{ResultExt, Snafu};
use table_engine::table::TableId;
use tokio::{
sync::{
mpsc::{self, Receiver, Sender},
mpsc::{self, error::SendError, Receiver, Sender},
Mutex,
},
time,
@@ -37,7 +37,12 @@ use crate::{
metrics::COMPACTION_PENDING_REQUEST_GAUGE, picker::PickerContext, CompactionTask,
PickerManager, TableCompactionRequest, WaitError, WaiterNotifier,
},
instance::{flush_compaction::TableFlushOptions, Instance, SpaceStore},
instance::{
flush_compaction::{self, TableFlushOptions},
write_worker::CompactionNotifier,
Instance, SpaceStore,
},
table::data::TableDataRef,
TableOptions,
};

@@ -56,6 +61,7 @@ pub struct SchedulerConfig {
pub schedule_interval: ReadableDuration,
pub max_ongoing_tasks: usize,
pub max_unflushed_duration: ReadableDuration,
pub memory_limit: ReadableSize,
}

// TODO(boyan), a better default value?
@@ -71,6 +77,7 @@ impl Default for SchedulerConfig {
max_ongoing_tasks: MAX_GOING_COMPACTION_TASKS,
// flush_interval default is 5h.
max_unflushed_duration: ReadableDuration(Duration::from_secs(60 * 60 * 5)),
memory_limit: ReadableSize::gb(4),
}
}
}
@@ -134,6 +141,63 @@ impl<K: Eq + Hash + Clone, V> RequestQueue<K, V> {

type RequestBuf = RwLock<RequestQueue<TableId, TableCompactionRequest>>;

/// Combined with [`MemoryUsageToken`], [`MemoryLimit`] provides a mechanism to
/// impose a limit on the memory usage.
#[derive(Clone, Debug)]
struct MemoryLimit {
usage: Arc<AtomicUsize>,
// TODO: support to adjust this threshold dynamically.
limit: usize,
}

/// The token for the memory usage, which should not derive `Clone`.
/// The applied memory is subtracted from the global memory usage when the token is dropped.
#[derive(Debug)]
struct MemoryUsageToken {
global_usage: Arc<AtomicUsize>,
applied_usage: usize,
}

impl Drop for MemoryUsageToken {
fn drop(&mut self) {
self.global_usage
.fetch_sub(self.applied_usage, Ordering::Relaxed);
}
}

impl MemoryLimit {
fn new(limit: usize) -> Self {
Self {
usage: Arc::new(AtomicUsize::new(0)),
limit,
}
}

/// Try to apply a token if possible.
fn try_apply_token(&self, bytes: usize) -> Option<MemoryUsageToken> {
let token = self.apply_token(bytes);
if self.is_exceeded() {
None
} else {
Some(token)
}
}

fn apply_token(&self, bytes: usize) -> MemoryUsageToken {
self.usage.fetch_add(bytes, Ordering::Relaxed);

MemoryUsageToken {
global_usage: self.usage.clone(),
applied_usage: bytes,
}
}

#[inline]
fn is_exceeded(&self) -> bool {
self.usage.load(Ordering::Relaxed) > self.limit
}
}

struct OngoingTaskLimit {
ongoing_tasks: AtomicUsize,
/// Buffer to hold pending requests
@@ -243,6 +307,7 @@ impl SchedulerImpl {
request_buf: RwLock::new(RequestQueue::default()),
}),
running: running.clone(),
memory_limit: MemoryLimit::new(config.memory_limit.as_bytes() as usize),
};

let handle = runtime.spawn(async move {
@@ -306,6 +371,7 @@ struct ScheduleWorker {
max_ongoing_tasks: usize,
limit: Arc<OngoingTaskLimit>,
running: Arc<AtomicBool>,
memory_limit: MemoryLimit,
}

#[inline]
@@ -358,15 +424,15 @@ impl ScheduleWorker {
self.limit.request_buf_len()
);
} else {
self.do_table_compaction_request(compact_req).await;
self.handle_table_compaction_request(compact_req).await;
}
}
ScheduleTask::Schedule => {
if self.max_ongoing_tasks > ongoing {
let pending = self.limit.drain_requests(self.max_ongoing_tasks - ongoing);
let len = pending.len();
for compact_req in pending {
self.do_table_compaction_request(compact_req).await;
self.handle_table_compaction_request(compact_req).await;
}
debug!("Scheduled {} pending compaction tasks.", len);
}
@@ -375,40 +441,15 @@ impl ScheduleWorker {
};
}

async fn do_table_compaction_request(&self, compact_req: TableCompactionRequest) {
let table_data = compact_req.table_data;
let compaction_notifier = compact_req.compaction_notifier;
let waiter_notifier = WaiterNotifier::new(compact_req.waiter);

let table_options = table_data.table_options();
let compaction_strategy = table_options.compaction_strategy;
let picker = self.picker_manager.get_picker(compaction_strategy);
let picker_ctx = match new_picker_context(&table_options) {
Some(v) => v,
None => {
warn!("No valid context can be created, compaction request will be ignored, table_id:{}, table_name:{}",
table_data.id, table_data.name);
return;
}
};
let version = table_data.current_version();

// Pick compaction task.
let compaction_task = version.pick_for_compaction(picker_ctx, &picker);
let compaction_task = match compaction_task {
Ok(v) => v,
Err(e) => {
error!(
"Compaction scheduler failed to pick compaction, table:{}, table_id:{}, err:{}",
table_data.name, table_data.id, e
);
// Now the error of picking compaction is considered not fatal and not sent to
// compaction notifier.
return;
}
};

// Mark files are in compaction.
async fn do_table_compaction_task(
&self,
table_data: TableDataRef,
compaction_task: CompactionTask,
compaction_notifier: CompactionNotifier,
waiter_notifier: WaiterNotifier,
token: MemoryUsageToken,
) {
        // Mark the files as being compacted.
compaction_task.mark_files_being_compacted(true);

let keep_scheduling_compaction = !compaction_task.compaction_inputs.is_empty();
@@ -425,6 +466,9 @@ impl ScheduleWorker {
let request_id = RequestId::next_id();
// Do actual costly compact job in background.
self.runtime.spawn(async move {
            // Hold the token for the whole compaction; dropping it afterwards releases the memory.
let _token = token;

let res = space_store
.compact_table(runtime, &table_data, request_id, &compaction_task)
.await;
@@ -470,6 +514,91 @@ impl ScheduleWorker {
});
}

// Try to apply the memory usage token. Return `None` if the current memory
// usage exceeds the limit.
fn try_apply_memory_usage_token_for_task(
&self,
task: &CompactionTask,
) -> Option<MemoryUsageToken> {
let input_size = task.estimate_total_input_size();
let estimate_memory_usage = input_size * 2;
self.memory_limit.try_apply_token(estimate_memory_usage)
}

async fn handle_table_compaction_request(&self, compact_req: TableCompactionRequest) {
let table_data = compact_req.table_data.clone();
let table_options = table_data.table_options();
let compaction_strategy = table_options.compaction_strategy;
let picker = self.picker_manager.get_picker(compaction_strategy);
let picker_ctx = match new_picker_context(&table_options) {
Some(v) => v,
None => {
warn!("No valid context can be created, compaction request will be ignored, table_id:{}, table_name:{}",
table_data.id, table_data.name);
return;
}
};
let version = table_data.current_version();

// Pick compaction task.
let compaction_task = version.pick_for_compaction(picker_ctx, &picker);
let compaction_task = match compaction_task {
Ok(v) => v,
Err(e) => {
error!(
"Compaction scheduler failed to pick compaction, table:{}, table_id:{}, err:{}",
table_data.name, table_data.id, e
);
// Now the error of picking compaction is considered not fatal and not sent to
// compaction notifier.
return;
}
};

let token = match self.try_apply_memory_usage_token_for_task(&compaction_task) {
Some(v) => v,
None => {
                // Memory usage exceeds the threshold, so put the request back.
self.put_back_compaction_request(compact_req).await;
return;
}
};

let compaction_notifier = compact_req.compaction_notifier;
let waiter_notifier = WaiterNotifier::new(compact_req.waiter);

self.do_table_compaction_task(
table_data,
compaction_task,
compaction_notifier,
waiter_notifier,
token,
)
.await;
}

async fn put_back_compaction_request(&self, req: TableCompactionRequest) {
if let Err(SendError(ScheduleTask::Request(TableCompactionRequest {
compaction_notifier,
waiter,
..
}))) = self.sender.send(ScheduleTask::Request(req)).await
{
let e = Arc::new(
flush_compaction::Other {
msg: "Failed to put back the compaction request for memory usage exceeds",
}
.build(),
);
compaction_notifier.notify_err(e.clone());

let waiter_notifier = WaiterNotifier::new(waiter);
let wait_err = WaitError::Compaction { source: e };
waiter_notifier.notify_wait_result(Err(wait_err));
}
}

async fn schedule(&mut self) {
self.purge_tables();
self.flush_tables().await;
@@ -570,6 +699,71 @@ fn new_picker_context(table_opts: &TableOptions) -> Option<PickerContext> {
mod tests {
use super::*;

#[test]
fn test_memory_usage_limit_apply() {
let limit = MemoryLimit::new(100);
let cases = vec![
            // Each case is (apply_requests, expect_applied_results).
(vec![10, 20, 90, 30], vec![true, true, false, true]),
(vec![100, 10], vec![true, false]),
(vec![0, 90, 10], vec![true, true, true]),
];

for (apply_requests, expect_applied_results) in cases {
assert_eq!(limit.usage.load(Ordering::Relaxed), 0);

let mut applied_tokens = Vec::with_capacity(apply_requests.len());
for bytes in &apply_requests {
let token = limit.try_apply_token(*bytes);
applied_tokens.push(token);
}
assert_eq!(applied_tokens.len(), expect_applied_results.len());
        assert_eq!(applied_tokens.len(), apply_requests.len());

for (token, (apply_bytes, applied)) in applied_tokens.into_iter().zip(
apply_requests
.into_iter()
.zip(expect_applied_results.into_iter()),
) {
if applied {
let token = token.unwrap();
assert_eq!(token.applied_usage, apply_bytes);
assert_eq!(
token.global_usage.load(Ordering::Relaxed),
limit.usage.load(Ordering::Relaxed),
);
}
}
}
}

#[test]
fn test_memory_usage_limit_release() {
let limit = MemoryLimit::new(100);

let cases = vec![
            // Each case is a sequence of operations, (applied bytes, whether to keep the
            // applied token), plus the expected final memory usage.
(vec![(10, false), (20, false)], 0),
(vec![(100, false), (10, true), (20, true), (30, true)], 60),
(vec![(0, false), (100, false), (20, true), (30, false)], 20),
];

for (ops, expect_memory_usage) in cases {
assert_eq!(limit.usage.load(Ordering::Relaxed), 0);

let mut tokens = Vec::new();
for (applied_bytes, keep_token) in ops {
let token = limit.try_apply_token(applied_bytes);
if keep_token {
tokens.push(token);
}
}

assert_eq!(limit.usage.load(Ordering::Relaxed), expect_memory_usage);
}
}

#[test]
fn test_request_queue() {
let mut q: RequestQueue<i32, String> = RequestQueue::default();
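
Taken together, the MemoryLimit/MemoryUsageToken pair added to scheduler.rs is a reserve-then-check accounting scheme: try_apply_token optimistically adds the estimated bytes to a shared counter, hands back a token only if the counter stays within the limit, and the token's Drop impl returns the bytes once its holder finishes. Below is a self-contained sketch of the same pattern, simplified for illustration; it is not code from this commit and the demo values are arbitrary.

use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

/// Shared accounting for the estimated memory of in-flight tasks.
struct MemoryLimit {
    usage: Arc<AtomicUsize>,
    limit: usize,
}

/// Returned on a successful reservation; dropping it releases the bytes.
struct MemoryUsageToken {
    global_usage: Arc<AtomicUsize>,
    applied_usage: usize,
}

impl Drop for MemoryUsageToken {
    fn drop(&mut self) {
        self.global_usage
            .fetch_sub(self.applied_usage, Ordering::Relaxed);
    }
}

impl MemoryLimit {
    fn new(limit: usize) -> Self {
        Self {
            usage: Arc::new(AtomicUsize::new(0)),
            limit,
        }
    }

    fn try_apply_token(&self, bytes: usize) -> Option<MemoryUsageToken> {
        // Reserve first, then check; when the cap is exceeded the freshly
        // created token is dropped on return, rolling the reservation back.
        self.usage.fetch_add(bytes, Ordering::Relaxed);
        let token = MemoryUsageToken {
            global_usage: self.usage.clone(),
            applied_usage: bytes,
        };
        if self.usage.load(Ordering::Relaxed) > self.limit {
            None
        } else {
            Some(token)
        }
    }
}

fn main() {
    let limit = MemoryLimit::new(100);
    let first = limit.try_apply_token(60).expect("60 fits under the cap");
    assert!(limit.try_apply_token(50).is_none()); // 60 + 50 exceeds the cap
    drop(first); // holder done: 60 bytes released
    assert!(limit.try_apply_token(50).is_some()); // fits again
}

Tying the release to Drop is what lets the scheduler simply move the token into the spawned compaction future (let _token = token; above) and forget about it: whether the compaction succeeds or fails, the reservation is returned when the future completes.
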
