diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 08ae88a0dfe..9701134c6ed 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -60,6 +60,7 @@ jobs:
       - wasm32-wasi
       - check-external-types
       - check-fuzzing
+      - check-unstable-mt-counters
     steps:
       - run: exit 0
@@ -233,6 +234,31 @@ jobs:
          # the unstable cfg to RustDoc
          RUSTDOCFLAGS: --cfg tokio_unstable --cfg tokio_taskdump

+  check-unstable-mt-counters:
+    name: check tokio full --internal-mt-counters
+    needs: basics
+    runs-on: ${{ matrix.os }}
+    strategy:
+      matrix:
+        include:
+          - os: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v3
+      - name: Install Rust ${{ env.rust_stable }}
+        uses: dtolnay/rust-toolchain@master
+        with:
+          toolchain: ${{ env.rust_stable }}
+      - uses: Swatinem/rust-cache@v2
+      # Run `tokio` with the "unstable" and "internal-mt-counters" cfg flags.
+      - name: check tokio full --cfg unstable --cfg internal-mt-counters
+        run: cargo test --all-features
+        working-directory: tokio
+        env:
+          RUSTFLAGS: --cfg tokio_unstable --cfg tokio_internal_mt_counters -Dwarnings
+          # in order to run doctests for unstable features, we must also pass
+          # the unstable cfg to RustDoc
+          RUSTDOCFLAGS: --cfg tokio_unstable --cfg tokio_internal_mt_counters
+
   miri:
     name: miri
     needs: basics
diff --git a/.github/workflows/loom.yml b/.github/workflows/loom.yml
index 3fa179dd73c..a51829927de 100644
--- a/.github/workflows/loom.yml
+++ b/.github/workflows/loom.yml
@@ -48,7 +48,7 @@ jobs:
        run: cargo test --lib --release --features full -- --nocapture $SCOPE
        working-directory: tokio
        env:
-          RUSTFLAGS: --cfg loom --cfg tokio_unstable -Dwarnings
+          RUSTFLAGS: --cfg loom --cfg tokio_unstable -Dwarnings -C debug-assertions
          LOOM_MAX_PREEMPTIONS: ${{ matrix.max_preemptions }}
          LOOM_MAX_BRANCHES: 10000
          SCOPE: ${{ matrix.scope }}
diff --git a/tokio/src/runtime/coop.rs b/tokio/src/runtime/coop.rs
index b1177c2a4b7..f3ed17cff1d 100644
--- a/tokio/src/runtime/coop.rs
+++ b/tokio/src/runtime/coop.rs
@@ -119,17 +119,6 @@ cfg_rt_multi_thread! {
     pub(crate) fn set(budget: Budget) {
         let _ = context::budget(|cell| cell.set(budget));
     }
-
-    /// Consume one unit of progress from the current task's budget.
-    pub(crate) fn consume_one() {
-        let _ = context::budget(|cell| {
-            let mut budget = cell.get();
-            if let Some(ref mut counter) = budget.0 {
-                *counter = counter.saturating_sub(1);
-            }
-            cell.set(budget);
-        });
-    }
 }

 cfg_rt! {
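The `coop::consume_one` removal above deletes the old defense against a task monopolizing the LIFO slot: each LIFO iteration charged one unit of the task's cooperative budget, even when the polled task never touched a Tokio leaf future. The per-tick cap added to `worker.rs` below replaces it. A minimal sketch (not part of the diff) of the saturating-budget idea being removed, using illustrative names rather than Tokio's private types:

```rust
fn main() {
    // Illustrative stand-in for Tokio's private `Budget(Option<u8>)`.
    struct Budget(Option<u8>);

    impl Budget {
        // Mirrors the removed `consume_one`: each unit of progress
        // decrements the counter, saturating at zero.
        fn consume_one(&mut self) {
            if let Some(counter) = self.0.as_mut() {
                *counter = counter.saturating_sub(1);
            }
        }

        // `None` means the budget is unconstrained.
        fn has_remaining(&self) -> bool {
            self.0.map(|counter| counter > 0).unwrap_or(true)
        }
    }

    let mut budget = Budget(Some(2));
    budget.consume_one();
    budget.consume_one();
    assert!(!budget.has_remaining()); // exhausted after two units
}
```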
diff --git a/tokio/src/runtime/scheduler/multi_thread/counters.rs b/tokio/src/runtime/scheduler/multi_thread/counters.rs
index bfc7d6da210..50bcc11985f 100644
--- a/tokio/src/runtime/scheduler/multi_thread/counters.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/counters.rs
@@ -6,17 +6,23 @@ mod imp {
     static NUM_MAINTENANCE: AtomicUsize = AtomicUsize::new(0);
     static NUM_NOTIFY_LOCAL: AtomicUsize = AtomicUsize::new(0);
     static NUM_UNPARKS_LOCAL: AtomicUsize = AtomicUsize::new(0);
+    static NUM_LIFO_SCHEDULES: AtomicUsize = AtomicUsize::new(0);
+    static NUM_LIFO_CAPPED: AtomicUsize = AtomicUsize::new(0);

     impl Drop for super::Counters {
         fn drop(&mut self) {
             let notifies_local = NUM_NOTIFY_LOCAL.load(Relaxed);
             let unparks_local = NUM_UNPARKS_LOCAL.load(Relaxed);
             let maintenance = NUM_MAINTENANCE.load(Relaxed);
+            let lifo_scheds = NUM_LIFO_SCHEDULES.load(Relaxed);
+            let lifo_capped = NUM_LIFO_CAPPED.load(Relaxed);

             println!("---");
             println!("notifies (local): {}", notifies_local);
             println!(" unparks (local): {}", unparks_local);
             println!("     maintenance: {}", maintenance);
+            println!("  LIFO schedules: {}", lifo_scheds);
+            println!("     LIFO capped: {}", lifo_capped);
         }
     }

@@ -31,6 +37,14 @@ mod imp {
     pub(crate) fn inc_num_maintenance() {
         NUM_MAINTENANCE.fetch_add(1, Relaxed);
     }
+
+    pub(crate) fn inc_lifo_schedules() {
+        NUM_LIFO_SCHEDULES.fetch_add(1, Relaxed);
+    }
+
+    pub(crate) fn inc_lifo_capped() {
+        NUM_LIFO_CAPPED.fetch_add(1, Relaxed);
+    }
 }

 #[cfg(not(tokio_internal_mt_counters))]
@@ -38,6 +52,8 @@ mod imp {
     pub(crate) fn inc_num_inc_notify_local() {}
     pub(crate) fn inc_num_unparks_local() {}
     pub(crate) fn inc_num_maintenance() {}
+    pub(crate) fn inc_lifo_schedules() {}
+    pub(crate) fn inc_lifo_capped() {}
 }

 #[derive(Debug)]
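These counters only exist when built with `--cfg tokio_internal_mt_counters` (which the new CI job above passes via `RUSTFLAGS`); the `not(...)` module supplies empty stubs so call sites such as `inc_lifo_capped()` never need their own cfg attributes. A condensed sketch of the pattern, with `my_counters` as a stand-in cfg name rather than the real flag:

```rust
#[cfg(my_counters)]
mod imp {
    use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

    static NUM_EVENTS: AtomicUsize = AtomicUsize::new(0);

    // Relaxed ordering is fine: these are statistics, not synchronization.
    pub fn inc_num_events() {
        NUM_EVENTS.fetch_add(1, Relaxed);
    }
}

#[cfg(not(my_counters))]
mod imp {
    // Same signature, empty body: the call compiles away entirely.
    pub fn inc_num_events() {}
}

fn main() {
    imp::inc_num_events(); // unconditional at the call site
}
```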
diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs
index 4729376fc8f..e2bbb643db7 100644
--- a/tokio/src/runtime/scheduler/multi_thread/worker.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -90,10 +90,14 @@ struct Core {
     /// When a task is scheduled from a worker, it is stored in this slot. The
     /// worker will check this slot for a task **before** checking the run
     /// queue. This effectively results in the **last** scheduled task to be run
-    /// next (LIFO). This is an optimization for message passing patterns and
-    /// helps to reduce latency.
+    /// next (LIFO). This is an optimization for improving locality, which
+    /// benefits message passing patterns and helps to reduce latency.
     lifo_slot: Option<Notified>,

+    /// When `true`, locally scheduled tasks go to the LIFO slot. When `false`,
+    /// they go to the back of the `run_queue`.
+    lifo_enabled: bool,
+
     /// The worker-local run queue.
     run_queue: queue::Local<Arc<Handle>>,

@@ -191,6 +195,12 @@ type Notified = task::Notified<Arc<Handle>>;
 // Tracks thread-local state
 scoped_thread_local!(static CURRENT: Context);

+/// Value picked out of thin air. Running the LIFO slot a handful of times
+/// seems sufficient to benefit from locality. More than 3 times is probably
+/// overweighting locality at the expense of fairness. The value can be tuned
+/// in the future with data that shows improvements.
+const MAX_LIFO_POLLS_PER_TICK: usize = 3;
+
 pub(super) fn create(
     size: usize,
     park: Parker,
@@ -214,6 +224,7 @@ pub(super) fn create(
         cores.push(Box::new(Core {
             tick: 0,
             lifo_slot: None,
+            lifo_enabled: !config.disable_lifo_slot,
             run_queue,
             is_searching: false,
             is_shutdown: false,
@@ -422,7 +433,13 @@ fn run(worker: Arc<Worker>) {

 impl Context {
     fn run(&self, mut core: Box<Core>) -> RunResult {
+        // Reset `lifo_enabled` here in case the core was previously stolen from
+        // a task that had the LIFO slot disabled.
+        self.reset_lifo_enabled(&mut core);
+
         while !core.is_shutdown {
+            self.assert_lifo_enabled_is_correct(&core);
+
             // Increment the tick
             core.tick();
@@ -463,6 +480,8 @@ impl Context {
         // another idle worker to try to steal work.
         core.transition_from_searching(&self.worker);

+        self.assert_lifo_enabled_is_correct(&core);
+
         // Make the core available to the runtime context
         core.metrics.start_poll();
         *self.core.borrow_mut() = Some(core);
@@ -470,6 +489,7 @@ impl Context {
         // Run the task
         coop::budget(|| {
             task.run();
+            let mut lifo_polls = 0;

             // As long as there is budget remaining and a task exists in the
             // `lifo_slot`, then keep running.
@@ -478,7 +498,12 @@ impl Context {
                 // by another worker.
                 let mut core = match self.core.borrow_mut().take() {
                     Some(core) => core,
-                    None => return Err(()),
+                    None => {
+                        // In this case, we cannot call `reset_lifo_enabled()`
+                        // because the core was stolen. The stealer will handle
+                        // that at the top of `Context::run`.
+                        return Err(());
+                    }
                 };

                 // If task poll times is enabled, measure the poll time. Note
@@ -491,22 +516,13 @@ impl Context {
                 // Check for a task in the LIFO slot
                 let task = match core.lifo_slot.take() {
                     Some(task) => task,
-                    None => return Ok(core),
+                    None => {
+                        self.reset_lifo_enabled(&mut core);
+                        return Ok(core);
+                    }
                 };

-                // Polling a task doesn't necessarily consume any budget, if it
-                // doesn't use any Tokio leaf futures. To prevent such tasks
-                // from using the lifo slot in an infinite loop, we consume an
-                // extra unit of budget between each iteration of the loop.
-                coop::consume_one();
-
-                if coop::has_budget_remaining() {
-                    // Run the LIFO task, then loop
-                    core.metrics.start_poll();
-                    *self.core.borrow_mut() = Some(core);
-                    let task = self.worker.handle.shared.owned.assert_owner(task);
-                    task.run();
-                } else {
+                if !coop::has_budget_remaining() {
                     // Not enough budget left to run the LIFO task, push it to
                     // the back of the queue and return.
                     core.run_queue.push_back_or_overflow(
                         task,
@@ -514,12 +530,48 @@ impl Context {
                         self.worker.inject(),
                         &mut core.metrics,
                     );
+                    // If we hit this point, the LIFO slot should be enabled.
+                    // There is no need to reset it.
+                    debug_assert!(core.lifo_enabled);
                     return Ok(core);
                 }
+
+                // Track that we are about to run a task from the LIFO slot.
+                lifo_polls += 1;
+                super::counters::inc_lifo_schedules();
+
+                // Disable the LIFO slot if we reach our limit.
+                //
+                // In ping-pong style workloads where task A notifies task B,
+                // which notifies task A again, continuously prioritizing the
+                // LIFO slot can cause starvation as these two tasks will
+                // repeatedly schedule the other. To mitigate this, we limit the
+                // number of times the LIFO slot is prioritized.
+                if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
+                    core.lifo_enabled = false;
+                    super::counters::inc_lifo_capped();
+                }
+
+                // Run the LIFO task, then loop
+                core.metrics.start_poll();
+                *self.core.borrow_mut() = Some(core);
+                let task = self.worker.handle.shared.owned.assert_owner(task);
+                task.run();
             }
         })
     }

+    fn reset_lifo_enabled(&self, core: &mut Core) {
+        core.lifo_enabled = !self.worker.handle.shared.config.disable_lifo_slot;
+    }
+
+    fn assert_lifo_enabled_is_correct(&self, core: &Core) {
+        debug_assert_eq!(
+            core.lifo_enabled,
+            !self.worker.handle.shared.config.disable_lifo_slot
+        );
+    }
+
     fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
         if core.tick % self.worker.handle.shared.config.event_interval == 0 {
             super::counters::inc_num_maintenance();
@@ -573,6 +625,8 @@ impl Context {
     }

     fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
+        self.assert_lifo_enabled_is_correct(&core);
+
         // Take the parker out of core
         let mut park = core.park.take().expect("park missing");
@@ -840,7 +894,7 @@ impl Handle {
         // task must always be pushed to the back of the queue, enabling other
         // tasks to be executed. If **not** a yield, then there is more
         // flexibility and the task may go to the front of the queue.
-        let should_notify = if is_yield || self.shared.config.disable_lifo_slot {
+        let should_notify = if is_yield || !core.lifo_enabled {
             core.run_queue
                 .push_back_or_overflow(task, &self.shared.inject, &mut core.metrics);
             true
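The "ping-pong" comment above describes the starvation case the cap targets. A hypothetical reproduction, not taken from the PR and assuming Tokio's `full` feature set: two tasks that wake each other through bounded channels keep landing in the same worker's LIFO slot, so before the cap they could hold that worker until the coop budget ran out.

```rust
use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
    let (tx_a, mut rx_a) = mpsc::channel::<u64>(1);
    let (tx_b, mut rx_b) = mpsc::channel::<u64>(1);

    // Task A: every message it receives immediately wakes task B, placing B
    // in the current worker's LIFO slot.
    tokio::spawn(async move {
        while let Some(n) = rx_a.recv().await {
            if tx_b.send(n + 1).await.is_err() {
                break;
            }
        }
    });

    // Task B: bounces each message straight back, waking A into the LIFO
    // slot again. With MAX_LIFO_POLLS_PER_TICK, the slot is disabled after a
    // few rounds and the pair falls back to the run queue.
    let ping = tx_a.clone();
    tokio::spawn(async move {
        while let Some(n) = rx_b.recv().await {
            if n >= 1_000 || ping.send(n + 1).await.is_err() {
                break;
            }
        }
    });

    // Start the exchange, then let it run briefly.
    tx_a.send(0).await.unwrap();
    tokio::time::sleep(Duration::from_millis(50)).await;
}
```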
diff --git a/tokio/src/runtime/tests/mod.rs b/tokio/src/runtime/tests/mod.rs
index c63285aad49..b12a76e268e 100644
--- a/tokio/src/runtime/tests/mod.rs
+++ b/tokio/src/runtime/tests/mod.rs
@@ -60,6 +60,10 @@ cfg_loom! {
     mod loom_shutdown_join;
     mod loom_join_set;
     mod loom_yield;
+
+    // Make sure debug assertions are enabled
+    #[cfg(not(debug_assertions))]
+    compile_error!("these tests require debug assertions to be enabled");
 }

 cfg_not_loom! {
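The `compile_error!` guard pairs with the `-C debug-assertions` flag added to loom.yml: `assert_lifo_enabled_is_correct` relies on `debug_assert_eq!`, which compiles to nothing in plain `--release` builds, so without both changes the loom runs would silently skip the new invariant check. A small standalone illustration of the two mechanisms together (names are stand-ins, not the PR's code):

```rust
// Stand-in for the invariant checked by `assert_lifo_enabled_is_correct`.
fn assert_invariant(lifo_enabled: bool, disable_lifo_slot: bool) {
    // Compiled out unless debug assertions are enabled, which is why the
    // loom job now passes `-C debug-assertions` even for release builds.
    debug_assert_eq!(lifo_enabled, !disable_lifo_slot);
}

// Turn "the checks silently vanished" into a hard build failure.
#[cfg(not(debug_assertions))]
compile_error!("these tests require debug assertions to be enabled");

fn main() {
    assert_invariant(true, false);
}
```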