From 9eb3f5b556c1484f17368b193001ce972219d980 Mon Sep 17 00:00:00 2001
From: Carl Lerche
Date: Tue, 23 May 2023 14:38:15 -0700
Subject: [PATCH] rt(threaded): cap LIFO slot polls (#5712)

As an optimization to improve locality, the multi-threaded scheduler
maintains a single slot (LIFO slot). When a task is scheduled, it goes
into the LIFO slot. The scheduler will run tasks in the LIFO slot first
before checking the local queue.

Ping-pong style workloads, where task A notifies task B, which notifies
task A again, can cause starvation as these two tasks repeatedly
schedule each other in the LIFO slot. #5686, a first attempt at solving
this problem, consumes a unit of budget each time a task is scheduled
from the LIFO slot. However, at the time of this commit, the scheduler
allocates 128 units of budget for each chunk of work. This is
relatively high in situations where tasks do not perform many async
operations yet have meaningful poll times (even 5-10 microsecond poll
times can have an outsized impact on the scheduler).

In an ideal world, the scheduler would adapt to the workload it is
executing. However, as a stopgap, this commit limits the number of
times the LIFO slot is prioritized per scheduler tick.
---
 .github/workflows/ci.yml                      | 26 ++++++
 .github/workflows/loom.yml                    |  2 +-
 tokio/src/runtime/coop.rs                     | 11 ---
 .../scheduler/multi_thread/counters.rs        | 16 ++++
 .../runtime/scheduler/multi_thread/worker.rs  | 90 +++++++++++++++----
 tokio/src/runtime/tests/mod.rs                |  4 +
 6 files changed, 119 insertions(+), 30 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 08ae88a0dfe..9701134c6ed 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -60,6 +60,7 @@ jobs:
       - wasm32-wasi
       - check-external-types
       - check-fuzzing
+      - check-unstable-mt-counters
     steps:
       - run: exit 0
@@ -233,6 +234,31 @@ jobs:
           # the unstable cfg to RustDoc
           RUSTDOCFLAGS: --cfg tokio_unstable --cfg tokio_taskdump
 
+  check-unstable-mt-counters:
+    name: check tokio full --internal-mt-counters
+    needs: basics
+    runs-on: ${{ matrix.os }}
+    strategy:
+      matrix:
+        include:
+          - os: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v3
+      - name: Install Rust ${{ env.rust_stable }}
+        uses: dtolnay/rust-toolchain@master
+        with:
+          toolchain: ${{ env.rust_stable }}
+      - uses: Swatinem/rust-cache@v2
+      # Run `tokio` with "unstable" and "internal-mt-counters" cfg flags.
+      - name: check tokio full --cfg unstable --cfg internal-mt-counters
+        run: cargo test --all-features
+        working-directory: tokio
+        env:
+          RUSTFLAGS: --cfg tokio_unstable --cfg tokio_internal_mt_counters -Dwarnings
+          # in order to run doctests for unstable features, we must also pass
+          # the unstable cfg to RustDoc
+          RUSTDOCFLAGS: --cfg tokio_unstable --cfg tokio_internal_mt_counters
+
   miri:
     name: miri
     needs: basics
diff --git a/.github/workflows/loom.yml b/.github/workflows/loom.yml
index 3fa179dd73c..a51829927de 100644
--- a/.github/workflows/loom.yml
+++ b/.github/workflows/loom.yml
@@ -48,7 +48,7 @@ jobs:
         run: cargo test --lib --release --features full -- --nocapture $SCOPE
         working-directory: tokio
         env:
-          RUSTFLAGS: --cfg loom --cfg tokio_unstable -Dwarnings
+          RUSTFLAGS: --cfg loom --cfg tokio_unstable -Dwarnings -C debug-assertions
           LOOM_MAX_PREEMPTIONS: ${{ matrix.max_preemptions }}
           LOOM_MAX_BRANCHES: 10000
           SCOPE: ${{ matrix.scope }}
diff --git a/tokio/src/runtime/coop.rs b/tokio/src/runtime/coop.rs
index b1177c2a4b7..f3ed17cff1d 100644
--- a/tokio/src/runtime/coop.rs
+++ b/tokio/src/runtime/coop.rs
@@ -119,17 +119,6 @@ cfg_rt_multi_thread! {
     pub(crate) fn set(budget: Budget) {
         let _ = context::budget(|cell| cell.set(budget));
     }
-
-    /// Consume one unit of progress from the current task's budget.
-    pub(crate) fn consume_one() {
-        let _ = context::budget(|cell| {
-            let mut budget = cell.get();
-            if let Some(ref mut counter) = budget.0 {
-                *counter = counter.saturating_sub(1);
-            }
-            cell.set(budget);
-        });
-    }
 }
 
 cfg_rt! {
diff --git a/tokio/src/runtime/scheduler/multi_thread/counters.rs b/tokio/src/runtime/scheduler/multi_thread/counters.rs
index bfc7d6da210..50bcc11985f 100644
--- a/tokio/src/runtime/scheduler/multi_thread/counters.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/counters.rs
@@ -6,17 +6,23 @@ mod imp {
     static NUM_MAINTENANCE: AtomicUsize = AtomicUsize::new(0);
     static NUM_NOTIFY_LOCAL: AtomicUsize = AtomicUsize::new(0);
     static NUM_UNPARKS_LOCAL: AtomicUsize = AtomicUsize::new(0);
+    static NUM_LIFO_SCHEDULES: AtomicUsize = AtomicUsize::new(0);
+    static NUM_LIFO_CAPPED: AtomicUsize = AtomicUsize::new(0);
 
     impl Drop for super::Counters {
         fn drop(&mut self) {
            let notifies_local = NUM_NOTIFY_LOCAL.load(Relaxed);
            let unparks_local = NUM_UNPARKS_LOCAL.load(Relaxed);
            let maintenance = NUM_MAINTENANCE.load(Relaxed);
+           let lifo_scheds = NUM_LIFO_SCHEDULES.load(Relaxed);
+           let lifo_capped = NUM_LIFO_CAPPED.load(Relaxed);
 
            println!("---");
            println!("notifies (local): {}", notifies_local);
            println!(" unparks (local): {}", unparks_local);
            println!("     maintenance: {}", maintenance);
+           println!("  LIFO schedules: {}", lifo_scheds);
+           println!("     LIFO capped: {}", lifo_capped);
         }
     }
 
@@ -31,6 +37,14 @@ mod imp {
     pub(crate) fn inc_num_maintenance() {
         NUM_MAINTENANCE.fetch_add(1, Relaxed);
     }
+
+    pub(crate) fn inc_lifo_schedules() {
+        NUM_LIFO_SCHEDULES.fetch_add(1, Relaxed);
+    }
+
+    pub(crate) fn inc_lifo_capped() {
+        NUM_LIFO_CAPPED.fetch_add(1, Relaxed);
+    }
 }
 
 #[cfg(not(tokio_internal_mt_counters))]
@@ -38,6 +52,8 @@ mod imp {
     pub(crate) fn inc_num_inc_notify_local() {}
     pub(crate) fn inc_num_unparks_local() {}
     pub(crate) fn inc_num_maintenance() {}
+    pub(crate) fn inc_lifo_schedules() {}
+    pub(crate) fn inc_lifo_capped() {}
 }
 
 #[derive(Debug)]
diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs
index 4729376fc8f..e2bbb643db7 100644
--- a/tokio/src/runtime/scheduler/multi_thread/worker.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -90,10 +90,14 @@ struct Core {
     /// When a task is scheduled from a worker, it is stored in this slot. The
     /// worker will check this slot for a task **before** checking the run
     /// queue. This effectively results in the **last** scheduled task to be run
-    /// next (LIFO). This is an optimization for message passing patterns and
-    /// helps to reduce latency.
+    /// next (LIFO). This is an optimization for improving locality which
+    /// benefits message passing patterns and helps to reduce latency.
     lifo_slot: Option<Notified>,
 
+    /// When `true`, locally scheduled tasks go to the LIFO slot. When `false`,
+    /// they go to the back of the `run_queue`.
+    lifo_enabled: bool,
+
     /// The worker-local run queue.
     run_queue: queue::Local<Arc<Handle>>,
 
@@ -191,6 +195,12 @@ type Notified = task::Notified<Arc<Handle>>;
 // Tracks thread-local state
 scoped_thread_local!(static CURRENT: Context);
 
+/// Value picked out of thin air. Running the LIFO slot a handful of times
+/// seems sufficient to benefit from locality. More than 3 times is probably
+/// overweighting locality. The value can be tuned in the future with data
+/// that shows improvements.
+const MAX_LIFO_POLLS_PER_TICK: usize = 3;
+
 pub(super) fn create(
     size: usize,
     park: Parker,
@@ -214,6 +224,7 @@ pub(super) fn create(
         cores.push(Box::new(Core {
             tick: 0,
             lifo_slot: None,
+            lifo_enabled: !config.disable_lifo_slot,
             run_queue,
             is_searching: false,
             is_shutdown: false,
@@ -422,7 +433,13 @@ fn run(worker: Arc<Worker>) {
 
 impl Context {
     fn run(&self, mut core: Box<Core>) -> RunResult {
+        // Reset `lifo_enabled` here in case the core was previously stolen from
+        // a task that had the LIFO slot disabled.
+        self.reset_lifo_enabled(&mut core);
+
         while !core.is_shutdown {
+            self.assert_lifo_enabled_is_correct(&core);
+
             // Increment the tick
             core.tick();
 
@@ -463,6 +480,8 @@ impl Context {
         // another idle worker to try to steal work.
         core.transition_from_searching(&self.worker);
 
+        self.assert_lifo_enabled_is_correct(&core);
+
         // Make the core available to the runtime context
         core.metrics.start_poll();
         *self.core.borrow_mut() = Some(core);
@@ -470,6 +489,7 @@ impl Context {
         // Run the task
         coop::budget(|| {
             task.run();
+            let mut lifo_polls = 0;
 
             // As long as there is budget remaining and a task exists in the
             // `lifo_slot`, then keep running.
@@ -478,7 +498,12 @@ impl Context {
                 // by another worker.
                 let mut core = match self.core.borrow_mut().take() {
                     Some(core) => core,
-                    None => return Err(()),
+                    None => {
+                        // In this case, we cannot call `reset_lifo_enabled()`
+                        // because the core was stolen. The stealer will handle
+                        // that at the top of `Context::run`.
+                        return Err(());
+                    }
                 };
 
                 // If task poll times is enabled, measure the poll time. Note
@@ -491,22 +516,13 @@ impl Context {
                 // Check for a task in the LIFO slot
                 let task = match core.lifo_slot.take() {
                     Some(task) => task,
-                    None => return Ok(core),
+                    None => {
+                        self.reset_lifo_enabled(&mut core);
+                        return Ok(core);
+                    }
                 };
 
-                // Polling a task doesn't necessarily consume any budget, if it
-                // doesn't use any Tokio leaf futures. To prevent such tasks
-                // from using the lifo slot in an infinite loop, we consume an
-                // extra unit of budget between each iteration of the loop.
-                coop::consume_one();
-
-                if coop::has_budget_remaining() {
-                    // Run the LIFO task, then loop
-                    core.metrics.start_poll();
-                    *self.core.borrow_mut() = Some(core);
-                    let task = self.worker.handle.shared.owned.assert_owner(task);
-                    task.run();
-                } else {
+                if !coop::has_budget_remaining() {
                     // Not enough budget left to run the LIFO task, push it to
                     // the back of the queue and return.
                     core.run_queue.push_back_or_overflow(
@@ -514,12 +530,48 @@ impl Context {
                         self.worker.inject(),
                         &mut core.metrics,
                     );
+                    // If we hit this point, the LIFO slot should be enabled.
+                    // There is no need to reset it.
+                    debug_assert!(core.lifo_enabled);
                     return Ok(core);
                 }
+
+                // Track that we are about to run a task from the LIFO slot.
+                lifo_polls += 1;
+                super::counters::inc_lifo_schedules();
+
+                // Disable the LIFO slot if we reach our limit.
+                //
+                // In ping-pong style workloads where task A notifies task B,
+                // which notifies task A again, continuously prioritizing the
+                // LIFO slot can cause starvation as these two tasks will
+                // repeatedly schedule the other. To mitigate this, we limit the
+                // number of times the LIFO slot is prioritized.
+                if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
+                    core.lifo_enabled = false;
+                    super::counters::inc_lifo_capped();
+                }
+
+                // Run the LIFO task, then loop
+                core.metrics.start_poll();
+                *self.core.borrow_mut() = Some(core);
+                let task = self.worker.handle.shared.owned.assert_owner(task);
+                task.run();
             }
         })
     }
 
+    fn reset_lifo_enabled(&self, core: &mut Core) {
+        core.lifo_enabled = !self.worker.handle.shared.config.disable_lifo_slot;
+    }
+
+    fn assert_lifo_enabled_is_correct(&self, core: &Core) {
+        debug_assert_eq!(
+            core.lifo_enabled,
+            !self.worker.handle.shared.config.disable_lifo_slot
+        );
+    }
+
     fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
         if core.tick % self.worker.handle.shared.config.event_interval == 0 {
             super::counters::inc_num_maintenance();
@@ -573,6 +625,8 @@ impl Context {
     }
 
     fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
+        self.assert_lifo_enabled_is_correct(&core);
+
         // Take the parker out of core
         let mut park = core.park.take().expect("park missing");
 
@@ -840,7 +894,7 @@ impl Handle {
         // task must always be pushed to the back of the queue, enabling other
         // tasks to be executed. If **not** a yield, then there is more
         // flexibility and the task may go to the front of the queue.
-        let should_notify = if is_yield || self.shared.config.disable_lifo_slot {
+        let should_notify = if is_yield || !core.lifo_enabled {
             core.run_queue
                 .push_back_or_overflow(task, &self.shared.inject, &mut core.metrics);
             true
diff --git a/tokio/src/runtime/tests/mod.rs b/tokio/src/runtime/tests/mod.rs
index c63285aad49..b12a76e268e 100644
--- a/tokio/src/runtime/tests/mod.rs
+++ b/tokio/src/runtime/tests/mod.rs
@@ -60,6 +60,10 @@ cfg_loom! {
     mod loom_shutdown_join;
     mod loom_join_set;
     mod loom_yield;
+
+    // Make sure debug assertions are enabled
+    #[cfg(not(debug_assertions))]
+    compile_error!("these tests require debug assertions to be enabled");
 }
 
 cfg_not_loom! {
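For illustration only, and not part of the patch: a minimal sketch of the ping-pong workload the commit message describes, written against the public Tokio API. Two tasks repeatedly wake each other over bounded channels; this commit caps how many consecutive times the LIFO slot is prioritized (MAX_LIFO_POLLS_PER_TICK = 3) per scheduler tick before scheduling falls back to the run queue. The channel capacity and hop count below are arbitrary.

// Illustrative sketch only -- not part of the patch. Requires the "full"
// feature (or "macros", "rt-multi-thread", and "sync").
use tokio::sync::mpsc;

#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
    let (to_b, mut from_a) = mpsc::channel::<u64>(1);
    let (to_a, mut from_b) = mpsc::channel::<u64>(1);

    // Task B: echo each message back to task A, incremented.
    tokio::spawn(async move {
        while let Some(n) = from_a.recv().await {
            if to_a.send(n + 1).await.is_err() {
                break;
            }
        }
    });

    // Task A: keep the ping-pong going for a fixed number of hops. Each
    // `send` wakes the peer, which is the scheduling pattern that can keep
    // landing in the notifying worker's LIFO slot.
    let a = tokio::spawn(async move {
        to_b.send(0).await.unwrap();
        while let Some(n) = from_b.recv().await {
            if n >= 10_000 {
                return n;
            }
            to_b.send(n + 1).await.unwrap();
        }
        unreachable!("channel closed early")
    });

    println!("stopped after {} hops", a.await.unwrap());
}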
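Relatedly, the new `lifo_enabled` flag mirrors the existing `disable_lifo_slot` runtime option that the diff reads from `config.disable_lifo_slot`. As a reference point, a minimal sketch of opting out of the LIFO slot entirely, assuming the unstable `Builder::disable_lifo_slot()` method is available (requires building with `--cfg tokio_unstable`); the per-tick cap added by this commit only matters while the slot stays enabled.

// Sketch under the assumption that the unstable `disable_lifo_slot()` builder
// option is available (compile with RUSTFLAGS="--cfg tokio_unstable" and the
// "rt-multi-thread" feature). This is the switch that `core.lifo_enabled`
// mirrors.
fn runtime_without_lifo_slot() -> std::io::Result<tokio::runtime::Runtime> {
    tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .enable_all()
        .disable_lifo_slot()
        .build()
}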