Skip to content

Commit

Permalink
feat: support stack size of read threads configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi committed Nov 10, 2023
1 parent d1f3349 commit d763083
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 30 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion analytic_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl Default for WalEncodeConfig {
fn default() -> Self {
Self {
num_bytes_compress_threshold: ReadableSize::kb(1),
format: WalEncodeFormat::RowWise,
format: WalEncodeFormat::Columnar,
}
}
}
Expand Down
9 changes: 9 additions & 0 deletions components/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,15 @@ impl Builder {
self
}

/// Sets the size of the stack allocated to the worker threads the Runtime
/// will use.
///
/// This can be any number above 0.
pub fn stack_size(&mut self, val: usize) -> &mut Self {
self.builder.thread_stack_size(val);
self
}

/// Sets name of threads spawned by the Runtime thread pool
pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
self.thread_name = val.into();
Expand Down
62 changes: 36 additions & 26 deletions src/ceresdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,38 +27,48 @@ workspace = true
[features]
default = ["wal-rocksdb", "wal-table-kv", "wal-message-queue"]
wal-table-kv = ["wal/wal-table-kv", "analytic_engine/wal-table-kv"]
wal-message-queue = ["wal/wal-message-queue", "analytic_engine/wal-message-queue"]
wal-message-queue = [
"wal/wal-message-queue",
"analytic_engine/wal-message-queue",
]
wal-rocksdb = ["wal/wal-rocksdb", "analytic_engine/wal-rocksdb"]

[dependencies]
analytic_engine = { workspace = true }
catalog = { workspace = true }
catalog_impls = { workspace = true }
clap = { workspace = true }
cluster = { workspace = true }
datafusion = { workspace = true }
df_operator = { workspace = true }
etcd-client = { workspace = true }
interpreters = { workspace = true }
logger = { workspace = true }
meta_client = { workspace = true }
moka = { version = "0.10", features = ["future"] }
panic_ext = { workspace = true }
proxy = { workspace = true }
query_engine = { workspace = true }
router = { workspace = true }
runtime = { workspace = true }
serde = { workspace = true }
server = { workspace = true }
signal-hook = "0.3"
table_engine = { workspace = true }
toml = { workspace = true }
toml_ext = { workspace = true }
tracing_util = { workspace = true }
wal = { workspace = true }
catalog = { workspace = true }
catalog_impls = { workspace = true }
clap = { workspace = true }
cluster = { workspace = true }
datafusion = { workspace = true }
df_operator = { workspace = true }
etcd-client = { workspace = true }
interpreters = { workspace = true }
logger = { workspace = true }
meta_client = { workspace = true }
moka = { version = "0.10", features = ["future"] }
panic_ext = { workspace = true }
proxy = { workspace = true }
query_engine = { workspace = true }
router = { workspace = true }
runtime = { workspace = true }
serde = { workspace = true }
server = { workspace = true }
signal-hook = "0.3"
size_ext = { workspace = true }
table_engine = { workspace = true }
toml = { workspace = true }
toml_ext = { workspace = true }
tracing_util = { workspace = true }
wal = { workspace = true }

[build-dependencies]
vergen = { version = "8", default-features = false, features = ["build", "cargo", "git", "gitcl", "rustc"] }
vergen = { version = "8", default-features = false, features = [
"build",
"cargo",
"git",
"gitcl",
"rustc",
] }

[[bin]]
name = "ceresdb-server"
Expand Down
6 changes: 6 additions & 0 deletions src/ceresdb/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use cluster::config::ClusterConfig;
use proxy::limiter::LimiterConfig;
use serde::{Deserialize, Serialize};
use server::config::{ServerConfig, StaticRouteConfig};
use size_ext::ReadableSize;

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(default)]
Expand Down Expand Up @@ -92,6 +93,10 @@ pub enum ClusterDeployment {
pub struct RuntimeConfig {
/// Runtime for reading data
pub read_thread_num: usize,
/// The size of the stack used by the read thread
///
/// The size should be a set as a large number if the complex query exists.
pub read_thread_stack_size: ReadableSize,
/// Runtime for writing data
pub write_thread_num: usize,
/// Runtime for communicating with meta cluster
Expand All @@ -108,6 +113,7 @@ impl Default for RuntimeConfig {
fn default() -> Self {
Self {
read_thread_num: 8,
read_thread_stack_size: ReadableSize::mb(16),
write_thread_num: 8,
meta_thread_num: 2,
compact_thread_num: 4,
Expand Down
24 changes: 21 additions & 3 deletions src/ceresdb/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,36 @@ pub fn setup_tracing(config: &Config) -> WorkerGuard {
tracing_util::init_tracing_with_file(&config.tracing, &config.node.addr, Rotation::NEVER)
}

fn build_runtime(name: &str, threads_num: usize) -> runtime::Runtime {
runtime::Builder::default()
fn build_runtime_with_stack_size(
name: &str,
threads_num: usize,
stack_size: Option<usize>,
) -> runtime::Runtime {
let mut builder = runtime::Builder::default();

if let Some(stack_size) = stack_size {
builder.stack_size(stack_size);
}

builder
.worker_threads(threads_num)
.thread_name(name)
.enable_all()
.build()
.expect("Failed to create runtime")
}

fn build_runtime(name: &str, threads_num: usize) -> runtime::Runtime {
build_runtime_with_stack_size(name, threads_num, None)
}

fn build_engine_runtimes(config: &RuntimeConfig) -> EngineRuntimes {
EngineRuntimes {
read_runtime: Arc::new(build_runtime("ceres-read", config.read_thread_num)),
read_runtime: Arc::new(build_runtime_with_stack_size(
"ceres-read",
config.read_thread_num,
Some(config.read_thread_stack_size.as_byte() as usize),
)),
write_runtime: Arc::new(build_runtime("ceres-write", config.write_thread_num)),
compact_runtime: Arc::new(build_runtime("ceres-compact", config.compact_thread_num)),
meta_runtime: Arc::new(build_runtime("ceres-meta", config.meta_thread_num)),
Expand Down

0 comments on commit d763083

Please sign in to comment.