Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: move wal structs and traits to wal crate #1263

Merged
merged 5 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

163 changes: 1 addition & 162 deletions analytic_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,10 @@ pub mod table_meta_set_impl;
pub mod tests;

use manifest::details::Options as ManifestOptions;
use message_queue::kafka::config::Config as KafkaConfig;
use object_store::config::StorageOptions;
use serde::{Deserialize, Serialize};
use size_ext::ReadableSize;
use table_kv::config::ObkvConfig;
use time_ext::ReadableDuration;
use wal::{
message_queue_impl::config::Config as MessageQueueWalConfig,
rocks_impl::config::Config as RocksDBWalConfig, table_kv_impl::model::NamespaceConfig,
};
use wal::config::WalStorageConfig;

pub use crate::{compaction::scheduler::SchedulerConfig, table_options::TableOptions};

Expand Down Expand Up @@ -170,158 +164,3 @@ impl Default for Config {
}
}
}

/// Config of wal based on obkv
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
#[serde(default)]
pub struct ObkvWalConfig {
/// Obkv client config
pub obkv: ObkvConfig,
/// Namespace config for data.
pub data_namespace: WalNamespaceConfig,
/// Namespace config for meta data
pub meta_namespace: ManifestNamespaceConfig,
}

/// Config of obkv wal based manifest
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ManifestNamespaceConfig {
/// Decide how many wal data shards will be created
///
/// NOTICE: it can just be set once, the later setting makes no effect.
pub shard_num: usize,

/// Decide how many wal meta shards will be created
///
/// NOTICE: it can just be set once, the later setting makes no effect.
pub meta_shard_num: usize,

pub init_scan_timeout: ReadableDuration,
pub init_scan_batch_size: i32,
pub clean_scan_timeout: ReadableDuration,
pub clean_scan_batch_size: usize,
pub bucket_create_parallelism: usize,
}

impl Default for ManifestNamespaceConfig {
fn default() -> Self {
let namespace_config = NamespaceConfig::default();

Self {
shard_num: namespace_config.wal_shard_num,
meta_shard_num: namespace_config.table_unit_meta_shard_num,
init_scan_timeout: namespace_config.init_scan_timeout,
init_scan_batch_size: namespace_config.init_scan_batch_size,
clean_scan_timeout: namespace_config.clean_scan_timeout,
clean_scan_batch_size: namespace_config.clean_scan_batch_size,
bucket_create_parallelism: namespace_config.bucket_create_parallelism,
}
}
}

impl From<ManifestNamespaceConfig> for NamespaceConfig {
fn from(manifest_config: ManifestNamespaceConfig) -> Self {
NamespaceConfig {
wal_shard_num: manifest_config.shard_num,
table_unit_meta_shard_num: manifest_config.meta_shard_num,
ttl: None,
init_scan_timeout: manifest_config.init_scan_timeout,
init_scan_batch_size: manifest_config.init_scan_batch_size,
clean_scan_timeout: manifest_config.clean_scan_timeout,
clean_scan_batch_size: manifest_config.clean_scan_batch_size,
bucket_create_parallelism: manifest_config.bucket_create_parallelism,
}
}
}

/// Config of obkv wal based wal module
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct WalNamespaceConfig {
/// Decide how many wal data shards will be created
///
/// NOTICE: it can just be set once, the later setting makes no effect.
pub shard_num: usize,

/// Decide how many wal meta shards will be created
///
/// NOTICE: it can just be set once, the later setting makes no effect.
pub meta_shard_num: usize,

pub ttl: ReadableDuration,
pub init_scan_timeout: ReadableDuration,
pub init_scan_batch_size: i32,
pub bucket_create_parallelism: usize,
}

impl Default for WalNamespaceConfig {
fn default() -> Self {
let namespace_config = NamespaceConfig::default();

Self {
shard_num: namespace_config.wal_shard_num,
meta_shard_num: namespace_config.table_unit_meta_shard_num,
ttl: namespace_config.ttl.unwrap(),
init_scan_timeout: namespace_config.init_scan_timeout,
init_scan_batch_size: namespace_config.init_scan_batch_size,
bucket_create_parallelism: namespace_config.bucket_create_parallelism,
}
}
}

impl From<WalNamespaceConfig> for NamespaceConfig {
fn from(wal_config: WalNamespaceConfig) -> Self {
Self {
wal_shard_num: wal_config.shard_num,
table_unit_meta_shard_num: wal_config.meta_shard_num,
ttl: Some(wal_config.ttl),
init_scan_timeout: wal_config.init_scan_timeout,
init_scan_batch_size: wal_config.init_scan_batch_size,
bucket_create_parallelism: wal_config.bucket_create_parallelism,
..Default::default()
}
}
}

/// Config of wal based on obkv
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
#[serde(default)]
pub struct KafkaWalConfig {
/// Kafka client config
pub kafka: KafkaConfig,

/// Namespace config for data.
pub data_namespace: MessageQueueWalConfig,
/// Namespace config for meta data
pub meta_namespace: MessageQueueWalConfig,
}

/// Config for wal based on RocksDB
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default)]
pub struct RocksDBConfig {
/// Data directory used by RocksDB.
pub data_dir: String,

pub data_namespace: RocksDBWalConfig,
pub meta_namespace: RocksDBWalConfig,
}

impl Default for RocksDBConfig {
fn default() -> Self {
Self {
data_dir: "/tmp/ceresdb".to_string(),
data_namespace: Default::default(),
meta_namespace: Default::default(),
}
}
}
/// Options for wal storage backend
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "type")]
pub enum WalStorageConfig {
RocksDB(Box<RocksDBConfig>),
Obkv(Box<ObkvWalConfig>),
Kafka(Box<KafkaWalConfig>),
}
Loading
Loading