Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[build]
rustflags = ["--cfg", "tokio_unstable"]
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -310,3 +310,4 @@ features = [
"broadcast",
"ondemand",
]

3 changes: 3 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,6 @@ pretty_assertions.workspace = true
jsonwebtoken.workspace = true
axum.workspace = true
reqwest.workspace = true

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
101 changes: 101 additions & 0 deletions crates/core/src/worker_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,46 @@ metrics_group!(
#[labels(node_id: str)]
pub jemalloc_resident_bytes: IntGaugeVec,

#[name = tokio_num_workers]
#[help = "Number of core tokio workers"]
#[labels(node_id: str)]
pub tokio_num_workers: IntGaugeVec,

#[name = tokio_num_blocking_threads]
#[help = "Number of extra tokio threads for blocking tasks"]
#[labels(node_id: str)]
pub tokio_num_blocking_threads: IntGaugeVec,

#[name = tokio_num_idle_blocking_threads]
#[help = "Number of tokio blocking threads that are idle"]
#[labels(node_id: str)]
pub tokio_num_idle_blocking_threads: IntGaugeVec,

#[name = tokio_num_alive_tasks]
#[help = "Number of tokio tasks that are still alive"]
#[labels(node_id: str)]
pub tokio_num_alive_tasks: IntGaugeVec,

#[name = tokio_global_queue_depth]
#[help = "Number of tasks in tokios global queue"]
#[labels(node_id: str)]
pub tokio_global_queue_depth: IntGaugeVec,

#[name = tokio_blocking_queue_depth]
#[help = "Number of tasks in tokios blocking task queue"]
#[labels(node_id: str)]
pub tokio_blocking_queue_depth: IntGaugeVec,

#[name = tokio_spawned_tasks_count]
#[help = "Number of tokio tasks spawned"]
#[labels(node_id: str)]
pub tokio_spawned_tasks_count: IntCounterVec,

#[name = tokio_remote_schedule_count]
#[help = "Number of tasks spawned from outside the tokio runtime"]
#[labels(node_id: str)]
pub tokio_remote_schedule_count: IntCounterVec,

#[name = spacetime_websocket_sent_msg_size_bytes]
#[help = "The size of messages sent to connected sessions"]
#[labels(db: Identity, workload: WorkloadType)]
Expand Down Expand Up @@ -165,3 +205,64 @@ pub fn spawn_jemalloc_stats(node_id: String) {
});
});
}

/// How frequently to update the tokio stats.
const TOKIO_STATS_INTERVAL: Duration = Duration::from_secs(10);
/// Guard used by `spawn_tokio_stats` so that the stats-collection task is
/// spawned at most once, no matter how many times the function is called.
static SPAWN_TOKIO_STATS_GUARD: Once = Once::new();
/// Spawns a background task that periodically copies tokio runtime metrics
/// into `WORKER_METRICS`, labeled with `node_id`.
///
/// The task is spawned at most once: the body runs under
/// `SPAWN_TOKIO_STATS_GUARD.call_once`, so repeated calls are no-ops.
/// The task loops forever, refreshing the metrics every `TOKIO_STATS_INTERVAL`.
///
/// Metrics that tokio only exposes when built with `--cfg tokio_unstable`
/// (blocking-thread stats, spawned/remote-schedule counters) are only
/// collected, and their labeled series only created, when that cfg is set.
///
/// NOTE(review): this presumably has to be called from within a tokio runtime,
/// since it relies on `spawn` and `tokio::runtime::Handle::current()` — confirm
/// against the caller.
pub fn spawn_tokio_stats(node_id: String) {
    SPAWN_TOKIO_STATS_GUARD.call_once(|| {
        spawn(async move {
            // Set up our metric handles, so we don't keep calling `with_label_values`.
            let num_worker_metric = WORKER_METRICS.tokio_num_workers.with_label_values(&node_id);
            let num_alive_tasks_metric = WORKER_METRICS.tokio_num_alive_tasks.with_label_values(&node_id);
            let global_queue_depth_metric = WORKER_METRICS.tokio_global_queue_depth.with_label_values(&node_id);

            // The handles below are only used when the unstable tokio metrics are available,
            // so they are gated on the same cfg as their uses to avoid unused-variable warnings.
            #[cfg(tokio_unstable)]
            let num_blocking_threads_metric = WORKER_METRICS.tokio_num_blocking_threads.with_label_values(&node_id);
            #[cfg(tokio_unstable)]
            let num_idle_blocking_threads_metric = WORKER_METRICS
                .tokio_num_idle_blocking_threads
                .with_label_values(&node_id);
            #[cfg(tokio_unstable)]
            let blocking_queue_depth_metric = WORKER_METRICS.tokio_blocking_queue_depth.with_label_values(&node_id);
            #[cfg(all(target_has_atomic = "64", tokio_unstable))]
            let spawned_tasks_count_metric = WORKER_METRICS.tokio_spawned_tasks_count.with_label_values(&node_id);
            #[cfg(all(target_has_atomic = "64", tokio_unstable))]
            let remote_schedule_count_metric = WORKER_METRICS.tokio_remote_schedule_count.with_label_values(&node_id);

            // The metrics view is tied to the runtime, not to a point in time,
            // so it only needs to be obtained once.
            let metrics = tokio::runtime::Handle::current().metrics();

            loop {
                num_worker_metric.set(metrics.num_workers() as i64);
                num_alive_tasks_metric.set(metrics.num_alive_tasks() as i64);
                global_queue_depth_metric.set(metrics.global_queue_depth() as i64);

                #[cfg(tokio_unstable)]
                {
                    num_blocking_threads_metric.set(metrics.num_blocking_threads() as i64);
                    num_idle_blocking_threads_metric.set(metrics.num_idle_blocking_threads() as i64);
                    blocking_queue_depth_metric.set(metrics.blocking_queue_depth() as i64);
                }

                // The spawned tasks count and remote schedule count are cumulative,
                // so we need to increment them by the difference from the last value.
                // The counter's own current value doubles as the "last value",
                // since this task is the only place that increments it.
                #[cfg(all(target_has_atomic = "64", tokio_unstable))]
                {
                    // The tokio metrics should be monotonically increasing,
                    // but `checked_sub` guards against underflow just in case.
                    if let Some(diff) = metrics
                        .spawned_tasks_count()
                        .checked_sub(spawned_tasks_count_metric.get())
                    {
                        spawned_tasks_count_metric.inc_by(diff);
                    }
                    if let Some(diff) = metrics
                        .remote_schedule_count()
                        .checked_sub(remote_schedule_count_metric.get())
                    {
                        remote_schedule_count_metric.inc_by(diff);
                    }
                }

                // TODO: Consider adding some of the worker metrics as well, like overflows, steals, etc.

                // This sleep must be unconditional: without it the loop would spin
                // without ever yielding back to the runtime.
                sleep(TOKIO_STATS_INTERVAL).await;
            }
        });
    });
}
1 change: 1 addition & 0 deletions crates/standalone/src/subcommands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ pub async fn exec(args: &ArgMatches) -> anyhow::Result<()> {
let data_dir = Arc::new(data_dir.clone());
let ctx = StandaloneEnv::init(db_config, &certs, data_dir).await?;
worker_metrics::spawn_jemalloc_stats(listen_addr.clone());
worker_metrics::spawn_tokio_stats(listen_addr.clone());

let mut db_routes = DatabaseRoutes::default();
db_routes.root_post = db_routes.root_post.layer(DefaultBodyLimit::disable());
Expand Down
Loading