Skip to content

Commit

Permalink
fix: add table status to cancel background jobs (#1212)
Browse files Browse the repository at this point in the history
## Rationale
Close #1205

## Detailed Changes
- Add `TableStatus::Closed` to donate table is closed, and cancel
background jobs, such as compaction.

## Test Plan
  • Loading branch information
jiacai2050 authored Sep 14, 2023
1 parent 3b9beb1 commit bd8c970
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 11 deletions.
14 changes: 13 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ analytic_engine = { path = "analytic_engine" }
arena = { path = "components/arena" }
async-stream = "0.3.4"
async-trait = "0.1.72"
atomic_enum = "0.2.0"
base64 = "0.13"
bytes = "1"
bytes_ext = { path = "components/bytes_ext" }
Expand Down
1 change: 1 addition & 0 deletions analytic_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ arena = { workspace = true }
arrow = { workspace = true }
async-stream = { workspace = true }
async-trait = { workspace = true }
atomic_enum = { workspace = true }
base64 = { workspace = true }
bytes_ext = { workspace = true }
ceresdbproto = { workspace = true }
Expand Down
8 changes: 8 additions & 0 deletions analytic_engine/src/compaction/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,14 @@ impl ScheduleWorker {

async fn handle_table_compaction_request(&self, compact_req: TableCompactionRequest) {
let table_data = compact_req.table_data.clone();
if !table_data.allow_compaction() {
error!(
"Table status is not ok, unable to compact further, table:{}, table_id:{}",
table_data.name, table_data.id
);
return;
}

let table_options = table_data.table_options();
let compaction_strategy = table_options.compaction_strategy;
let picker = self.picker_manager.get_picker(compaction_strategy);
Expand Down
4 changes: 4 additions & 0 deletions analytic_engine/src/instance/close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ impl Closer {
let removed_table = self.space.remove_table(&request.table_name);
assert!(removed_table.is_some());

// Table is already moved out of space, we should close it to stop background
// jobs.
table_data.set_closed();

info!(
"table:{}-{} has been removed from the space_id:{}",
table_data.name, table_data.id, self.space.id
Expand Down
10 changes: 10 additions & 0 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,16 @@ impl SpaceStore {
.await?;
}

if !table_data.allow_compaction() {
return Other {
msg: format!(
"Table status is not ok, unable to update manifest, table:{}, table_id:{}",
table_data.name, table_data.id
),
}
.fail();
}

let edit_req = {
let meta_update = MetaUpdate::VersionEdit(edit_meta.clone());
MetaEditRequest {
Expand Down
53 changes: 43 additions & 10 deletions analytic_engine/src/table/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::{
fmt::Formatter,
num::NonZeroUsize,
sync::{
atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering},
atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering},
Arc, Mutex,
},
time::Duration,
Expand Down Expand Up @@ -111,6 +111,28 @@ impl TableShardInfo {
}
}

/// `atomic_enum` macro will expand method like
/// ```text
/// compare_exchange(..) -> Result<TableStatus, TableStatus>
/// ```
/// The result type is conflict with outer
/// Result, so add this hack
// TODO: fix this in atomic_enum crate.
mod hack {
use atomic_enum::atomic_enum;

#[atomic_enum]
#[derive(PartialEq)]
pub enum TableStatus {
Ok = 0,
Closed,
/// No write/alter are allowed after table is dropped.
Dropped,
}
}

use self::hack::{AtomicTableStatus, TableStatus};

/// Data of a table
pub struct TableData {
/// Id of this table
Expand Down Expand Up @@ -161,10 +183,8 @@ pub struct TableData {
/// Not persist, used to determine if this table should flush.
last_flush_time_ms: AtomicU64,

/// Flag denoting whether the table is dropped
///
/// No write/alter is allowed if the table is dropped.
dropped: AtomicBool,
/// Table Status
status: AtomicTableStatus,

/// Manifest updates after last snapshot
manifest_updates: AtomicUsize,
Expand Down Expand Up @@ -192,7 +212,7 @@ impl fmt::Debug for TableData {
.field("opts", &self.opts)
.field("last_sequence", &self.last_sequence)
.field("last_memtable_id", &self.last_memtable_id)
.field("dropped", &self.dropped.load(Ordering::Relaxed))
.field("status", &self.status.load(Ordering::Relaxed))
.field("shard_info", &self.shard_info)
.finish()
}
Expand Down Expand Up @@ -265,7 +285,7 @@ impl TableData {
last_memtable_id: AtomicU64::new(0),
allocator: IdAllocator::new(0, 0, DEFAULT_ALLOC_STEP),
last_flush_time_ms: AtomicU64::new(0),
dropped: AtomicBool::new(false),
status: TableStatus::Ok.into(),
metrics,
shard_info: TableShardInfo::new(shard_id),
serial_exec: tokio::sync::Mutex::new(TableOpSerialExecutor::new(table_id)),
Expand Down Expand Up @@ -310,7 +330,7 @@ impl TableData {
last_memtable_id: AtomicU64::new(0),
allocator,
last_flush_time_ms: AtomicU64::new(0),
dropped: AtomicBool::new(false),
status: TableStatus::Ok.into(),
metrics,
shard_info: TableShardInfo::new(shard_id),
serial_exec: tokio::sync::Mutex::new(TableOpSerialExecutor::new(add_meta.table_id)),
Expand Down Expand Up @@ -382,13 +402,26 @@ impl TableData {

#[inline]
pub fn is_dropped(&self) -> bool {
self.dropped.load(Ordering::SeqCst)
self.status.load(Ordering::SeqCst) == TableStatus::Dropped
}

/// Set the table is dropped and forbid any writes/alter on this table.
#[inline]
pub fn set_dropped(&self) {
self.dropped.store(true, Ordering::SeqCst);
self.status.store(TableStatus::Dropped, Ordering::SeqCst)
}

#[inline]
pub fn set_closed(&self) {
self.status.store(TableStatus::Closed, Ordering::SeqCst)
}

#[inline]
pub fn allow_compaction(&self) -> bool {
match self.status.load(Ordering::SeqCst) {
TableStatus::Ok => true,
TableStatus::Closed | TableStatus::Dropped => false,
}
}

/// Returns total memtable memory usage in bytes.
Expand Down

0 comments on commit bd8c970

Please sign in to comment.