Skip to content

Commit

Permalink
feat: support stack size of read threads configurable (#1305)
Browse files Browse the repository at this point in the history
## Rationale
Complex sqls, e.g. sql with massive condition in the where clause, may
lead to stack overflow during query procedure. And currently, there is
no a good to handle it in the query engine powered by datafusion so a
simple way is just to enlarge the stack for the read threads.

## Detailed Changes
- Support the stack size of read threads confiugrable
- Make the columnar format as the default wal payload

## Test Plan
All the tests in the ci should pass.
  • Loading branch information
ShiKaiWi authored Nov 15, 2023
1 parent 7ad3be9 commit 312c6a3
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 29 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions components/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,15 @@ impl Builder {
self
}

/// Sets the size of the stack allocated to the worker threads the Runtime
/// will use.
///
/// This can be any number above 0.
pub fn stack_size(&mut self, val: usize) -> &mut Self {
self.builder.thread_stack_size(val);
self
}

/// Sets name of threads spawned by the Runtime thread pool
pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
self.thread_name = val.into();
Expand Down
62 changes: 36 additions & 26 deletions src/ceresdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,38 +27,48 @@ workspace = true
[features]
default = ["wal-rocksdb", "wal-table-kv", "wal-message-queue"]
wal-table-kv = ["wal/wal-table-kv", "analytic_engine/wal-table-kv"]
wal-message-queue = ["wal/wal-message-queue", "analytic_engine/wal-message-queue"]
wal-message-queue = [
"wal/wal-message-queue",
"analytic_engine/wal-message-queue",
]
wal-rocksdb = ["wal/wal-rocksdb", "analytic_engine/wal-rocksdb"]

[dependencies]
analytic_engine = { workspace = true }
catalog = { workspace = true }
catalog_impls = { workspace = true }
clap = { workspace = true }
cluster = { workspace = true }
datafusion = { workspace = true }
df_operator = { workspace = true }
etcd-client = { workspace = true }
interpreters = { workspace = true }
logger = { workspace = true }
meta_client = { workspace = true }
moka = { version = "0.10", features = ["future"] }
panic_ext = { workspace = true }
proxy = { workspace = true }
query_engine = { workspace = true }
router = { workspace = true }
runtime = { workspace = true }
serde = { workspace = true }
server = { workspace = true }
signal-hook = "0.3"
table_engine = { workspace = true }
toml = { workspace = true }
toml_ext = { workspace = true }
tracing_util = { workspace = true }
wal = { workspace = true }
catalog = { workspace = true }
catalog_impls = { workspace = true }
clap = { workspace = true }
cluster = { workspace = true }
datafusion = { workspace = true }
df_operator = { workspace = true }
etcd-client = { workspace = true }
interpreters = { workspace = true }
logger = { workspace = true }
meta_client = { workspace = true }
moka = { version = "0.10", features = ["future"] }
panic_ext = { workspace = true }
proxy = { workspace = true }
query_engine = { workspace = true }
router = { workspace = true }
runtime = { workspace = true }
serde = { workspace = true }
server = { workspace = true }
signal-hook = "0.3"
size_ext = { workspace = true }
table_engine = { workspace = true }
toml = { workspace = true }
toml_ext = { workspace = true }
tracing_util = { workspace = true }
wal = { workspace = true }

[build-dependencies]
vergen = { version = "8", default-features = false, features = ["build", "cargo", "git", "gitcl", "rustc"] }
vergen = { version = "8", default-features = false, features = [
"build",
"cargo",
"git",
"gitcl",
"rustc",
] }

[[bin]]
name = "ceresdb-server"
Expand Down
8 changes: 8 additions & 0 deletions src/ceresdb/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use cluster::config::ClusterConfig;
use proxy::limiter::LimiterConfig;
use serde::{Deserialize, Serialize};
use server::config::{ServerConfig, StaticRouteConfig};
use size_ext::ReadableSize;

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(default)]
Expand Down Expand Up @@ -92,6 +93,12 @@ pub enum ClusterDeployment {
pub struct RuntimeConfig {
/// Runtime for reading data
pub read_thread_num: usize,
/// The size of the stack used by the read thread
///
/// The size should be a set as a large number if the complex query exists.
/// TODO: this config may be removed in the future when the complex query
/// won't overflow the stack.
pub read_thread_stack_size: ReadableSize,
/// Runtime for writing data
pub write_thread_num: usize,
/// Runtime for communicating with meta cluster
Expand All @@ -108,6 +115,7 @@ impl Default for RuntimeConfig {
fn default() -> Self {
Self {
read_thread_num: 8,
read_thread_stack_size: ReadableSize::mb(16),
write_thread_num: 8,
meta_thread_num: 2,
compact_thread_num: 4,
Expand Down
24 changes: 21 additions & 3 deletions src/ceresdb/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,36 @@ pub fn setup_tracing(config: &Config) -> WorkerGuard {
tracing_util::init_tracing_with_file(&config.tracing, &config.node.addr, Rotation::NEVER)
}

fn build_runtime(name: &str, threads_num: usize) -> runtime::Runtime {
runtime::Builder::default()
fn build_runtime_with_stack_size(
name: &str,
threads_num: usize,
stack_size: Option<usize>,
) -> runtime::Runtime {
let mut builder = runtime::Builder::default();

if let Some(stack_size) = stack_size {
builder.stack_size(stack_size);
}

builder
.worker_threads(threads_num)
.thread_name(name)
.enable_all()
.build()
.expect("Failed to create runtime")
}

fn build_runtime(name: &str, threads_num: usize) -> runtime::Runtime {
build_runtime_with_stack_size(name, threads_num, None)
}

fn build_engine_runtimes(config: &RuntimeConfig) -> EngineRuntimes {
EngineRuntimes {
read_runtime: Arc::new(build_runtime("ceres-read", config.read_thread_num)),
read_runtime: Arc::new(build_runtime_with_stack_size(
"ceres-read",
config.read_thread_num,
Some(config.read_thread_stack_size.as_byte() as usize),
)),
write_runtime: Arc::new(build_runtime("ceres-write", config.write_thread_num)),
compact_runtime: Arc::new(build_runtime("ceres-compact", config.compact_thread_num)),
meta_runtime: Arc::new(build_runtime("ceres-meta", config.meta_thread_num)),
Expand Down

0 comments on commit 312c6a3

Please sign in to comment.