Skip to content

Commit

Permalink
stabilize worker total busy duration, bring WorkerMetrics, MetricsBat…
Browse files Browse the repository at this point in the history
…ch and Histogram out of unstable flag
  • Loading branch information
Owen-CH-Leung committed Oct 11, 2024
1 parent 9cc4a81 commit 8f1fcb4
Show file tree
Hide file tree
Showing 10 changed files with 136 additions and 177 deletions.
27 changes: 15 additions & 12 deletions tokio/src/runtime/metrics/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,31 +35,34 @@ pub(crate) struct HistogramBatch {
resolution: u64,
}

cfg_unstable! {
/// Whether the histogram used to aggregate a metric uses a linear or
/// logarithmic scale.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
#[non_exhaustive]
pub enum HistogramScale {
/// Linear bucket scale
Linear,

/// Logarithmic bucket scale
Log,
}
/// Whether the histogram used to aggregate a metric uses a linear or
/// logarithmic scale.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
#[non_exhaustive]
#[allow(unreachable_pub)]
pub enum HistogramScale {
/// Linear bucket scale
Linear,

/// Logarithmic bucket scale
#[allow(dead_code)]
Log,
}

impl Histogram {
#[allow(dead_code)]
pub(crate) fn num_buckets(&self) -> usize {
self.buckets.len()
}

cfg_64bit_metrics! {
#[allow(dead_code)]
pub(crate) fn get(&self, bucket: usize) -> u64 {
self.buckets[bucket].load(Relaxed)
}
}

#[allow(dead_code)]
pub(crate) fn bucket_range(&self, bucket: usize) -> Range<u64> {
match self.scale {
HistogramScale::Log => Range {
Expand Down
46 changes: 0 additions & 46 deletions tokio/src/runtime/metrics/mock.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,7 @@
//! This file contains mocks of the types in src/runtime/metrics

use std::thread::ThreadId;

pub(crate) struct SchedulerMetrics {}

pub(crate) struct WorkerMetrics {}

pub(crate) struct MetricsBatch {}

#[derive(Clone, Default)]
pub(crate) struct HistogramBuilder {}

impl SchedulerMetrics {
pub(crate) fn new() -> Self {
Self {}
Expand All @@ -20,40 +11,3 @@ impl SchedulerMetrics {
pub(crate) fn inc_remote_schedule_count(&self) {}
}

impl WorkerMetrics {
pub(crate) fn new() -> Self {
Self {}
}

pub(crate) fn from_config(config: &crate::runtime::Config) -> Self {
// Prevent the dead-code warning from being triggered
let _ = &config.metrics_poll_count_histogram;
Self::new()
}

pub(crate) fn set_queue_depth(&self, _len: usize) {}
pub(crate) fn set_thread_id(&self, _thread_id: ThreadId) {}
}

impl MetricsBatch {
pub(crate) fn new(_: &WorkerMetrics) -> Self {
Self {}
}

pub(crate) fn submit(&mut self, _to: &WorkerMetrics, _mean_poll_time: u64) {}
pub(crate) fn about_to_park(&mut self) {}
pub(crate) fn unparked(&mut self) {}
pub(crate) fn inc_local_schedule_count(&mut self) {}
pub(crate) fn start_processing_scheduled_tasks(&mut self) {}
pub(crate) fn end_processing_scheduled_tasks(&mut self) {}
pub(crate) fn start_poll(&mut self) {}
pub(crate) fn end_poll(&mut self) {}
}

cfg_rt_multi_thread! {
impl MetricsBatch {
pub(crate) fn incr_steal_count(&mut self, _by: u16) {}
pub(crate) fn incr_steal_operations(&mut self) {}
pub(crate) fn incr_overflow_count(&mut self) {}
}
}
19 changes: 10 additions & 9 deletions tokio/src/runtime/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,23 @@
mod runtime;
pub use runtime::RuntimeMetrics;

cfg_unstable_metrics! {
mod batch;
pub(crate) use batch::MetricsBatch;
mod worker;
pub(crate) use worker::WorkerMetrics;

mod batch;
pub(crate) use batch::MetricsBatch;

mod histogram;
pub(crate) use histogram::{Histogram, HistogramBatch, HistogramBuilder};

mod histogram;
pub(crate) use histogram::{Histogram, HistogramBatch, HistogramBuilder};
cfg_unstable_metrics! {
#[allow(unreachable_pub)] // rust-lang/rust#57411
pub use histogram::HistogramScale;


mod scheduler;
pub(crate) use scheduler::SchedulerMetrics;

mod worker;
pub(crate) use worker::WorkerMetrics;

cfg_net! {
mod io;
pub(crate) use io::IoDriverMetrics;
Expand All @@ -36,5 +37,5 @@ cfg_unstable_metrics! {
cfg_not_unstable_metrics! {
mod mock;

pub(crate) use mock::{SchedulerMetrics, WorkerMetrics, MetricsBatch, HistogramBuilder};
pub(crate) use mock::{SchedulerMetrics};
}
97 changes: 47 additions & 50 deletions tokio/src/runtime/metrics/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::sync::atomic::Ordering::Relaxed;
use std::time::Duration;
use crate::runtime::Handle;

cfg_unstable_metrics! {
use std::ops::Range;
use std::thread::ThreadId;
cfg_64bit_metrics! {
use std::sync::atomic::Ordering::Relaxed;
}
use std::time::Duration;
}

/// Handle to the runtime's metrics.
Expand Down Expand Up @@ -96,6 +94,51 @@ impl RuntimeMetrics {
self.handle.inner.injection_queue_depth()
}

/// Returns the amount of time the given worker thread has been busy.
///
/// The worker busy duration starts at zero when the runtime is created and
/// increases whenever the worker is spending time processing work. Using
/// this value can indicate the load of the given worker. If a lot of time
/// is spent busy, then the worker is under load and will check for inbound
/// events less often.
///
/// The timer is monotonically increasing. It is never decremented or reset
/// to zero.
///
/// # Arguments
///
/// `worker` is the index of the worker being queried. The given value must
/// be between 0 and `num_workers()`. The index uniquely identifies a single
/// worker and will continue to identify the worker throughout the lifetime
/// of the runtime instance.
///
/// # Panics
///
/// The method panics when `worker` represents an invalid worker, i.e. is
/// greater than or equal to `num_workers()`.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.worker_total_busy_duration(0);
/// println!("worker 0 was busy for a total of {:?}", n);
/// }
/// ```
pub fn worker_total_busy_duration(&self, worker: usize) -> Duration {
let nanos = self
.handle
.inner
.worker_metrics(worker)
.busy_duration_total
.load(Relaxed);
Duration::from_nanos(nanos)
}
cfg_unstable_metrics! {

/// Returns the number of additional threads spawned by the runtime.
Expand Down Expand Up @@ -543,52 +586,6 @@ impl RuntimeMetrics {
.load(Relaxed)
}

/// Returns the amount of time the given worker thread has been busy.
///
/// The worker busy duration starts at zero when the runtime is created and
/// increases whenever the worker is spending time processing work. Using
/// this value can indicate the load of the given worker. If a lot of time
/// is spent busy, then the worker is under load and will check for inbound
/// events less often.
///
/// The timer is monotonically increasing. It is never decremented or reset
/// to zero.
///
/// # Arguments
///
/// `worker` is the index of the worker being queried. The given value must
/// be between 0 and `num_workers()`. The index uniquely identifies a single
/// worker and will continue to identify the worker throughout the lifetime
/// of the runtime instance.
///
/// # Panics
///
/// The method panics when `worker` represents an invalid worker, i.e. is
/// greater than or equal to `num_workers()`.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.worker_total_busy_duration(0);
/// println!("worker 0 was busy for a total of {:?}", n);
/// }
/// ```
pub fn worker_total_busy_duration(&self, worker: usize) -> Duration {
let nanos = self
.handle
.inner
.worker_metrics(worker)
.busy_duration_total
.load(Relaxed);
Duration::from_nanos(nanos)
}

/// Returns the number of tasks scheduled from **within** the runtime on the
/// given worker's local queue.
///
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/runtime/metrics/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ impl WorkerMetrics {
WorkerMetrics::default()
}

#[allow(dead_code)]
pub(crate) fn queue_depth(&self) -> usize {
self.queue_depth.load(Relaxed)
}
Expand All @@ -78,6 +79,7 @@ impl WorkerMetrics {
self.queue_depth.store(len, Relaxed);
}

#[allow(dead_code)]
pub(crate) fn thread_id(&self) -> Option<ThreadId> {
*self.thread_id.lock().unwrap()
}
Expand Down
10 changes: 5 additions & 5 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,11 @@ impl Handle {
pub(crate) fn injection_queue_depth(&self) -> usize {
self.shared.inject.len()
}

pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
assert_eq!(0, worker);
&self.shared.worker_metrics
}
}

cfg_unstable_metrics! {
Expand All @@ -540,11 +545,6 @@ cfg_unstable_metrics! {
&self.shared.scheduler_metrics
}

pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
assert_eq!(0, worker);
&self.shared.worker_metrics
}

pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.worker_metrics(worker).queue_depth()
}
Expand Down
14 changes: 7 additions & 7 deletions tokio/src/runtime/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ cfg_rt! {
use crate::runtime::TaskHooks;
}

use crate::runtime::{driver, WorkerMetrics};

cfg_rt_multi_thread! {
mod block_in_place;
pub(crate) use block_in_place::block_in_place;
Expand All @@ -27,8 +29,6 @@ cfg_rt_multi_thread! {
}
}

use crate::runtime::driver;

#[derive(Debug, Clone)]
pub(crate) enum Handle {
#[cfg(feature = "rt")]
Expand Down Expand Up @@ -193,10 +193,14 @@ cfg_rt! {
pub(crate) fn injection_queue_depth(&self) -> usize {
match_flavor!(self, Handle(handle) => handle.injection_queue_depth())
}

pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
match_flavor!(self, Handle(handle) => handle.worker_metrics(worker))
}
}

cfg_unstable_metrics! {
use crate::runtime::{SchedulerMetrics, WorkerMetrics};
use crate::runtime::{SchedulerMetrics};

impl Handle {
cfg_64bit_metrics! {
Expand All @@ -217,10 +221,6 @@ cfg_rt! {
match_flavor!(self, Handle(handle) => handle.scheduler_metrics())
}

pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
match_flavor!(self, Handle(handle) => handle.worker_metrics(worker))
}

pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
match_flavor!(self, Handle(handle) => handle.worker_local_queue_depth(worker))
}
Expand Down
11 changes: 6 additions & 5 deletions tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::runtime::WorkerMetrics;
use super::Handle;

cfg_unstable_metrics! {
use crate::runtime::{SchedulerMetrics, WorkerMetrics};
use crate::runtime::{SchedulerMetrics};
}

impl Handle {
Expand All @@ -17,6 +18,10 @@ impl Handle {
self.shared.injection_queue_depth()
}

pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
&self.shared.worker_metrics[worker]
}

cfg_unstable_metrics! {
cfg_64bit_metrics! {
pub(crate) fn spawned_tasks_count(&self) -> u64 {
Expand All @@ -39,10 +44,6 @@ impl Handle {
&self.shared.scheduler_metrics
}

pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
&self.shared.worker_metrics[worker]
}

pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.shared.worker_local_queue_depth(worker)
}
Expand Down
Loading

0 comments on commit 8f1fcb4

Please sign in to comment.