Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: modify wal config #629

Merged
merged 3 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions analytic_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ pub struct Config {
/// Storage options of the engine
pub storage: StorageOptions,

/// WAL path of the engine
pub wal_path: String,

/// Batch size to read records from wal to replay
pub replay_batch_size: usize,
/// Batch size to replay tables
Expand Down Expand Up @@ -96,7 +93,6 @@ impl Default for Config {
fn default() -> Self {
Self {
storage: Default::default(),
wal_path: "/tmp/ceresdb".to_string(),
replay_batch_size: 500,
max_replay_tables_per_batch: 64,
write_group_worker_num: 8,
Expand All @@ -114,7 +110,7 @@ impl Default for Config {
db_write_buffer_size: 0,
scan_batch_size: 500,
sst_background_read_parallelism: 8,
wal_storage: WalStorageConfig::RocksDB,
wal_storage: WalStorageConfig::RocksDB(Box::new(RocksDBConfig::default())),
remote_engine_client: remote_engine_client::config::Config::default(),
}
}
Expand Down Expand Up @@ -238,11 +234,26 @@ pub struct KafkaWalConfig {
pub wal_config: MessageQueueWalConfig,
}

/// Config for wal based on rocksDB
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct RocksDBConfig {
/// Path used by rocksdb
pub path: String,
}

impl Default for RocksDBConfig {
fn default() -> Self {
Self {
path: "/tmp/ceresdb".to_string(),
}
}
}
/// Options for wal storage backend
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type")]
pub enum WalStorageConfig {
RocksDB,
RocksDB(Box<RocksDBConfig>),
Obkv(Box<ObkvWalConfig>),
Kafka(Box<KafkaWalConfig>),
}
8 changes: 4 additions & 4 deletions analytic_engine/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ impl EngineBuilder for RocksDBWalEngineBuilder {
engine_runtimes: Arc<EngineRuntimes>,
object_store: ObjectStoreRef,
) -> Result<(WalManagerRef, ManifestRef)> {
match &config.wal_storage {
WalStorageConfig::RocksDB => {}
let rocksdb_wal_config = match config.wal_storage {
WalStorageConfig::RocksDB(config) => *config,
_ => {
return InvalidWalConfig {
msg: format!(
Expand All @@ -198,10 +198,10 @@ impl EngineBuilder for RocksDBWalEngineBuilder {
}
.fail();
}
}
};

let write_runtime = engine_runtimes.write_runtime.clone();
let data_path = Path::new(&config.wal_path);
let data_path = Path::new(&rocksdb_wal_config.path);
let wal_path = data_path.join(WAL_DIR_NAME);
let wal_manager = WalBuilder::with_default_rocksdb_config(wal_path, write_runtime.clone())
.build()
Expand Down
16 changes: 10 additions & 6 deletions analytic_engine/src/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::{
},
storage_options::{LocalOptions, ObjectStoreOptions, StorageOptions},
tests::table::{self, FixedSchemaTable, RowTuple},
Config, ObkvWalConfig, WalStorageConfig,
Config, ObkvWalConfig, RocksDBConfig, WalStorageConfig,
};

const DAY_MS: i64 = 24 * 60 * 60 * 1000;
Expand Down Expand Up @@ -417,7 +417,9 @@ impl Builder {
data_path: dir.path().to_str().unwrap().to_string(),
}),
},
wal_path: dir.path().to_str().unwrap().to_string(),
wal_storage: WalStorageConfig::RocksDB(Box::new(RocksDBConfig {
path: dir.path().to_str().unwrap().to_string(),
})),
..Default::default()
};

Expand Down Expand Up @@ -475,8 +477,9 @@ impl Default for RocksDBEngineContext {
}),
},

wal_path: dir.path().to_str().unwrap().to_string(),
wal_storage: WalStorageConfig::RocksDB,
wal_storage: WalStorageConfig::RocksDB(Box::new(RocksDBConfig {
path: dir.path().to_str().unwrap().to_string(),
})),
..Default::default()
};

Expand All @@ -501,7 +504,9 @@ impl Clone for RocksDBEngineContext {
};

config.storage = storage;
config.wal_path = dir.path().to_str().unwrap().to_string();
config.wal_storage = WalStorageConfig::RocksDB(Box::new(RocksDBConfig {
path: dir.path().to_str().unwrap().to_string(),
}));

Self { config }
}
Expand Down Expand Up @@ -539,7 +544,6 @@ impl Default for MemoryEngineContext {
data_path: dir.path().to_str().unwrap().to_string(),
}),
},
wal_path: dir.path().to_str().unwrap().to_string(),
wal_storage: WalStorageConfig::Obkv(Box::new(ObkvWalConfig::default())),
..Default::default()
};
Expand Down
7 changes: 4 additions & 3 deletions docs/example-cluster-0.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ mysql_port = 3307
log_level = "info"
deploy_mode = "Cluster"

[analytic]
wal_path = "/tmp/ceresdb0"

[analytic.storage]
mem_cache_capacity = '1G'
mem_cache_partition_bits = 0
Expand All @@ -17,6 +14,10 @@ mem_cache_partition_bits = 0
type = "Local"
data_path = "/tmp/ceresdb0"

[analytic.wal_storage]
type = "RocksDB"
path = "/tmp/ceresdb0"
Rachelint marked this conversation as resolved.
Show resolved Hide resolved

[cluster]
cmd_channel_buffer_size = 10

Expand Down
7 changes: 4 additions & 3 deletions docs/example-cluster-1.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ mysql_port = 13307
log_level = "info"
deploy_mode = "Cluster"

[analytic]
wal_path = "/tmp/ceresdb1"

[analytic.storage]
mem_cache_capacity = '1G'
mem_cache_partition_bits = 0
Expand All @@ -17,6 +14,10 @@ mem_cache_partition_bits = 0
type = "Local"
data_path = "/tmp/ceresdb1"

[analytic.wal_storage]
type = "RocksDB"
path = "/tmp/ceresdb1"

[cluster]
cmd_channel_buffer_size = 10

Expand Down
8 changes: 5 additions & 3 deletions docs/example-standalone-static-routing.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,18 @@ write_thread_num = 4
background_thread_num = 4

[analytic]
wal_path = "/tmp/ceresdb/"
write_group_worker_num = 4
replay_batch_size = 500
max_replay_tables_per_batch = 1024
write_group_command_channel_cap = 1024

[analytic.storage]
type = "Local"
data_path = "/tmp/ceresdb/"
mem_cache_capacity = '1G'
mem_cache_partition_bits = 0

[analytic.storage.object_store]
type = "Local"
data_path = "/tmp/ceresdb"

[analytic.compaction_config]
schedule_channel_len = 4
Expand Down
2 changes: 1 addition & 1 deletion src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pub fn run_server(config: Config, log_runtime: RuntimeLevel) {

runtimes.bg_runtime.block_on(async {
match config.analytic.wal_storage {
WalStorageConfig::RocksDB => {
WalStorageConfig::RocksDB(_) => {
run_server_with_runtimes::<RocksDBWalEngineBuilder>(
config,
engine_runtimes,
Expand Down