13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -248,6 +248,7 @@ termcolor = "1.2.0"
thin-vec = "0.2.13"
thiserror = "1.0.37"
tokio = { version = "1.37", features = ["full"] }
tokio-metrics = { version = "0.4.0" }
tokio-postgres = { version = "0.7.8", features = ["with-chrono-0_4"] }
tokio-stream = "0.1.17"
tokio-tungstenite = { version = "0.26.2", features = ["native-tls"] }
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
@@ -95,6 +95,7 @@ thin-vec.workspace = true
tokio-util.workspace = true
tokio.workspace = true
tokio-stream = "0.1"
tokio-metrics = { version = "0.4.0", features = ["rt"] }
toml.workspace = true
toml_edit.workspace = true
tracing-appender.workspace = true
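Reviewer note: the runtime metrics behind tokio-metrics' `rt` feature are gated on tokio's `tokio_unstable` cfg, which is set via rustc flags rather than a Cargo feature (hence the `cfg(tokio_unstable)` checks in the code below). A minimal sketch of one common way to enable it, assuming a `.cargo/config.toml` at the workspace root (not part of this diff):

[build]
# --cfg tokio_unstable cannot be expressed as a Cargo feature; it has to
# reach rustc via RUSTFLAGS or a config entry like this one.
rustflags = ["--cfg", "tokio_unstable"]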
147 changes: 123 additions & 24 deletions crates/core/src/worker_metrics/mod.rs
@@ -3,7 +3,7 @@ use std::time::Duration;
use crate::execution_context::WorkloadType;
use crate::hash::Hash;
use once_cell::sync::Lazy;
use prometheus::{GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec};
use spacetimedb_lib::{ConnectionId, Identity};
use spacetimedb_metrics::metrics_group;

@@ -79,6 +79,61 @@ metrics_group!(
#[labels(node_id: str)]
pub tokio_remote_schedule_count: IntCounterVec,

#[name = tokio_local_queue_depth_total]
#[help = "Total size of all tokio workers local queues"]
#[labels(node_id: str)]
pub tokio_local_queue_depth_total: IntGaugeVec,

#[name = tokio_local_queue_depth_max]
#[help = "Length of the longest tokio worker local queue"]
#[labels(node_id: str)]
pub tokio_local_queue_depth_max: IntGaugeVec,

#[name = tokio_local_queue_depth_min]
#[help = "Length of the shortest tokio worker local queue"]
#[labels(node_id: str)]
pub tokio_local_queue_depth_min: IntGaugeVec,

#[name = tokio_steal_total]
#[help = "Total number of tasks stolen from other workers"]
#[labels(node_id: str)]
pub tokio_steal_total: IntCounterVec,

#[name = tokio_steal_operations_total]
#[help = "Total number of times a worker tried to steal a chunk of tasks"]
#[labels(node_id: str)]
pub tokio_steal_operations_total: IntCounterVec,

#[name = tokio_local_schedule_total]
#[help = "Total number of tasks scheduled from worker threads"]
#[labels(node_id: str)]
pub tokio_local_schedule_total: IntCounterVec,

#[name = tokio_overflow_total]
#[help = "Total number of times a tokio worker overflowed its local queue"]
#[labels(node_id: str)]
pub tokio_overflow_total: IntCounterVec,

#[name = tokio_busy_ratio_min]
#[help = "Busy ratio of the least busy tokio worker"]
#[labels(node_id: str)]
pub tokio_busy_ratio_min: GaugeVec,

#[name = tokio_busy_ratio_max]
#[help = "Busy ratio of the most busy tokio worker"]
#[labels(node_id: str)]
pub tokio_busy_ratio_max: GaugeVec,

#[name = tokio_busy_ratio_avg]
#[help = "Avg busy ratio of tokio workers"]
#[labels(node_id: str)]
pub tokio_busy_ratio_avg: GaugeVec,

#[name = tokio_mean_polls_per_park]
#[help = "Number of tasks polls divided by the times an idle worker was parked"]
#[labels(node_id: str)]
pub tokio_mean_polls_per_park: GaugeVec,

#[name = spacetime_websocket_sent_msg_size_bytes]
#[help = "The size of messages sent to connected sessions"]
#[labels(db: Identity, workload: WorkloadType)]
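For context on the additions above: each `metrics_group!` entry becomes a labeled prometheus collector on `WORKER_METRICS`. A standalone sketch of roughly what the new `tokio_busy_ratio_avg` gauge amounts to, written against the `prometheus` crate directly (the wiring here is illustrative, not the macro's actual expansion):

use prometheus::{GaugeVec, Opts, Registry};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let registry = Registry::new();
    // A labeled f64 gauge, analogous to the generated `tokio_busy_ratio_avg` field.
    let busy_ratio_avg = GaugeVec::new(
        Opts::new("tokio_busy_ratio_avg", "Average busy ratio of tokio workers"),
        &["node_id"],
    )?;
    registry.register(Box::new(busy_ratio_avg.clone()))?;
    // `with_label_values` resolves one labeled child; `set` stores a float sample.
    busy_ratio_avg.with_label_values(&["node-0"]).set(0.42);
    Ok(())
}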
@@ -207,9 +262,18 @@ pub fn spawn_jemalloc_stats(node_id: String) {
}

// How frequently to update the tokio stats.
#[cfg(all(target_has_atomic = "64", tokio_unstable))]
const TOKIO_STATS_INTERVAL: Duration = Duration::from_secs(10);
#[cfg(all(target_has_atomic = "64", tokio_unstable))]
static SPAWN_TOKIO_STATS_GUARD: Once = Once::new();
pub fn spawn_tokio_stats(node_id: String) {
// Some of these metrics could still be reported without these settings,
// but it is simpler to just skip all the tokio metrics if they aren't set.

#[cfg(not(all(target_has_atomic = "64", tokio_unstable)))]
log::warn!("Skipping tokio metrics for {node_id}, as they are not enabled in this build.");

#[cfg(all(target_has_atomic = "64", tokio_unstable))]
SPAWN_TOKIO_STATS_GUARD.call_once(|| {
spawn(async move {
// Set up our metric handles, so we don't keep calling `with_label_values`.
@@ -223,42 +287,77 @@ pub fn spawn_tokio_stats(node_id: String) {
let blocking_queue_depth_metric = WORKER_METRICS.tokio_blocking_queue_depth.with_label_values(&node_id);
let spawned_tasks_count_metric = WORKER_METRICS.tokio_spawned_tasks_count.with_label_values(&node_id);
let remote_schedule_count_metric = WORKER_METRICS.tokio_remote_schedule_count.with_label_values(&node_id);

let local_queue_depth_total_metric =
WORKER_METRICS.tokio_local_queue_depth_total.with_label_values(&node_id);
let local_queue_depth_max_metric = WORKER_METRICS.tokio_local_queue_depth_max.with_label_values(&node_id);
let local_queue_depth_min_metric = WORKER_METRICS.tokio_local_queue_depth_min.with_label_values(&node_id);
let steal_total_metric = WORKER_METRICS.tokio_steal_total.with_label_values(&node_id);
let steal_operations_total_metric = WORKER_METRICS.tokio_steal_operations_total.with_label_values(&node_id);
let local_schedule_total_metric = WORKER_METRICS.tokio_local_schedule_total.with_label_values(&node_id);
let overflow_total_metric = WORKER_METRICS.tokio_overflow_total.with_label_values(&node_id);
let busy_ratio_min_metric = WORKER_METRICS.tokio_busy_ratio_min.with_label_values(&node_id);
let busy_ratio_max_metric = WORKER_METRICS.tokio_busy_ratio_max.with_label_values(&node_id);
let busy_ratio_avg_metric = WORKER_METRICS.tokio_busy_ratio_avg.with_label_values(&node_id);
let mean_polls_per_park_metric = WORKER_METRICS.tokio_mean_polls_per_park.with_label_values(&node_id);

let handle = tokio::runtime::Handle::current();
// The tokio_metrics library gives us some helpers for aggregating per-worker metrics.
let runtime_monitor = tokio_metrics::RuntimeMonitor::new(&handle);
let mut intervals = runtime_monitor.intervals();
loop {
let metrics = tokio::runtime::Handle::current().metrics();
let interval_delta = intervals.next();

num_worker_metric.set(metrics.num_workers() as i64);
num_alive_tasks_metric.set(metrics.num_alive_tasks() as i64);
global_queue_depth_metric.set(metrics.global_queue_depth() as i64);
num_blocking_threads_metric.set(metrics.num_blocking_threads() as i64);
num_idle_blocking_threads_metric.set(metrics.num_idle_blocking_threads() as i64);
blocking_queue_depth_metric.set(metrics.blocking_queue_depth() as i64);

// The spawned tasks count and remote schedule count are cumulative,
// so we need to increment them by the difference from the last value.
{
let current_count = metrics.spawned_tasks_count();
let previous_value = spawned_tasks_count_metric.get();
// The tokio metric should be monotonically increasing, but we are checking just in case.
if let Some(diff) = current_count.checked_sub(previous_value) {
spawned_tasks_count_metric.inc_by(diff);
}
}
{
let current_count = metrics.remote_schedule_count();
let previous_value = remote_schedule_count_metric.get();
// The tokio metric should be monotonically increasing, but we are checking just in case.
if let Some(diff) = current_count.checked_sub(previous_value) {
remote_schedule_count_metric.inc_by(diff);
}
}

if let Some(interval_delta) = interval_delta {
local_queue_depth_total_metric.set(interval_delta.total_local_queue_depth as i64);
local_queue_depth_max_metric.set(interval_delta.max_local_queue_depth as i64);
local_queue_depth_min_metric.set(interval_delta.min_local_queue_depth as i64);
steal_total_metric.inc_by(interval_delta.total_steal_count);
steal_operations_total_metric.inc_by(interval_delta.total_steal_operations);
local_schedule_total_metric.inc_by(interval_delta.total_local_schedule_count);
overflow_total_metric.inc_by(interval_delta.total_overflow_count);
mean_polls_per_park_metric.set(interval_delta.mean_polls_per_park());

// This is mostly to make sure we don't divide by zero, but we also want to skip the first interval if it is very short.
if interval_delta.elapsed.as_millis() > 100 {
busy_ratio_avg_metric.set(interval_delta.busy_ratio());
busy_ratio_min_metric.set(
interval_delta.min_busy_duration.as_nanos() as f64
/ interval_delta.elapsed.as_nanos() as f64,
);
busy_ratio_max_metric.set(
interval_delta.max_busy_duration.as_nanos() as f64
/ interval_delta.elapsed.as_nanos() as f64,
);
}
}
sleep(TOKIO_STATS_INTERVAL).await;
}
});
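Two standalone sketches for reviewers. First, the cumulative-counter pattern used above for `spawned_tasks_count` and `remote_schedule_count`, isolated into a helper (the helper name is ours, not the PR's):

use prometheus::IntCounter;

// Mirror a monotonically increasing source counter into a prometheus
// IntCounter by adding only the delta since the last observation.
fn sync_cumulative(counter: &IntCounter, current: u64) {
    let previous = counter.get();
    // checked_sub guards against the source counter appearing to move
    // backwards; in that case the update is skipped.
    if let Some(diff) = current.checked_sub(previous) {
        counter.inc_by(diff);
    }
}

fn main() {
    let counter = IntCounter::new("spawned_tasks_count", "example counter").unwrap();
    sync_cumulative(&counter, 10);
    sync_cumulative(&counter, 25);
    assert_eq!(counter.get(), 25);
}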
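Second, a minimal sketch of the `tokio_metrics::RuntimeMonitor` sampling loop this PR builds on (assumes tokio-metrics 0.4 with the `rt` feature and a build with `--cfg tokio_unstable`; the cadence and printing are illustrative). Each call to `intervals().next()` reports metrics accumulated since the previous call, which is why the code above computes ratios against `interval_delta.elapsed`:

use std::time::Duration;

#[tokio::main]
async fn main() {
    let handle = tokio::runtime::Handle::current();
    let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
    let mut intervals = monitor.intervals();
    for _ in 0..3 {
        tokio::time::sleep(Duration::from_secs(1)).await;
        if let Some(delta) = intervals.next() {
            // Busy ratio of the busiest worker over this interval,
            // mirroring the max computation in the diff above.
            let max_ratio =
                delta.max_busy_duration.as_nanos() as f64 / delta.elapsed.as_nanos() as f64;
            println!("busy_ratio avg={:.3} max={:.3}", delta.busy_ratio(), max_ratio);
        }
    }
}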