Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make obkv wal opening more parallelly #1129

Merged
merged 5 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions analytic_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ pub struct ManifestNamespaceConfig {
pub init_scan_batch_size: i32,
pub clean_scan_timeout: ReadableDuration,
pub clean_scan_batch_size: usize,
pub bucket_create_parallelism: usize,
}

impl Default for ManifestNamespaceConfig {
Expand All @@ -192,6 +193,7 @@ impl Default for ManifestNamespaceConfig {
init_scan_batch_size: namespace_config.init_scan_batch_size,
clean_scan_timeout: namespace_config.clean_scan_timeout,
clean_scan_batch_size: namespace_config.clean_scan_batch_size,
bucket_create_parallelism: namespace_config.bucket_create_parallelism,
}
}
}
Expand All @@ -206,6 +208,7 @@ impl From<ManifestNamespaceConfig> for NamespaceConfig {
init_scan_batch_size: manifest_config.init_scan_batch_size,
clean_scan_timeout: manifest_config.clean_scan_timeout,
clean_scan_batch_size: manifest_config.clean_scan_batch_size,
bucket_create_parallelism: manifest_config.bucket_create_parallelism,
}
}
}
Expand All @@ -227,6 +230,7 @@ pub struct WalNamespaceConfig {
pub ttl: ReadableDuration,
pub init_scan_timeout: ReadableDuration,
pub init_scan_batch_size: i32,
pub bucket_create_parallelism: usize,
}

impl Default for WalNamespaceConfig {
Expand All @@ -239,6 +243,7 @@ impl Default for WalNamespaceConfig {
ttl: namespace_config.ttl.unwrap(),
init_scan_timeout: namespace_config.init_scan_timeout,
init_scan_batch_size: namespace_config.init_scan_batch_size,
bucket_create_parallelism: namespace_config.bucket_create_parallelism,
}
}
}
Expand All @@ -251,6 +256,7 @@ impl From<WalNamespaceConfig> for NamespaceConfig {
ttl: Some(wal_config.ttl),
init_scan_timeout: wal_config.init_scan_timeout,
init_scan_batch_size: wal_config.init_scan_batch_size,
bucket_create_parallelism: wal_config.bucket_create_parallelism,
..Default::default()
}
}
Expand Down
2 changes: 2 additions & 0 deletions wal/src/table_kv_impl/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ pub struct NamespaceConfig {
pub init_scan_batch_size: i32,
pub clean_scan_timeout: ReadableDuration,
pub clean_scan_batch_size: usize,
pub bucket_create_parallelism: usize,
}

impl NamespaceConfig {
Expand Down Expand Up @@ -325,6 +326,7 @@ impl Default for NamespaceConfig {
init_scan_batch_size: 100,
clean_scan_timeout: default_clean_ctx.scan_timeout.into(),
clean_scan_batch_size: default_clean_ctx.batch_size,
bucket_create_parallelism: 32,
}
}
}
Expand Down
173 changes: 155 additions & 18 deletions wal/src/table_kv_impl/namespace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@
use std::{
collections::{BTreeMap, HashMap},
fmt, str,
sync::{Arc, Mutex, RwLock},
time::Duration,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex, RwLock,
},
thread,
time::{Duration, Instant},
};

use common_types::{table::TableId, time::Timestamp};
use generic_error::{BoxError, GenericError};
use generic_error::{BoxError, GenericError, GenericResult};
use log::{debug, error, info, trace, warn};
use macros::define_result;
use runtime::Runtime;
Expand Down Expand Up @@ -57,6 +61,15 @@ pub enum Error {
source: GenericError,
},

#[snafu(display(
"Failed to load buckets, namespace:{namespace}, msg:{msg}.\nBacktrace:\n{backtrace}"
))]
LoadBucketsNoCause {
namespace: String,
msg: String,
backtrace: Backtrace,
},

#[snafu(display("Failed to open bucket, namespace:{}, err:{}", namespace, source,))]
BucketMeta {
namespace: String,
Expand Down Expand Up @@ -211,6 +224,8 @@ pub const BUCKET_DURATION_MS: i64 = 1000 * 3600 * 24;
const BUCKET_MONITOR_PERIOD: Duration = Duration::from_millis(BUCKET_DURATION_MS as u64 / 8);
/// Clean deleted logs period.
const LOG_CLEANER_PERIOD: Duration = Duration::from_millis(BUCKET_DURATION_MS as u64 / 4);
/// Monitor table creating period.
const MONITOR_TABLE_CREATING_PERIOD: Duration = Duration::from_secs(30);

struct NamespaceInner<T> {
runtimes: WalRuntimes,
Expand Down Expand Up @@ -377,18 +392,35 @@ impl<T: TableKv> NamespaceInner<T> {
/// Open bucket, ensure all tables are created, and insert the bucket into
/// the bucket set in memory.
fn open_bucket(&self, bucket: Bucket) -> Result<BucketRef> {
let bucket_info = bucket.entry;
info!(
"TableKvWal begin to open bucket, bucket_info:{:?}, namespace:{}",
bucket_info,
self.name()
);

let timer = Instant::now();
{
// Create all wal shards of this bucket.
let mut operator = self.operator.lock().unwrap();
for wal_shard in &bucket.wal_shard_names {
operator.create_table_if_needed(&self.table_kv, self.name(), wal_shard)?;
}
operator.create_table_if_needed(
&self.table_kv,
self.name(),
bucket.wal_shard_names.clone(),
)?;
}

let bucket = Arc::new(bucket);
let mut bucket_set = self.bucket_set.write().unwrap();
bucket_set.insert_bucket(bucket.clone());

info!(
"TableKvWal success to open bucket, cost:{:?}, bucket_info:{:?}, namespace:{}",
timer.elapsed(),
bucket_info,
self.name(),
);

Ok(bucket)
}

Expand Down Expand Up @@ -1027,7 +1059,9 @@ impl<T: TableKv> Namespace<T> {
table_units: RwLock::new(HashMap::new()),
meta_table_name: meta_table_name.to_string(),
table_unit_meta_tables,
operator: Mutex::new(TableOperator),
operator: Mutex::new(TableOperator {
bucket_create_parallelism: config.bucket_create_parallelism,
}),
bucket_creator: Mutex::new(BucketCreator),
config,
});
Expand Down Expand Up @@ -1202,28 +1236,131 @@ impl<T: TableKv> Namespace<T> {
pub type NamespaceRef<T> = Arc<Namespace<T>>;

/// Table operator wraps create/drop table operations.
struct TableOperator;
struct TableOperator {
bucket_create_parallelism: usize,
}

impl TableOperator {
fn create_table_if_needed<T: TableKv>(
&mut self,
table_kv: &T,
namespace: &str,
table_name: &str,
wal_shards: Vec<String>,
) -> Result<()> {
let table_exists = table_kv
.table_exists(table_name)
.box_err()
.context(BucketMeta { namespace })?;
if !table_exists {
table_kv
.create_table(table_name)
.box_err()
.context(BucketMeta { namespace })?;
let wal_shard_num = wal_shards.len();
let wal_shard_group_num = std::cmp::min(self.bucket_create_parallelism, wal_shard_num);
let wal_shard_num_per_group = wal_shard_num / wal_shard_group_num;
let wal_shard_groups = wal_shards
.chunks(wal_shard_num_per_group)
.map(|group| group.to_owned())
.collect::<Vec<_>>();
let (tx, rx) = std::sync::mpsc::channel();
let stop = Arc::new(AtomicBool::new(false));

// Create tables in parallel.
Self::do_create_table_in_parallel(namespace, table_kv, wal_shard_groups, tx, stop.clone());

// Wait for all tables created.
let mut cur_running_tasks = wal_shard_group_num;
loop {
if cur_running_tasks == 0 {
break;
ShiKaiWi marked this conversation as resolved.
Show resolved Hide resolved
}

match rx.recv_timeout(MONITOR_TABLE_CREATING_PERIOD) {
Ok(Ok(_)) => {
cur_running_tasks -= 1;
}
Ok(Err(e)) => {
stop.store(true, Ordering::Relaxed);
return Err(e).context(LoadBuckets { namespace });
}
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
stop.store(true, Ordering::Relaxed);
return LoadBucketsNoCause {
namespace,
msg: "result notifier in create table task is closed",
}
.fail();
}
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
info!(
"TableOperator monitor tables creating periodically, namespace:{namespace}"
);
}
};
}

Ok(())
}

fn do_create_table_in_parallel<T: TableKv>(
namespace: &str,
table_kv: &T,
wal_shard_groups: Vec<Vec<String>>,
tx: std::sync::mpsc::Sender<GenericResult<()>>,
stop: Arc<AtomicBool>,
) {
for group in wal_shard_groups {
let namespace = namespace.to_owned();
let table_kv = table_kv.clone();
let tx = tx.clone();
let stop = stop.clone();
thread::spawn(move || {
for table in group {
if stop.load(Ordering::Relaxed) {
error!("TableOperator create table task stopped, namespace:{namespace}");
return;
}

// Judge if table exists.
let table_exists_result = table_kv
.table_exists(&table)
.box_err()
.map_err(|e| {
error!("TableOperator failed to judge if table exists, table:{table}, namespace:{namespace}");
e
});

let table_exists = match table_exists_result {
Ok(exists) => exists,
Err(e) => {
let _ = tx.send(Err(e)).map_err(|e| {
error!("TableOperator failed to send table result, table:{table}, namespace:{namespace}, err:{e}");
});
return;
}
};

if table_exists {
continue;
}

// Not exist, create table.
let create_table_result = table_kv
.create_table(&table)
.box_err()
.map_err(|e| {
error!("TableOperator failed to create table, table:{table}, namespace:{namespace}");
e
});

if let Err(e) = create_table_result {
let _ = tx.send(Err(e)).map_err(|e| {
error!("TableOperator failed to send table result, table:{table}, namespace:{namespace}, err:{e}");
});
return;
}
}

let _ = tx.send(Ok(())).map_err(|e| {
error!(
"TableOperator failed to send final result, namespace:{namespace}, err:{e}"
);
});
});
}
}
}

/// Time buckets of a namespace, orderded by start time.
Expand Down
Loading