stabilize worker_total_busy_duration #6899

Open · wants to merge 10 commits into master
27 changes: 15 additions & 12 deletions tokio/src/runtime/metrics/histogram.rs
@@ -35,31 +35,34 @@ pub(crate) struct HistogramBatch {
resolution: u64,
}

cfg_unstable! {
/// Whether the histogram used to aggregate a metric uses a linear or
/// logarithmic scale.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
#[non_exhaustive]
pub enum HistogramScale {
/// Linear bucket scale
Linear,

/// Logarithmic bucket scale
Log,
}
/// Whether the histogram used to aggregate a metric uses a linear or
/// logarithmic scale.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
#[non_exhaustive]
#[allow(unreachable_pub)]
pub enum HistogramScale {
/// Linear bucket scale
Linear,

/// Logarithmic bucket scale
#[allow(dead_code)]

Contributor:
is this change required for the mean time stabilization or just a coincidental change?

Contributor Author (@Owen-CH-Leung, Oct 11, 2024):
I believe this change is required.

On master we have another WorkerMetrics struct at tokio/src/runtime/metrics/mock.rs, which doesn't use the Histogram struct defined at tokio/src/runtime/metrics/histogram.rs.

In order to stabilize the API, I had to remove this mock WorkerMetrics and instead point to the "real" WorkerMetrics at tokio/src/runtime/metrics/worker.rs.

The "real" WorkerMetrics has a field poll_count_histogram, which is Option<Histogram>, and thus pulls in tokio/src/runtime/metrics/histogram.rs. From there, you can see that Histogram and HistogramBuilder both refer to HistogramScale, which is hidden behind tokio_unstable. I think this wouldn't compile.

(I might be wrong though so feel free to correct me)
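
Roughly, the dependency chain being described is the following (a simplified sketch only; the real types in worker.rs and histogram.rs have more fields and use different atomic wrappers):

```rust
use std::sync::atomic::AtomicU64;

// Simplified sketch of the coupling described above, not the exact tokio source.
pub enum HistogramScale {
    // On master, this enum is only emitted under cfg_unstable!.
    Linear,
    Log,
}

pub struct Histogram {
    buckets: Box<[AtomicU64]>,
    scale: HistogramScale, // refers to the unstable-only type
}

pub struct WorkerMetrics {
    // ... other per-worker counters ...
    poll_count_histogram: Option<Histogram>, // pulls histogram.rs into the build
}
```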

Contributor:

I need to read through this PR in more detail so bear with me, but we shouldn't be making this stable and public if it isn't actually needed to stabilize worker_total_busy_duration (and I'm mostly sure that it isn't).

In general, all the #[allow(dead_code)] attributes that this change requires give me the impression that we're exposing something that is only really used under tokio_unstable, so we should find a way to expose it only under tokio_unstable.

Contributor Author (@Owen-CH-Leung):

Thanks for your comment!

I think the core issue here is that by exposing worker_total_busy_duration, we are also exposing the "real" WorkerMetrics at tokio/src/runtime/metrics/worker.rs, which in turn pulls in HistogramScale.

Currently on the master branch, when the tokio_unstable flag is not passed, the WorkerMetrics at tokio/src/runtime/metrics/mock.rs is compiled (which is just an empty struct); if the flag is passed, the WorkerMetrics at tokio/src/runtime/metrics/worker.rs is compiled:

https://github.com/tokio-rs/tokio/blob/master/tokio/src/runtime/metrics/mod.rs#L27-L40
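
For reference, the selection at that link looks roughly like this (abridged from master's mod.rs; see the full file at the link above):

```rust
// Abridged: the "real" metrics types are only compiled under tokio_unstable;
// otherwise the empty mocks stand in for them.
cfg_unstable_metrics! {
    mod batch;
    pub(crate) use batch::MetricsBatch;

    mod histogram;
    pub(crate) use histogram::{Histogram, HistogramBatch, HistogramBuilder};

    mod worker;
    pub(crate) use worker::WorkerMetrics;

    // ...
}

cfg_not_unstable_metrics! {
    mod mock;

    pub(crate) use mock::{SchedulerMetrics, WorkerMetrics, MetricsBatch, HistogramBuilder};
}
```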

Contributor:

It's not actually public; it's still behind cfg_unstable. (Just verified via the autogenerated docs.)

Contributor:

OK, now I believe I understand.

I would suggest that instead of compiling in the entire worker metrics struct in order to access only the busy_duration_total field, we gate all the fields that we won't be using behind cfg_unstable_metrics. Otherwise users will still be paying the price for metrics which they don't have access to, and that is something that we would really like to avoid.

Have a look at the runtime Builder for an example of how we have fields and implementations that touch them gated on a cfg flag:

#[cfg(tokio_unstable)]
pub(super) unhandled_panic: UnhandledPanic,

As a general rule, if you need #[allow(dead_code)], then there is probably something that should be gated on a cfg flag instead.
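
Applied to the worker metrics, the same gating pattern would look roughly like this (an illustrative sketch, not the code in this PR; park_count simply stands in for whichever unstable-only fields end up gated):

```rust
use std::sync::atomic::{AtomicU64, Ordering::Relaxed};

pub(crate) struct WorkerMetrics {
    // Needed for the stabilized worker_total_busy_duration().
    pub(crate) busy_duration_total: AtomicU64,

    // Only compiled (and only updated) when tokio_unstable is enabled, so
    // stable builds don't pay for metrics they can't read.
    #[cfg(tokio_unstable)]
    pub(crate) park_count: AtomicU64,
}

impl WorkerMetrics {
    // The accessor that touches the gated field is gated the same way,
    // which also removes the need for #[allow(dead_code)].
    #[cfg(tokio_unstable)]
    pub(crate) fn park_count(&self) -> u64 {
        self.park_count.load(Relaxed)
    }
}
```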

Contributor Author (@Owen-CH-Leung):

Thanks @hds. I've reverted the changes to histogram.rs. The real Histogram, HistogramBuilder and HistogramBatch are now gated behind the unstable flag, and I've created mocked versions of these for compilation.

For WorkerMetrics, since we aim to stabilize more metrics, I'd suggest exposing all fields instead of stabilizing only busy_duration_total and putting the other fields behind unstable.

Let me know what you think

Contributor:

@Owen-CH-Leung the problem with stabilizing all of WorkerMetrics is that when tokio_unstable is not enabled, the runtime will pay the price for all those metrics, but there will be no way to access them. For this reason I think that it would be better to only stabilize what we're exposing.

Contributor Author (@Owen-CH-Leung):

Thanks @hds. I've made changes to hide most of the fields of WorkerMetrics behind the unstable flag, except for busy_duration_total, queue_depth and thread_id. The latter two fields are needed for set_queue_depth and set_thread_id to work properly.

I've also enriched the mock MetricsBatch with a minimal implementation of batch::MetricsBatch so that the worker_total_busy_duration API can function properly in a stable build.

Let me know your thoughts!
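
A minimal sketch of what such a pared-down MetricsBatch could look like (illustrative only; it assumes busy_duration_total on WorkerMetrics is an AtomicU64-style counter and shows just the pieces worker_total_busy_duration needs):

```rust
use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
use std::time::Instant;

// Stand-in for the real WorkerMetrics in worker.rs; only the field read by
// worker_total_busy_duration() is shown here.
pub(crate) struct WorkerMetrics {
    pub(crate) busy_duration_total: AtomicU64,
}

// Pared-down MetricsBatch for stable builds: it only tracks busy time.
pub(crate) struct MetricsBatch {
    busy_duration_total: u64,
    processing_scheduled_tasks_started_at: Instant,
}

impl MetricsBatch {
    pub(crate) fn new(_worker_metrics: &WorkerMetrics) -> Self {
        Self {
            busy_duration_total: 0,
            processing_scheduled_tasks_started_at: Instant::now(),
        }
    }

    pub(crate) fn start_processing_scheduled_tasks(&mut self) {
        self.processing_scheduled_tasks_started_at = Instant::now();
    }

    pub(crate) fn end_processing_scheduled_tasks(&mut self) {
        let busy = self.processing_scheduled_tasks_started_at.elapsed();
        self.busy_duration_total += busy.as_nanos() as u64;
    }

    // Publish the accumulated busy time so RuntimeMetrics can load it.
    pub(crate) fn submit(&mut self, to: &WorkerMetrics, _mean_poll_time: u64) {
        to.busy_duration_total.store(self.busy_duration_total, Relaxed);
    }
}
```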

Log,
}

impl Histogram {
#[allow(dead_code)]
pub(crate) fn num_buckets(&self) -> usize {
self.buckets.len()
}

cfg_64bit_metrics! {
#[allow(dead_code)]
pub(crate) fn get(&self, bucket: usize) -> u64 {
self.buckets[bucket].load(Relaxed)
}
}

#[allow(dead_code)]
pub(crate) fn bucket_range(&self, bucket: usize) -> Range<u64> {
match self.scale {
HistogramScale::Log => Range {
47 changes: 0 additions & 47 deletions tokio/src/runtime/metrics/mock.rs
@@ -1,16 +1,7 @@
//! This file contains mocks of the types in src/runtime/metrics

use std::thread::ThreadId;

pub(crate) struct SchedulerMetrics {}

pub(crate) struct WorkerMetrics {}

pub(crate) struct MetricsBatch {}

#[derive(Clone, Default)]
pub(crate) struct HistogramBuilder {}

impl SchedulerMetrics {
pub(crate) fn new() -> Self {
Self {}
@@ -19,41 +10,3 @@ impl SchedulerMetrics {
/// Increment the number of tasks scheduled externally
pub(crate) fn inc_remote_schedule_count(&self) {}
}

impl WorkerMetrics {
pub(crate) fn new() -> Self {
Self {}
}

pub(crate) fn from_config(config: &crate::runtime::Config) -> Self {
// Prevent the dead-code warning from being triggered
let _ = &config.metrics_poll_count_histogram;
Self::new()
}

pub(crate) fn set_queue_depth(&self, _len: usize) {}
pub(crate) fn set_thread_id(&self, _thread_id: ThreadId) {}
}

impl MetricsBatch {
pub(crate) fn new(_: &WorkerMetrics) -> Self {
Self {}
}

pub(crate) fn submit(&mut self, _to: &WorkerMetrics, _mean_poll_time: u64) {}
pub(crate) fn about_to_park(&mut self) {}
pub(crate) fn unparked(&mut self) {}
pub(crate) fn inc_local_schedule_count(&mut self) {}
pub(crate) fn start_processing_scheduled_tasks(&mut self) {}
pub(crate) fn end_processing_scheduled_tasks(&mut self) {}
pub(crate) fn start_poll(&mut self) {}
pub(crate) fn end_poll(&mut self) {}
}

cfg_rt_multi_thread! {
impl MetricsBatch {
pub(crate) fn incr_steal_count(&mut self, _by: u16) {}
pub(crate) fn incr_steal_operations(&mut self) {}
pub(crate) fn incr_overflow_count(&mut self) {}
}
}
19 changes: 10 additions & 9 deletions tokio/src/runtime/metrics/mod.rs
@@ -11,22 +11,23 @@
mod runtime;
pub use runtime::RuntimeMetrics;

cfg_unstable_metrics! {
mod batch;
pub(crate) use batch::MetricsBatch;
mod worker;
pub(crate) use worker::WorkerMetrics;

mod batch;
pub(crate) use batch::MetricsBatch;

mod histogram;
pub(crate) use histogram::{Histogram, HistogramBatch, HistogramBuilder};

mod histogram;
pub(crate) use histogram::{Histogram, HistogramBatch, HistogramBuilder};
cfg_unstable_metrics! {
#[allow(unreachable_pub)] // rust-lang/rust#57411
pub use histogram::HistogramScale;


mod scheduler;
pub(crate) use scheduler::SchedulerMetrics;

mod worker;
pub(crate) use worker::WorkerMetrics;

cfg_net! {
mod io;
pub(crate) use io::IoDriverMetrics;
@@ -36,5 +37,5 @@ cfg_unstable_metrics! {
cfg_not_unstable_metrics! {
mod mock;

pub(crate) use mock::{SchedulerMetrics, WorkerMetrics, MetricsBatch, HistogramBuilder};
pub(crate) use mock::SchedulerMetrics;
}
104 changes: 54 additions & 50 deletions tokio/src/runtime/metrics/runtime.rs
@@ -1,12 +1,14 @@
use crate::runtime::Handle;
#[allow(unused_imports)]
use std::time::Duration;

cfg_64bit_metrics! {
use std::sync::atomic::Ordering::Relaxed;
}

cfg_unstable_metrics! {
use std::ops::Range;
use std::thread::ThreadId;
cfg_64bit_metrics! {
use std::sync::atomic::Ordering::Relaxed;
}
use std::time::Duration;
}

/// Handle to the runtime's metrics.
@@ -96,6 +98,54 @@ impl RuntimeMetrics {
self.handle.inner.injection_queue_depth()
}

cfg_64bit_metrics! {
/// Returns the amount of time the given worker thread has been busy.
///
/// The worker busy duration starts at zero when the runtime is created and
/// increases whenever the worker is spending time processing work. Using
/// this value can indicate the load of the given worker. If a lot of time
/// is spent busy, then the worker is under load and will check for inbound
/// events less often.
///
/// The timer is monotonically increasing. It is never decremented or reset
/// to zero.
///
/// # Arguments
///
/// `worker` is the index of the worker being queried. The given value must
/// be between 0 and `num_workers()`. The index uniquely identifies a single
/// worker and will continue to identify the worker throughout the lifetime
/// of the runtime instance.
///
/// # Panics
///
/// The method panics when `worker` represents an invalid worker, i.e. is
/// greater than or equal to `num_workers()`.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.worker_total_busy_duration(0);
/// println!("worker 0 was busy for a total of {:?}", n);
/// }
/// ```
pub fn worker_total_busy_duration(&self, worker: usize) -> Duration {
let nanos = self
.handle
.inner
.worker_metrics(worker)
.busy_duration_total
.load(Relaxed);
Duration::from_nanos(nanos)
}
}

cfg_unstable_metrics! {

/// Returns the number of additional threads spawned by the runtime.
@@ -543,52 +593,6 @@ impl RuntimeMetrics {
.load(Relaxed)
}

/// Returns the amount of time the given worker thread has been busy.
///
/// The worker busy duration starts at zero when the runtime is created and
/// increases whenever the worker is spending time processing work. Using
/// this value can indicate the load of the given worker. If a lot of time
/// is spent busy, then the worker is under load and will check for inbound
/// events less often.
///
/// The timer is monotonically increasing. It is never decremented or reset
/// to zero.
///
/// # Arguments
///
/// `worker` is the index of the worker being queried. The given value must
/// be between 0 and `num_workers()`. The index uniquely identifies a single
/// worker and will continue to identify the worker throughout the lifetime
/// of the runtime instance.
///
/// # Panics
///
/// The method panics when `worker` represents an invalid worker, i.e. is
/// greater than or equal to `num_workers()`.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.worker_total_busy_duration(0);
/// println!("worker 0 was busy for a total of {:?}", n);
/// }
/// ```
pub fn worker_total_busy_duration(&self, worker: usize) -> Duration {
let nanos = self
.handle
.inner
.worker_metrics(worker)
.busy_duration_total
.load(Relaxed);
Duration::from_nanos(nanos)
}

/// Returns the number of tasks scheduled from **within** the runtime on the
/// given worker's local queue.
///
2 changes: 2 additions & 0 deletions tokio/src/runtime/metrics/worker.rs
@@ -70,6 +70,7 @@ impl WorkerMetrics {
WorkerMetrics::default()
}

#[allow(dead_code)]
pub(crate) fn queue_depth(&self) -> usize {
self.queue_depth.load(Relaxed)
}
@@ -78,6 +79,7 @@
self.queue_depth.store(len, Relaxed);
}

#[allow(dead_code)]
pub(crate) fn thread_id(&self) -> Option<ThreadId> {
*self.thread_id.lock().unwrap()
}
11 changes: 6 additions & 5 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
@@ -532,6 +532,12 @@ impl Handle {
pub(crate) fn injection_queue_depth(&self) -> usize {
self.shared.inject.len()
}

#[allow(dead_code)]
pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
assert_eq!(0, worker);
&self.shared.worker_metrics
}
}

cfg_unstable_metrics! {
@@ -540,11 +546,6 @@
&self.shared.scheduler_metrics
}

pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
assert_eq!(0, worker);
&self.shared.worker_metrics
}

pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.worker_metrics(worker).queue_depth()
}
13 changes: 8 additions & 5 deletions tokio/src/runtime/scheduler/mod.rs
@@ -9,6 +9,8 @@ cfg_rt! {
pub(crate) use inject::Inject;

use crate::runtime::TaskHooks;

use crate::runtime::WorkerMetrics;
}

cfg_rt_multi_thread! {
@@ -193,10 +195,15 @@
pub(crate) fn injection_queue_depth(&self) -> usize {
match_flavor!(self, Handle(handle) => handle.injection_queue_depth())
}

#[allow(dead_code)]
pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
match_flavor!(self, Handle(handle) => handle.worker_metrics(worker))
}
}

cfg_unstable_metrics! {
use crate::runtime::{SchedulerMetrics, WorkerMetrics};
use crate::runtime::SchedulerMetrics;

impl Handle {
cfg_64bit_metrics! {
@@ -217,10 +224,6 @@ cfg_rt! {
match_flavor!(self, Handle(handle) => handle.scheduler_metrics())
}

pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
match_flavor!(self, Handle(handle) => handle.worker_metrics(worker))
}

pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
match_flavor!(self, Handle(handle) => handle.worker_local_queue_depth(worker))
}
12 changes: 7 additions & 5 deletions tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs
@@ -1,7 +1,8 @@
use super::Handle;
use crate::runtime::WorkerMetrics;

cfg_unstable_metrics! {
use crate::runtime::{SchedulerMetrics, WorkerMetrics};
use crate::runtime::SchedulerMetrics;
}

impl Handle {
@@ -17,6 +18,11 @@
self.shared.injection_queue_depth()
}

#[allow(dead_code)]
pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
&self.shared.worker_metrics[worker]
}

cfg_unstable_metrics! {
cfg_64bit_metrics! {
pub(crate) fn spawned_tasks_count(&self) -> u64 {
@@ -39,10 +45,6 @@
&self.shared.scheduler_metrics
}

pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
&self.shared.worker_metrics[worker]
}

pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.shared.worker_local_queue_depth(worker)
}