Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: impl DoNothing wal #1311

Merged
merged 5 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/ceresdb/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ fn build_engine_runtimes(config: &RuntimeConfig) -> EngineRuntimes {
}
}

fn validate_config(config: &Config) {
let is_data_wal_disabled = config.analytic.wal.data_disabled();
if is_data_wal_disabled {
let is_cluster = config.cluster_deployment.is_some();
if !is_cluster {
panic!("Invalid config, we can only disable data wal in cluster deployments")
}
}
}

/// Run a server, returns when the server is shutdown by user
pub fn run_server(config: Config, log_runtime: RuntimeLevel) {
let runtimes = Arc::new(build_engine_runtimes(&config.runtime));
Expand All @@ -108,6 +118,8 @@ pub fn run_server(config: Config, log_runtime: RuntimeLevel) {

info!("Server starts up, config:{:#?}", config);

validate_config(&config);

runtimes.default_runtime.block_on(async {
match config.analytic.wal {
StorageConfig::RocksDB(_) => {
Expand Down
40 changes: 40 additions & 0 deletions src/wal/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,43 @@ pub enum StorageConfig {
Obkv(Box<ObkvStorageConfig>),
Kafka(Box<KafkaStorageConfig>),
}

impl StorageConfig {
jiacai2050 marked this conversation as resolved.
Show resolved Hide resolved
pub fn data_disabled(&self) -> bool {
match self {
Self::RocksDB(c) => {
#[cfg(feature = "wal-rocksdb")]
{
c.disable_data
}
#[cfg(not(feature = "wal-rocksdb"))]
{
_ = c;
false
}
}
Self::Obkv(c) => {
#[cfg(feature = "wal-table-kv")]
{
c.disable_data
}
#[cfg(not(feature = "wal-table-kv"))]
{
_ = c;
false
}
}
Self::Kafka(c) => {
#[cfg(feature = "wal-message-queue")]
{
c.disable_data
}
#[cfg(not(feature = "wal-message-queue"))]
{
_ = c;
false
}
}
}
}
}
74 changes: 74 additions & 0 deletions src/wal/src/dummy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2023 The CeresDB Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;
use common_types::SequenceNumber;

use crate::{
log_batch::LogWriteBatch,
manager::{
BatchLogIteratorAdapter, ReadContext, ReadRequest, RegionId, Result, ScanContext,
ScanRequest, WalLocation, WalManager, WriteContext,
},
};

#[derive(Debug)]
pub struct DoNothing;

#[async_trait]
impl WalManager for DoNothing {
async fn sequence_num(&self, _location: WalLocation) -> Result<SequenceNumber> {
Ok(0)
}

async fn mark_delete_entries_up_to(
&self,
_location: WalLocation,
_sequence_num: SequenceNumber,
) -> Result<()> {
Ok(())
}

async fn close_region(&self, _region: RegionId) -> Result<()> {
Ok(())
}

async fn close_gracefully(&self) -> Result<()> {
Ok(())
}

async fn read_batch(
&self,
_ctx: &ReadContext,
_req: &ReadRequest,
) -> Result<BatchLogIteratorAdapter> {
Ok(BatchLogIteratorAdapter::empty())
}

async fn write(&self, _ctx: &WriteContext, _batch: &LogWriteBatch) -> Result<SequenceNumber> {
Ok(0)
}

async fn scan(
&self,
_ctx: &ScanContext,
_req: &ScanRequest,
) -> Result<BatchLogIteratorAdapter> {
Ok(BatchLogIteratorAdapter::empty())
}

async fn get_statistics(&self) -> Option<String> {
None
}
}
1 change: 1 addition & 0 deletions src/wal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#![feature(trait_alias)]

pub mod config;
mod dummy;
pub mod kv_encoder;
pub mod log_batch;
pub mod manager;
Expand Down
7 changes: 7 additions & 0 deletions src/wal/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,13 @@ impl BatchLogIteratorAdapter {
}
}

pub fn empty() -> Self {
Self {
iter: None,
batch_size: 1,
}
}

async fn simulated_async_next<D, F>(
&mut self,
decoder: D,
Expand Down
2 changes: 2 additions & 0 deletions src/wal/src/message_queue_impl/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use time_ext::ReadableDuration;
pub struct KafkaStorageConfig {
/// Kafka client config
pub kafka: KafkaConfig,
/// If true, data wal will return Ok directly, without any IO operations.
pub disable_data: bool,
/// Namespace config for data.
pub data_namespace: KafkaWalConfig,
/// Namespace config for meta data
Expand Down
19 changes: 12 additions & 7 deletions src/wal/src/message_queue_impl/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,17 @@ impl WalsOpener for KafkaWalsOpener {
let kafka = KafkaImpl::new(kafka_wal_config.kafka.clone())
.await
.context(OpenKafka)?;
let data_wal = MessageQueueImpl::new(
WAL_DIR_NAME.to_string(),
kafka.clone(),
default_runtime.clone(),
kafka_wal_config.data_namespace,
);
let data_wal = if kafka_wal_config.disable_data {
Arc::new(crate::dummy::DoNothing) as Arc<_>
} else {
let data_wal = MessageQueueImpl::new(
WAL_DIR_NAME.to_string(),
kafka.clone(),
default_runtime.clone(),
kafka_wal_config.data_namespace,
);
Arc::new(data_wal) as Arc<_>
};

let manifest_wal = MessageQueueImpl::new(
MANIFEST_DIR_NAME.to_string(),
Expand All @@ -171,7 +176,7 @@ impl WalsOpener for KafkaWalsOpener {
);

Ok(OpenedWals {
data_wal: Arc::new(data_wal),
data_wal,
manifest_wal: Arc::new(manifest_wal),
})
}
Expand Down
3 changes: 3 additions & 0 deletions src/wal/src/rocksdb_impl/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use size_ext::ReadableSize;
pub struct RocksDBStorageConfig {
/// Data directory used by RocksDB.
pub data_dir: String,
/// If true, data wal will return Ok directly, without any IO operations.
pub disable_data: bool,
/// Namespace config for data.
pub data_namespace: RocksDBConfig,
/// Namespace config for meta.
Expand All @@ -32,6 +34,7 @@ impl Default for RocksDBStorageConfig {
fn default() -> Self {
Self {
data_dir: "/tmp/ceresdb".to_string(),
disable_data: false,
data_namespace: Default::default(),
meta_namespace: Default::default(),
}
Expand Down
116 changes: 49 additions & 67 deletions src/wal/src/rocksdb_impl/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ use runtime::Runtime;
use snafu::ResultExt;
use tokio::sync::Mutex;

use super::config::RocksDBConfig;
use crate::{
config::StorageConfig,
kv_encoder::{CommonLogEncoding, CommonLogKey, MaxSeqMetaEncoding, MaxSeqMetaValue, MetaKey},
log_batch::{LogEntry, LogWriteBatch},
manager::{
self, error::*, BatchLogIteratorAdapter, OpenedWals, ReadContext, ReadRequest, RegionId,
ScanContext, ScanRequest, SyncLogIterator, WalLocation, WalManager, WalRuntimes,
WalsOpener, WriteContext, MANIFEST_DIR_NAME, WAL_DIR_NAME,
ScanContext, ScanRequest, SyncLogIterator, WalLocation, WalManager, WalManagerRef,
WalRuntimes, WalsOpener, WriteContext, MANIFEST_DIR_NAME, WAL_DIR_NAME,
},
};

Expand Down Expand Up @@ -479,9 +480,9 @@ impl RocksImpl {
let meta_key = self.max_seq_meta_encoding.decode_key(iter.key())?;
let meta_value = self.max_seq_meta_encoding.decode_value(iter.value())?;
#[rustfmt::skip]
// FIXME: In some cases, the `flushed sequence`
// FIXME: In some cases, the `flushed sequence`
// may be greater than the `actual last sequence of written logs`.
//
//
// Such as following case:
// + Write wal logs failed(last sequence stored in memory will increase when write failed).
// + Get last sequence from memory(greater then actual last sequence now).
Expand All @@ -491,7 +492,7 @@ impl RocksImpl {
.and_modify(|v| {
if meta_value.max_seq > *v {
warn!(
"RocksDB WAL found flushed_seq greater than actual_last_sequence,
"RocksDB WAL found flushed_seq greater than actual_last_sequence,
flushed_sequence:{}, actual_last_sequence:{}, table_id:{}",
meta_value.max_seq, *v, meta_key.table_id
);
Expand Down Expand Up @@ -969,6 +970,28 @@ impl fmt::Debug for RocksImpl {
#[derive(Default)]
pub struct RocksDBWalsOpener;

impl RocksDBWalsOpener {
fn build_manager(
wal_path: PathBuf,
runtime: Arc<Runtime>,
config: RocksDBConfig,
) -> Result<WalManagerRef> {
let rocks = Builder::new(wal_path, runtime)
.max_subcompactions(config.max_subcompactions)
.max_background_jobs(config.max_background_jobs)
.enable_statistics(config.enable_statistics)
.write_buffer_size(config.write_buffer_size.0)
.max_write_buffer_number(config.max_write_buffer_number)
.level_zero_file_num_compaction_trigger(config.level_zero_file_num_compaction_trigger)
.level_zero_slowdown_writes_trigger(config.level_zero_slowdown_writes_trigger)
.level_zero_stop_writes_trigger(config.level_zero_stop_writes_trigger)
.fifo_compaction_max_table_files_size(config.fifo_compaction_max_table_files_size.0)
.build()?;

Ok(Arc::new(rocks))
}
}

#[async_trait]
impl WalsOpener for RocksDBWalsOpener {
async fn open_wals(&self, config: &StorageConfig, runtimes: WalRuntimes) -> Result<OpenedWals> {
Expand All @@ -986,69 +1009,28 @@ impl WalsOpener for RocksDBWalsOpener {

let write_runtime = runtimes.write_runtime.clone();
let data_path = Path::new(&rocksdb_wal_config.data_dir);
let wal_path = data_path.join(WAL_DIR_NAME);
let data_wal = Builder::new(wal_path, write_runtime.clone())
.max_subcompactions(rocksdb_wal_config.data_namespace.max_subcompactions)
.max_background_jobs(rocksdb_wal_config.data_namespace.max_background_jobs)
.enable_statistics(rocksdb_wal_config.data_namespace.enable_statistics)
.write_buffer_size(rocksdb_wal_config.data_namespace.write_buffer_size.0)
.max_write_buffer_number(rocksdb_wal_config.data_namespace.max_write_buffer_number)
.level_zero_file_num_compaction_trigger(
rocksdb_wal_config
.data_namespace
.level_zero_file_num_compaction_trigger,
)
.level_zero_slowdown_writes_trigger(
rocksdb_wal_config
.data_namespace
.level_zero_slowdown_writes_trigger,
)
.level_zero_stop_writes_trigger(
rocksdb_wal_config
.data_namespace
.level_zero_stop_writes_trigger,
)
.fifo_compaction_max_table_files_size(
rocksdb_wal_config
.data_namespace
.fifo_compaction_max_table_files_size
.0,
)
.build()?;

let manifest_path = data_path.join(MANIFEST_DIR_NAME);
let manifest_wal = Builder::new(manifest_path, write_runtime)
.max_subcompactions(rocksdb_wal_config.meta_namespace.max_subcompactions)
.max_background_jobs(rocksdb_wal_config.meta_namespace.max_background_jobs)
.enable_statistics(rocksdb_wal_config.meta_namespace.enable_statistics)
.write_buffer_size(rocksdb_wal_config.meta_namespace.write_buffer_size.0)
.max_write_buffer_number(rocksdb_wal_config.meta_namespace.max_write_buffer_number)
.level_zero_file_num_compaction_trigger(
rocksdb_wal_config
.meta_namespace
.level_zero_file_num_compaction_trigger,
)
.level_zero_slowdown_writes_trigger(
rocksdb_wal_config
.meta_namespace
.level_zero_slowdown_writes_trigger,
)
.level_zero_stop_writes_trigger(
rocksdb_wal_config
.meta_namespace
.level_zero_stop_writes_trigger,
)
.fifo_compaction_max_table_files_size(
rocksdb_wal_config
.meta_namespace
.fifo_compaction_max_table_files_size
.0,
)
.build()?;
let opened_wals = OpenedWals {
data_wal: Arc::new(data_wal),
manifest_wal: Arc::new(manifest_wal),
// Build data wal
let data_wal = if rocksdb_wal_config.disable_data {
Arc::new(crate::dummy::DoNothing)
} else {
Self::build_manager(
data_path.join(WAL_DIR_NAME),
write_runtime.clone(),
rocksdb_wal_config.data_namespace,
)?
};
Ok(opened_wals)

// Build manifest wal
let manifest_wal = Self::build_manager(
data_path.join(MANIFEST_DIR_NAME),
write_runtime,
rocksdb_wal_config.meta_namespace,
)?;

Ok(OpenedWals {
data_wal,
manifest_wal,
})
}
}
2 changes: 2 additions & 0 deletions src/wal/src/table_kv_impl/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use crate::table_kv_impl::model::NamespaceConfig;
pub struct ObkvStorageConfig {
/// Obkv client config
pub obkv: ObkvConfig,
/// If true, data wal will return Ok directly, without any IO operations.
pub disable_data: bool,
/// Namespace config for data.
pub data_namespace: WalNamespaceConfig,
/// Namespace config for meta data
Expand Down
Loading
Loading