Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: impl DoNothing wal #1311

Merged
merged 5 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions analytic_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use object_store::config::StorageOptions;
use serde::{Deserialize, Serialize};
use size_ext::ReadableSize;
use time_ext::ReadableDuration;
use wal::config::StorageConfig;
use wal::config::Config as WalConfig;

pub use crate::{compaction::scheduler::SchedulerConfig, table_options::TableOptions};

Expand Down Expand Up @@ -112,7 +112,7 @@ pub struct Config {
/// + RocksDB
/// + OBKV
/// + Kafka
pub wal: StorageConfig,
pub wal: WalConfig,

/// Recover mode
///
Expand Down Expand Up @@ -188,7 +188,7 @@ impl Default for Config {
max_bytes_per_write_batch: None,
mem_usage_sampling_interval: ReadableDuration::secs(0),
wal_encode: WalEncodeConfig::default(),
wal: StorageConfig::RocksDB(Box::default()),
wal: WalConfig::default(),
remote_engine_client: remote_engine_client::config::Config::default(),
recover_mode: RecoverMode::TableBased,
metrics: MetricsOptions::default(),
Expand Down
42 changes: 26 additions & 16 deletions analytic_engine/src/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use table_engine::{
use tempfile::TempDir;
use time_ext::ReadableDuration;
use wal::{
config::StorageConfig,
config::{Config as WalConfig, StorageConfig},
manager::{OpenedWals, WalRuntimes, WalsOpener},
rocksdb_impl::{config::RocksDBStorageConfig, manager::RocksDBWalsOpener},
table_kv_impl::wal::MemWalsOpener,
Expand Down Expand Up @@ -506,10 +506,13 @@ impl Builder {
data_dir: dir.path().to_str().unwrap().to_string(),
}),
},
wal: StorageConfig::RocksDB(Box::new(RocksDBStorageConfig {
data_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
})),
wal: WalConfig {
storage: StorageConfig::RocksDB(Box::new(RocksDBStorageConfig {
data_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
})),
disable_data: false,
},
..Default::default()
};

Expand Down Expand Up @@ -581,11 +584,13 @@ impl Default for RocksDBEngineBuildContext {
data_dir: dir.path().to_str().unwrap().to_string(),
}),
},

wal: StorageConfig::RocksDB(Box::new(RocksDBStorageConfig {
data_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
})),
wal: WalConfig {
storage: StorageConfig::RocksDB(Box::new(RocksDBStorageConfig {
data_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
})),
disable_data: false,
},
..Default::default()
};

Expand Down Expand Up @@ -614,11 +619,13 @@ impl Clone for RocksDBEngineBuildContext {
};

config.storage = storage;
config.wal = StorageConfig::RocksDB(Box::new(RocksDBStorageConfig {
data_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
}));

config.wal = WalConfig {
storage: StorageConfig::RocksDB(Box::new(RocksDBStorageConfig {
data_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
})),
disable_data: false,
};
Self {
config,
open_method: self.open_method,
Expand Down Expand Up @@ -674,7 +681,10 @@ impl Default for MemoryEngineBuildContext {
data_dir: dir.path().to_str().unwrap().to_string(),
}),
},
wal: StorageConfig::Obkv(Box::default()),
wal: WalConfig {
storage: StorageConfig::Obkv(Box::default()),
disable_data: false,
},
..Default::default()
};

Expand Down
14 changes: 13 additions & 1 deletion src/ceresdb/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ fn build_engine_runtimes(config: &RuntimeConfig) -> EngineRuntimes {
}
}

fn validate_config(config: &Config) {
let is_data_wal_disabled = config.analytic.wal.disable_data;
if is_data_wal_disabled {
let is_cluster = config.cluster_deployment.is_some();
if !is_cluster {
panic!("Invalid config, we can only disable data wal in cluster deployments")
}
}
}

/// Run a server, returns when the server is shutdown by user
pub fn run_server(config: Config, log_runtime: RuntimeLevel) {
let runtimes = Arc::new(build_engine_runtimes(&config.runtime));
Expand All @@ -108,8 +118,10 @@ pub fn run_server(config: Config, log_runtime: RuntimeLevel) {

info!("Server starts up, config:{:#?}", config);

validate_config(&config);

runtimes.default_runtime.block_on(async {
match config.analytic.wal {
match config.analytic.wal.storage {
StorageConfig::RocksDB(_) => {
#[cfg(feature = "wal-rocksdb")]
{
Expand Down
21 changes: 21 additions & 0 deletions src/wal/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,27 @@ pub type KafkaStorageConfig = crate::message_queue_impl::config::KafkaStorageCon
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
pub struct KafkaStorageConfig;

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Config {
// The flatten attribute inlines keys from a field into the parent struct.
// That's to say `storage` has no real usage, it's just a placeholder.
#[serde(flatten)]
pub storage: StorageConfig,
/// If true, data wal will return Ok directly, without any IO operations.
// Note: this is only used for test, we shouldn't enable this in production.
#[serde(default)]
pub disable_data: bool,
}

impl Default for Config {
fn default() -> Self {
Self {
storage: StorageConfig::RocksDB(Box::default()),
disable_data: false,
}
}
}

/// Options for wal storage backend
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "type")]
Expand Down
74 changes: 74 additions & 0 deletions src/wal/src/dummy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2023 The CeresDB Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;
use common_types::SequenceNumber;

use crate::{
log_batch::LogWriteBatch,
manager::{
BatchLogIteratorAdapter, ReadContext, ReadRequest, RegionId, Result, ScanContext,
ScanRequest, WalLocation, WalManager, WriteContext,
},
};

#[derive(Debug)]
pub struct DoNothing;

#[async_trait]
impl WalManager for DoNothing {
async fn sequence_num(&self, _location: WalLocation) -> Result<SequenceNumber> {
Ok(0)
}

async fn mark_delete_entries_up_to(
&self,
_location: WalLocation,
_sequence_num: SequenceNumber,
) -> Result<()> {
Ok(())
}

async fn close_region(&self, _region: RegionId) -> Result<()> {
Ok(())
}

async fn close_gracefully(&self) -> Result<()> {
Ok(())
}

async fn read_batch(
&self,
_ctx: &ReadContext,
_req: &ReadRequest,
) -> Result<BatchLogIteratorAdapter> {
Ok(BatchLogIteratorAdapter::empty())
}

async fn write(&self, _ctx: &WriteContext, _batch: &LogWriteBatch) -> Result<SequenceNumber> {
Ok(0)
}

async fn scan(
&self,
_ctx: &ScanContext,
_req: &ScanRequest,
) -> Result<BatchLogIteratorAdapter> {
Ok(BatchLogIteratorAdapter::empty())
}

async fn get_statistics(&self) -> Option<String> {
None
}
}
1 change: 1 addition & 0 deletions src/wal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#![feature(trait_alias)]

pub mod config;
mod dummy;
pub mod kv_encoder;
pub mod log_batch;
pub mod manager;
Expand Down
11 changes: 9 additions & 2 deletions src/wal/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use runtime::Runtime;
use snafu::ResultExt;

use crate::{
config::StorageConfig,
config::Config,
log_batch::{LogEntry, LogWriteBatch, PayloadDecodeContext, PayloadDecoder},
metrics::WAL_WRITE_BYTES_HISTOGRAM,
};
Expand Down Expand Up @@ -400,6 +400,13 @@ impl BatchLogIteratorAdapter {
}
}

pub fn empty() -> Self {
Self {
iter: None,
batch_size: 1,
}
}

async fn simulated_async_next<D, F>(
&mut self,
decoder: D,
Expand Down Expand Up @@ -549,7 +556,7 @@ pub(crate) const MANIFEST_DIR_NAME: &str = "manifest";

#[async_trait]
pub trait WalsOpener: Send + Sync + Default {
async fn open_wals(&self, config: &StorageConfig, runtimes: WalRuntimes) -> Result<OpenedWals>;
async fn open_wals(&self, config: &Config, runtimes: WalRuntimes) -> Result<OpenedWals>;
}

#[cfg(test)]
Expand Down
25 changes: 15 additions & 10 deletions src/wal/src/message_queue_impl/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use runtime::Runtime;
use snafu::ResultExt;

use crate::{
config::StorageConfig,
config::{Config, StorageConfig},
log_batch::{LogEntry, LogWriteBatch},
manager::{
self, error::*, AsyncLogIterator, BatchLogIteratorAdapter, OpenedWals, ReadContext,
Expand Down Expand Up @@ -138,8 +138,8 @@ pub struct KafkaWalsOpener;

#[async_trait]
impl WalsOpener for KafkaWalsOpener {
async fn open_wals(&self, config: &StorageConfig, runtimes: WalRuntimes) -> Result<OpenedWals> {
let kafka_wal_config = match config {
async fn open_wals(&self, config: &Config, runtimes: WalRuntimes) -> Result<OpenedWals> {
let kafka_wal_config = match &config.storage {
StorageConfig::Kafka(config) => config.clone(),
_ => {
return InvalidWalConfig {
Expand All @@ -156,12 +156,17 @@ impl WalsOpener for KafkaWalsOpener {
let kafka = KafkaImpl::new(kafka_wal_config.kafka.clone())
.await
.context(OpenKafka)?;
let data_wal = MessageQueueImpl::new(
WAL_DIR_NAME.to_string(),
kafka.clone(),
default_runtime.clone(),
kafka_wal_config.data_namespace,
);
let data_wal = if config.disable_data {
Arc::new(crate::dummy::DoNothing) as Arc<_>
} else {
let data_wal = MessageQueueImpl::new(
WAL_DIR_NAME.to_string(),
kafka.clone(),
default_runtime.clone(),
kafka_wal_config.data_namespace,
);
Arc::new(data_wal) as Arc<_>
};

let manifest_wal = MessageQueueImpl::new(
MANIFEST_DIR_NAME.to_string(),
Expand All @@ -171,7 +176,7 @@ impl WalsOpener for KafkaWalsOpener {
);

Ok(OpenedWals {
data_wal: Arc::new(data_wal),
data_wal,
manifest_wal: Arc::new(manifest_wal),
})
}
Expand Down
Loading
Loading