From f280df94100d24e868ce3f9fbfec160677d8a124 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Wed, 19 Apr 2023 19:07:55 +0200 Subject: [PATCH] feat: add scheduled time per task (#406) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Each task displays the sum of the time it has been idle and busy, as well as the total. The idle time includes the time between when a task is woken, and when the runtime actually polls that task. There are cases where a task may be scheduled for a long time after being woken, before it is polled. Especially if many tasks are woken at the same time and don't yield back to the runtime quickly. To add visibility to this, the total time that a task is scheduled before being polled has been added. Additionally, a new task state `Scheduled` has been added. This is displayed in both the tasks table and in the task detail view. In the `console-api`, the total `scheduled_time` for the task has been added to the `TaskStats` message in `tasks.proto`. This is the first of two parts. In the second part (#409), a histogram for scheduled time will be added, the equivalent of the poll time histogram which is already available on the task detail screen. To show a pathological case which may lead to needing to see the scheduled time, a new example has been added to the `console-subscriber` ## PR Notes This PR does something adjacent to what is described in #50, but not quite. The unicode character used for a `SCHED` task is ⏫. The second PR (#409) will record a scheduled time histogram the same as it recorded for poll times. These two changes should go in together (and certainly shouldn't be released separately). However, this PR is already quite big, so they'll be separated out. The idea is that this PR isn't merged until the next one has also been reviewed and approved. It would be good to get some feedback at this stage though. The task list view with the new column for `Sched` time: a tasks table view for the long-scheduled example The `Task` block in the task detail view showing the new `Scheduled` time entry. The task block on the task detail view for the rx task in the long-scheduled example --- console-api/proto/common.proto | 9 +- console-api/proto/tasks.proto | 10 +++ .../src/generated/rs.tokio.console.common.rs | 9 +- .../src/generated/rs.tokio.console.tasks.rs | 11 +++ console-subscriber/examples/long_scheduled.rs | 78 ++++++++++++++++ console-subscriber/src/stats.rs | 90 +++++++++++++------ tokio-console/src/state/tasks.rs | 47 ++++++++-- tokio-console/src/view/mod.rs | 4 +- tokio-console/src/view/task.rs | 7 +- tokio-console/src/view/tasks.rs | 15 ++-- 10 files changed, 230 insertions(+), 50 deletions(-) create mode 100644 console-subscriber/examples/long_scheduled.rs diff --git a/console-api/proto/common.proto b/console-api/proto/common.proto index 5e4a8ec86..f36785c38 100644 --- a/console-api/proto/common.proto +++ b/console-api/proto/common.proto @@ -177,10 +177,13 @@ message PollStats { // its poll method has completed. optional google.protobuf.Timestamp last_poll_ended = 5; // The total duration this object was being *actively polled*, summed across - // all polls. Note that this includes only polls that have completed and is - // not reflecting any inprogress polls. Subtracting `busy_time` from the + // all polls. + // + // Note that this includes only polls that have completed, and does not + // reflect any in-progress polls. Subtracting `busy_time` from the // total lifetime of the polled object results in the amount of time it - // has spent *waiting* to be polled. + // has spent *waiting* to be polled (including the `scheduled_time` value + // from `TaskStats`, if this is a task). google.protobuf.Duration busy_time = 6; } diff --git a/console-api/proto/tasks.proto b/console-api/proto/tasks.proto index 8c00c3d1c..6d7c58d07 100644 --- a/console-api/proto/tasks.proto +++ b/console-api/proto/tasks.proto @@ -130,6 +130,16 @@ message Stats { common.PollStats poll_stats = 7; // The total number of times this task has woken itself. uint64 self_wakes = 8; + // The total duration this task was scheduled prior to being polled, summed + // across all poll cycles. + // + // Note that this includes only polls that have started, and does not + // reflect any scheduled state where the task hasn't yet been polled. + // Subtracting both `busy_time` (from the task's `PollStats`) and + // `scheduled_time` from the total lifetime of the task results in the + // amount of time it spent unable to progress because it was waiting on + // some resource. + google.protobuf.Duration scheduled_time = 9; } diff --git a/console-api/src/generated/rs.tokio.console.common.rs b/console-api/src/generated/rs.tokio.console.common.rs index d651148e3..315d7825a 100644 --- a/console-api/src/generated/rs.tokio.console.common.rs +++ b/console-api/src/generated/rs.tokio.console.common.rs @@ -253,10 +253,13 @@ pub struct PollStats { #[prost(message, optional, tag="5")] pub last_poll_ended: ::core::option::Option<::prost_types::Timestamp>, /// The total duration this object was being *actively polled*, summed across - /// all polls. Note that this includes only polls that have completed and is - /// not reflecting any inprogress polls. Subtracting `busy_time` from the + /// all polls. + /// + /// Note that this includes only polls that have completed, and does not + /// reflect any in-progress polls. Subtracting `busy_time` from the /// total lifetime of the polled object results in the amount of time it - /// has spent *waiting* to be polled. + /// has spent *waiting* to be polled (including the `scheduled_time` value + /// from `TaskStats`, if this is a task). #[prost(message, optional, tag="6")] pub busy_time: ::core::option::Option<::prost_types::Duration>, } diff --git a/console-api/src/generated/rs.tokio.console.tasks.rs b/console-api/src/generated/rs.tokio.console.tasks.rs index 6ff543bc9..8bdf62c3f 100644 --- a/console-api/src/generated/rs.tokio.console.tasks.rs +++ b/console-api/src/generated/rs.tokio.console.tasks.rs @@ -167,6 +167,17 @@ pub struct Stats { /// The total number of times this task has woken itself. #[prost(uint64, tag="8")] pub self_wakes: u64, + /// The total duration this task was scheduled prior to being polled, summed + /// across all poll cycles. + /// + /// Note that this includes only polls that have started, and does not + /// reflect any scheduled state where the task hasn't yet been polled. + /// Subtracting both `busy_time` (from the task's `PollStats`) and + /// `scheduled_time` from the total lifetime of the task results in the + /// amount of time it spent unable to progress because it was waiting on + /// some resource. + #[prost(message, optional, tag="9")] + pub scheduled_time: ::core::option::Option<::prost_types::Duration>, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct DurationHistogram { diff --git a/console-subscriber/examples/long_scheduled.rs b/console-subscriber/examples/long_scheduled.rs new file mode 100644 index 000000000..d8cf79144 --- /dev/null +++ b/console-subscriber/examples/long_scheduled.rs @@ -0,0 +1,78 @@ +//! Long scheduled time +//! +//! This example shows an application with a task that has an excessive +//! time between being woken and being polled. +//! +//! It consists of a channel where a sender task sends a message +//! through the channel and then immediately does a lot of work +//! (simulated in this case by a call to `std::thread::sleep`). +//! +//! As soon as the sender task calls `send()` the receiver task gets +//! woken, but because there's only a single worker thread, it doesn't +//! get polled until after the sender task has finished "working" and +//! yields (via `tokio::time::sleep`). +//! +//! In the console, this is visible in the `rx` task, which has very +//! high scheduled times - in the detail screen you will see that around +//! it is scheduled around 98% of the time. The `tx` task, on the other +//! hand, is busy most of the time. +use std::time::Duration; + +use console_subscriber::ConsoleLayer; +use tokio::{sync::mpsc, task}; +use tracing::info; + +#[tokio::main(flavor = "multi_thread", worker_threads = 1)] +async fn main() -> Result<(), Box> { + ConsoleLayer::builder() + .with_default_env() + .publish_interval(Duration::from_millis(100)) + .init(); + + let (tx, rx) = mpsc::channel::(1); + let count = 10000; + + let jh_rx = task::Builder::new() + .name("rx") + .spawn(receiver(rx, count)) + .unwrap(); + let jh_tx = task::Builder::new() + .name("tx") + .spawn(sender(tx, count)) + .unwrap(); + + let res_tx = jh_tx.await; + let res_rx = jh_rx.await; + info!( + "main: Joined sender: {:?} and receiver: {:?}", + res_tx, res_rx, + ); + + tokio::time::sleep(Duration::from_millis(200)).await; + + Ok(()) +} + +async fn sender(tx: mpsc::Sender, count: u32) { + info!("tx: started"); + + for idx in 0..count { + let msg: u32 = idx; + let res = tx.send(msg).await; + info!("tx: sent msg '{}' result: {:?}", msg, res); + + std::thread::sleep(Duration::from_millis(5000)); + info!("tx: work done"); + + tokio::time::sleep(Duration::from_millis(100)).await; + } +} + +async fn receiver(mut rx: mpsc::Receiver, count: u32) { + info!("rx: started"); + + for _ in 0..count { + let msg = rx.recv().await; + info!("rx: Received message: '{:?}'", msg); + } +} diff --git a/console-subscriber/src/stats.rs b/console-subscriber/src/stats.rs index 2bce3c085..0e6995260 100644 --- a/console-subscriber/src/stats.rs +++ b/console-subscriber/src/stats.rs @@ -56,7 +56,7 @@ pub(crate) struct TaskStats { is_dropped: AtomicBool, // task stats pub(crate) created_at: Instant, - timestamps: Mutex, + dropped_at: Mutex>, // waker stats wakes: AtomicUsize, @@ -100,12 +100,6 @@ pub(crate) struct ResourceStats { pub(crate) parent_id: Option, } -#[derive(Debug, Default)] -struct TaskTimestamps { - dropped_at: Option, - last_wake: Option, -} - #[derive(Debug, Default)] struct PollStats { /// The number of polls in progress @@ -118,9 +112,11 @@ struct PollStats { #[derive(Debug, Default)] struct PollTimestamps { first_poll: Option, + last_wake: Option, last_poll_started: Option, last_poll_ended: Option, busy_time: Duration, + scheduled_time: Duration, histogram: H, } @@ -162,14 +158,16 @@ impl TaskStats { is_dirty: AtomicBool::new(true), is_dropped: AtomicBool::new(false), created_at, - timestamps: Mutex::new(TaskTimestamps::default()), + dropped_at: Mutex::new(None), poll_stats: PollStats { timestamps: Mutex::new(PollTimestamps { histogram: Histogram::new(poll_duration_max), first_poll: None, + last_wake: None, last_poll_started: None, last_poll_ended: None, busy_time: Duration::new(0, 0), + scheduled_time: Duration::new(0, 0), }), current_polls: AtomicUsize::new(0), polls: AtomicUsize::new(0), @@ -209,13 +207,14 @@ impl TaskStats { } fn wake(&self, at: Instant, self_wake: bool) { - let mut timestamps = self.timestamps.lock(); - timestamps.last_wake = cmp::max(timestamps.last_wake, Some(at)); - self.wakes.fetch_add(1, Release); + self.poll_stats.wake(at); + self.wakes.fetch_add(1, Release); if self_wake { self.wakes.fetch_add(1, Release); } + + self.make_dirty(); } pub(crate) fn start_poll(&self, at: Instant) { @@ -235,8 +234,7 @@ impl TaskStats { return; } - let mut timestamps = self.timestamps.lock(); - let _prev = timestamps.dropped_at.replace(dropped_at); + let _prev = self.dropped_at.lock().replace(dropped_at); debug_assert_eq!(_prev, None, "tried to drop a task twice; this is a bug!"); self.make_dirty(); } @@ -257,16 +255,28 @@ impl ToProto for TaskStats { fn to_proto(&self, base_time: &TimeAnchor) -> Self::Output { let poll_stats = Some(self.poll_stats.to_proto(base_time)); - let timestamps = self.timestamps.lock(); + let timestamps = self.poll_stats.timestamps.lock(); proto::tasks::Stats { poll_stats, created_at: Some(base_time.to_timestamp(self.created_at)), - dropped_at: timestamps.dropped_at.map(|at| base_time.to_timestamp(at)), + dropped_at: self.dropped_at.lock().map(|at| base_time.to_timestamp(at)), wakes: self.wakes.load(Acquire) as u64, waker_clones: self.waker_clones.load(Acquire) as u64, self_wakes: self.self_wakes.load(Acquire) as u64, waker_drops: self.waker_drops.load(Acquire) as u64, last_wake: timestamps.last_wake.map(|at| base_time.to_timestamp(at)), + scheduled_time: Some( + timestamps + .scheduled_time + .try_into() + .unwrap_or_else(|error| { + eprintln!( + "failed to convert `scheduled_time` to protobuf duration: {}", + error + ); + Default::default() + }), + ), } } } @@ -287,7 +297,7 @@ impl DroppedAt for TaskStats { // avoid acquiring the lock if we know we haven't tried to drop this // thing yet if self.is_dropped.load(Acquire) { - return self.timestamps.lock().dropped_at; + return *self.dropped_at.lock(); } None @@ -466,18 +476,46 @@ impl ToProto for ResourceStats { // === impl PollStats === impl PollStats { - fn start_poll(&self, at: Instant) { - if self.current_polls.fetch_add(1, AcqRel) == 0 { - // We are starting the first poll - let mut timestamps = self.timestamps.lock(); - if timestamps.first_poll.is_none() { - timestamps.first_poll = Some(at); - } + fn wake(&self, at: Instant) { + let mut timestamps = self.timestamps.lock(); + timestamps.last_wake = cmp::max(timestamps.last_wake, Some(at)); + } - timestamps.last_poll_started = Some(at); + fn start_poll(&self, at: Instant) { + if self.current_polls.fetch_add(1, AcqRel) > 0 { + return; + } - self.polls.fetch_add(1, Release); + // We are starting the first poll + let mut timestamps = self.timestamps.lock(); + if timestamps.first_poll.is_none() { + timestamps.first_poll = Some(at); } + + timestamps.last_poll_started = Some(at); + + self.polls.fetch_add(1, Release); + + // If the last poll ended after the last wake then it was likely + // a self-wake, so we measure from the end of the last poll instead. + // This also ensures that `busy_time` and `scheduled_time` don't overlap. + let scheduled = match std::cmp::max(timestamps.last_wake, timestamps.last_poll_ended) { + Some(scheduled) => scheduled, + None => return, // Async operations record polls, but not wakes + }; + + let elapsed = match at.checked_duration_since(scheduled) { + Some(elapsed) => elapsed, + None => { + eprintln!( + "possible Instant clock skew detected: a poll's start timestamp \ + was before the wake time/last poll end timestamp\nwake = {:?}\n start = {:?}", + scheduled, at + ); + return; + } + }; + timestamps.scheduled_time += elapsed; } fn end_poll(&self, at: Instant) { @@ -534,7 +572,7 @@ impl ToProto for PollStats { .map(|at| base_time.to_timestamp(at)), busy_time: Some(timestamps.busy_time.try_into().unwrap_or_else(|error| { eprintln!( - "failed to convert busy time to protobuf duration: {}", + "failed to convert `busy_time` to protobuf duration: {}", error ); Default::default() diff --git a/tokio-console/src/state/tasks.rs b/tokio-console/src/state/tasks.rs index ad362f518..1dce737e5 100644 --- a/tokio-console/src/state/tasks.rs +++ b/tokio-console/src/state/tasks.rs @@ -43,10 +43,11 @@ pub(crate) enum SortBy { Name = 3, Total = 4, Busy = 5, - Idle = 6, - Polls = 7, - Target = 8, - Location = 9, + Scheduled = 6, + Idle = 7, + Polls = 8, + Target = 9, + Location = 10, } #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)] @@ -54,6 +55,7 @@ pub(crate) enum TaskState { Completed, Idle, Running, + Scheduled, } pub(crate) type TaskRef = store::Ref; @@ -100,6 +102,7 @@ struct TaskStats { created_at: SystemTime, dropped_at: Option, busy: Duration, + scheduled: Duration, last_poll_started: Option, last_poll_ended: Option, idle: Option, @@ -297,6 +300,10 @@ impl Task { self.stats.last_poll_started > self.stats.last_poll_ended } + pub(crate) fn is_scheduled(&self) -> bool { + self.stats.last_wake > self.stats.last_poll_started + } + pub(crate) fn is_completed(&self) -> bool { self.stats.total.is_some() } @@ -310,6 +317,10 @@ impl Task { return TaskState::Running; } + if self.is_scheduled() { + return TaskState::Scheduled; + } + TaskState::Idle } @@ -331,10 +342,24 @@ impl Task { self.stats.busy } + pub(crate) fn scheduled(&self, since: SystemTime) -> Duration { + if let Some(wake) = self.stats.last_wake { + if self.stats.last_wake > self.stats.last_poll_started { + // In this case the task is scheduled, but has not yet been polled + let current_time_since_wake = since.duration_since(wake).unwrap_or_default(); + return self.stats.scheduled + current_time_since_wake; + } + } + self.stats.scheduled + } + pub(crate) fn idle(&self, since: SystemTime) -> Duration { self.stats .idle - .or_else(|| self.total(since).checked_sub(self.busy(since))) + .or_else(|| { + self.total(since) + .checked_sub(self.busy(since) + self.scheduled(since)) + }) .unwrap_or_default() } @@ -429,11 +454,14 @@ impl From for TaskStats { let poll_stats = pb.poll_stats.expect("task should have poll stats"); let busy = poll_stats.busy_time.map(pb_duration).unwrap_or_default(); - let idle = total.map(|total| total.checked_sub(busy).unwrap_or_default()); + let scheduled = pb.scheduled_time.map(pb_duration).unwrap_or_default(); + let idle = total.map(|total| total.checked_sub(busy + scheduled).unwrap_or_default()); Self { total, idle, + scheduled, busy, + last_wake: pb.last_wake.map(|v| v.try_into().unwrap()), last_poll_started: poll_stats.last_poll_started.map(|v| v.try_into().unwrap()), last_poll_ended: poll_stats.last_poll_ended.map(|v| v.try_into().unwrap()), polls: poll_stats.polls, @@ -442,7 +470,6 @@ impl From for TaskStats { wakes: pb.wakes, waker_clones: pb.waker_clones, waker_drops: pb.waker_drops, - last_wake: pb.last_wake.map(|v| v.try_into().unwrap()), self_wakes: pb.self_wakes, } } @@ -474,6 +501,9 @@ impl SortBy { Self::Idle => { tasks.sort_unstable_by_key(|task| task.upgrade().map(|t| t.borrow().idle(now))) } + Self::Scheduled => { + tasks.sort_unstable_by_key(|task| task.upgrade().map(|t| t.borrow().scheduled(now))) + } Self::Busy => { tasks.sort_unstable_by_key(|task| task.upgrade().map(|t| t.borrow().busy(now))) } @@ -505,6 +535,7 @@ impl TryFrom for SortBy { idx if idx == Self::Name as usize => Ok(Self::Name), idx if idx == Self::Total as usize => Ok(Self::Total), idx if idx == Self::Busy as usize => Ok(Self::Busy), + idx if idx == Self::Scheduled as usize => Ok(Self::Scheduled), idx if idx == Self::Idle as usize => Ok(Self::Idle), idx if idx == Self::Polls as usize => Ok(Self::Polls), idx if idx == Self::Target as usize => Ok(Self::Target), @@ -517,6 +548,7 @@ impl TryFrom for SortBy { impl TaskState { pub(crate) fn render(self, styles: &crate::view::Styles) -> Span<'static> { const RUNNING_UTF8: &str = "\u{25B6}"; + const SCHEDULED_UTF8: &str = "\u{23EB}"; const IDLE_UTF8: &str = "\u{23F8}"; const COMPLETED_UTF8: &str = "\u{23F9}"; match self { @@ -524,6 +556,7 @@ impl TaskState { styles.if_utf8(RUNNING_UTF8, "BUSY"), styles.fg(Color::Green), ), + Self::Scheduled => Span::raw(styles.if_utf8(SCHEDULED_UTF8, "SCHED")), Self::Idle => Span::raw(styles.if_utf8(IDLE_UTF8, "IDLE")), Self::Completed => Span::raw(styles.if_utf8(COMPLETED_UTF8, "DONE")), } diff --git a/tokio-console/src/view/mod.rs b/tokio-console/src/view/mod.rs index 3d35350b8..160419adb 100644 --- a/tokio-console/src/view/mod.rs +++ b/tokio-console/src/view/mod.rs @@ -39,7 +39,7 @@ pub struct View { /// details view), we want to leave the task list's state the way we left it /// --- e.g., if the user previously selected a particular sorting, we want /// it to remain sorted that way when we return to it. - tasks_list: TableListState, + tasks_list: TableListState, resources_list: TableListState, state: ViewState, pub(crate) styles: Styles, @@ -93,7 +93,7 @@ impl View { pub fn new(styles: Styles) -> Self { Self { state: ViewState::TasksList, - tasks_list: TableListState::::default(), + tasks_list: TableListState::::default(), resources_list: TableListState::::default(), styles, } diff --git a/tokio-console/src/view/task.rs b/tokio-console/src/view/task.rs index 93edd13b7..c9e021a10 100644 --- a/tokio-console/src/view/task.rs +++ b/tokio-console/src/view/task.rs @@ -69,7 +69,7 @@ impl TaskView { // controls layout::Constraint::Length(1), // task stats - layout::Constraint::Length(8), + layout::Constraint::Length(10), // poll duration layout::Constraint::Length(9), // fields @@ -89,7 +89,7 @@ impl TaskView { // warnings (add 2 for top and bottom borders) layout::Constraint::Length(warnings.len() as u16 + 2), // task stats - layout::Constraint::Length(8), + layout::Constraint::Length(10), // poll duration layout::Constraint::Length(9), // fields @@ -122,7 +122,7 @@ impl TaskView { ]); // Just preallocate capacity for ID, name, target, total, busy, and idle. - let mut overview = Vec::with_capacity(7); + let mut overview = Vec::with_capacity(8); overview.push(Spans::from(vec![ bold("ID: "), Span::raw(format!("{} ", task.id())), @@ -159,6 +159,7 @@ impl TaskView { styles.time_units(total, view::DUR_LIST_PRECISION, None), ])); overview.push(dur_percent("Busy: ", task.busy(now))); + overview.push(dur_percent("Scheduled: ", task.scheduled(now))); overview.push(dur_percent("Idle: ", task.idle(now))); let mut waker_stats = vec![Spans::from(vec![ diff --git a/tokio-console/src/view/tasks.rs b/tokio-console/src/view/tasks.rs index d103779ab..b537e08e1 100644 --- a/tokio-console/src/view/tasks.rs +++ b/tokio-console/src/view/tasks.rs @@ -19,17 +19,17 @@ use tui::{ #[derive(Debug, Default)] pub(crate) struct TasksTable {} -impl TableList<11> for TasksTable { +impl TableList<12> for TasksTable { type Row = Task; type Sort = SortBy; type Context = (); - const HEADER: &'static [&'static str; 11] = &[ - "Warn", "ID", "State", "Name", "Total", "Busy", "Idle", "Polls", "Target", "Location", - "Fields", + const HEADER: &'static [&'static str; 12] = &[ + "Warn", "ID", "State", "Name", "Total", "Busy", "Sched", "Idle", "Polls", "Target", + "Location", "Fields", ]; - const WIDTHS: &'static [usize; 11] = &[ + const WIDTHS: &'static [usize; 12] = &[ Self::HEADER[0].len() + 1, Self::HEADER[1].len() + 1, Self::HEADER[2].len() + 1, @@ -41,10 +41,11 @@ impl TableList<11> for TasksTable { Self::HEADER[8].len() + 1, Self::HEADER[9].len() + 1, Self::HEADER[10].len() + 1, + Self::HEADER[11].len() + 1, ]; fn render( - table_list_state: &mut TableListState, + table_list_state: &mut TableListState, styles: &view::Styles, frame: &mut tui::terminal::Frame, area: layout::Rect, @@ -129,6 +130,7 @@ impl TableList<11> for TasksTable { Cell::from(name_width.update_str(task.name().unwrap_or("")).to_string()), dur_cell(task.total(now)), dur_cell(task.busy(now)), + dur_cell(task.scheduled(now)), dur_cell(task.idle(now)), Cell::from(polls_width.update_str(task.total_polls().to_string())), Cell::from(target_width.update_str(task.target()).to_owned()), @@ -252,6 +254,7 @@ impl TableList<11> for TasksTable { layout::Constraint::Length(DUR_LEN as u16), layout::Constraint::Length(DUR_LEN as u16), layout::Constraint::Length(DUR_LEN as u16), + layout::Constraint::Length(DUR_LEN as u16), polls_width.constraint(), target_width.constraint(), location_width.constraint(),