Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support stack size of read threads configurable #1305

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions components/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,15 @@ impl Builder {
self
}

/// Sets the size of the stack allocated to the worker threads the Runtime
/// will use.
///
/// This can be any number above 0.
pub fn stack_size(&mut self, val: usize) -> &mut Self {
self.builder.thread_stack_size(val);
self
}

/// Sets name of threads spawned by the Runtime thread pool
pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
self.thread_name = val.into();
Expand Down
62 changes: 36 additions & 26 deletions src/ceresdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,38 +27,48 @@ workspace = true
[features]
default = ["wal-rocksdb", "wal-table-kv", "wal-message-queue"]
wal-table-kv = ["wal/wal-table-kv", "analytic_engine/wal-table-kv"]
wal-message-queue = ["wal/wal-message-queue", "analytic_engine/wal-message-queue"]
wal-message-queue = [
"wal/wal-message-queue",
"analytic_engine/wal-message-queue",
]
wal-rocksdb = ["wal/wal-rocksdb", "analytic_engine/wal-rocksdb"]

[dependencies]
analytic_engine = { workspace = true }
catalog = { workspace = true }
catalog_impls = { workspace = true }
clap = { workspace = true }
cluster = { workspace = true }
datafusion = { workspace = true }
df_operator = { workspace = true }
etcd-client = { workspace = true }
interpreters = { workspace = true }
logger = { workspace = true }
meta_client = { workspace = true }
moka = { version = "0.10", features = ["future"] }
panic_ext = { workspace = true }
proxy = { workspace = true }
query_engine = { workspace = true }
router = { workspace = true }
runtime = { workspace = true }
serde = { workspace = true }
server = { workspace = true }
signal-hook = "0.3"
table_engine = { workspace = true }
toml = { workspace = true }
toml_ext = { workspace = true }
tracing_util = { workspace = true }
wal = { workspace = true }
catalog = { workspace = true }
catalog_impls = { workspace = true }
clap = { workspace = true }
cluster = { workspace = true }
datafusion = { workspace = true }
df_operator = { workspace = true }
etcd-client = { workspace = true }
interpreters = { workspace = true }
logger = { workspace = true }
meta_client = { workspace = true }
moka = { version = "0.10", features = ["future"] }
panic_ext = { workspace = true }
proxy = { workspace = true }
query_engine = { workspace = true }
router = { workspace = true }
runtime = { workspace = true }
serde = { workspace = true }
server = { workspace = true }
signal-hook = "0.3"
size_ext = { workspace = true }
table_engine = { workspace = true }
toml = { workspace = true }
toml_ext = { workspace = true }
tracing_util = { workspace = true }
wal = { workspace = true }

[build-dependencies]
vergen = { version = "8", default-features = false, features = ["build", "cargo", "git", "gitcl", "rustc"] }
vergen = { version = "8", default-features = false, features = [
"build",
"cargo",
"git",
"gitcl",
"rustc",
] }

[[bin]]
name = "ceresdb-server"
Expand Down
8 changes: 8 additions & 0 deletions src/ceresdb/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use cluster::config::ClusterConfig;
use proxy::limiter::LimiterConfig;
use serde::{Deserialize, Serialize};
use server::config::{ServerConfig, StaticRouteConfig};
use size_ext::ReadableSize;

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(default)]
Expand Down Expand Up @@ -92,6 +93,12 @@ pub enum ClusterDeployment {
pub struct RuntimeConfig {
/// Runtime for reading data
pub read_thread_num: usize,
/// The size of the stack used by the read thread
///
/// The size should be a set as a large number if the complex query exists.
/// TODO: this config may be removed in the future when the complex query
/// won't overflow the stack.
pub read_thread_stack_size: ReadableSize,
/// Runtime for writing data
pub write_thread_num: usize,
/// Runtime for communicating with meta cluster
Expand All @@ -108,6 +115,7 @@ impl Default for RuntimeConfig {
fn default() -> Self {
Self {
read_thread_num: 8,
read_thread_stack_size: ReadableSize::mb(16),
write_thread_num: 8,
meta_thread_num: 2,
compact_thread_num: 4,
Expand Down
24 changes: 21 additions & 3 deletions src/ceresdb/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,36 @@ pub fn setup_tracing(config: &Config) -> WorkerGuard {
tracing_util::init_tracing_with_file(&config.tracing, &config.node.addr, Rotation::NEVER)
}

fn build_runtime(name: &str, threads_num: usize) -> runtime::Runtime {
runtime::Builder::default()
fn build_runtime_with_stack_size(
name: &str,
threads_num: usize,
stack_size: Option<usize>,
) -> runtime::Runtime {
let mut builder = runtime::Builder::default();

if let Some(stack_size) = stack_size {
builder.stack_size(stack_size);
}

builder
.worker_threads(threads_num)
.thread_name(name)
.enable_all()
.build()
.expect("Failed to create runtime")
}

fn build_runtime(name: &str, threads_num: usize) -> runtime::Runtime {
build_runtime_with_stack_size(name, threads_num, None)
}

fn build_engine_runtimes(config: &RuntimeConfig) -> EngineRuntimes {
EngineRuntimes {
read_runtime: Arc::new(build_runtime("ceres-read", config.read_thread_num)),
read_runtime: Arc::new(build_runtime_with_stack_size(
"ceres-read",
config.read_thread_num,
Some(config.read_thread_stack_size.as_byte() as usize),
)),
write_runtime: Arc::new(build_runtime("ceres-write", config.write_thread_num)),
compact_runtime: Arc::new(build_runtime("ceres-compact", config.compact_thread_num)),
meta_runtime: Arc::new(build_runtime("ceres-meta", config.meta_thread_num)),
Expand Down
Loading