Skip to content

Commit

Permalink
runtime: improvements for global_queue_interval (#6445)
Browse files Browse the repository at this point in the history
  • Loading branch information
mox692 authored Apr 3, 2024
1 parent e9ae5d4 commit 328a02c
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 8 deletions.
6 changes: 6 additions & 0 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,10 @@ impl Builder {
///
/// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
///
/// # Panics
///
/// This function will panic if 0 is passed as an argument.
///
/// # Examples
///
/// ```
Expand All @@ -768,7 +772,9 @@ impl Builder {
/// .build();
/// # }
/// ```
#[track_caller]
pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
assert!(val > 0, "global_queue_interval must be greater than 0");
self.global_queue_interval = Some(val);
self
}
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/multi_thread/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl Stats {
let tasks_per_interval = (TARGET_GLOBAL_QUEUE_INTERVAL / self.task_poll_time_ewma) as u32;

cmp::max(
// We don't want to return less than 2 as that would result in the
// If we are using self-tuning, we don't want to return less than 2 as that would result in the
// global queue always getting checked first.
2,
cmp::min(
Expand Down
2 changes: 0 additions & 2 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -985,8 +985,6 @@ impl Core {
.stats
.tuned_global_queue_interval(&worker.handle.shared.config);

debug_assert!(next > 1);

// Smooth out jitter
if abs_diff(self.global_queue_interval, next) > 2 {
self.global_queue_interval = next;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/multi_thread_alt/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl Stats {
let tasks_per_interval = (TARGET_GLOBAL_QUEUE_INTERVAL / self.task_poll_time_ewma) as u32;

cmp::max(
// We don't want to return less than 2 as that would result in the
// If we are using self-tuning, we don't want to return less than 2 as that would result in the
// global queue always getting checked first.
2,
cmp::min(
Expand Down
3 changes: 0 additions & 3 deletions tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,6 @@ impl Worker {
/// Ensure core's state is set correctly for the worker to start using.
fn reset_acquired_core(&mut self, cx: &Context, synced: &mut Synced, core: &mut Core) {
self.global_queue_interval = core.stats.tuned_global_queue_interval(&cx.shared().config);
debug_assert!(self.global_queue_interval > 1);

// Reset `lifo_enabled` here in case the core was previously stolen from
// a task that had the LIFO slot disabled.
Expand Down Expand Up @@ -1288,8 +1287,6 @@ impl Worker {
fn tune_global_queue_interval(&mut self, cx: &Context, core: &mut Core) {
let next = core.stats.tuned_global_queue_interval(&cx.shared().config);

debug_assert!(next > 1);

// Smooth out jitter
if abs_diff(self.global_queue_interval, next) > 2 {
self.global_queue_interval = next;
Expand Down
12 changes: 12 additions & 0 deletions tokio/tests/rt_panic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@ fn builder_max_blocking_threads_panic_caller() -> Result<(), Box<dyn Error>> {
Ok(())
}

#[test]
fn builder_global_queue_interval_panic_caller() -> Result<(), Box<dyn Error>> {
let panic_location_file = test_panic(|| {
let _ = Builder::new_multi_thread().global_queue_interval(0).build();
});

// The panic location should be in this file
assert_eq!(&panic_location_file.unwrap(), file!());

Ok(())
}

fn current_thread() -> Runtime {
tokio::runtime::Builder::new_current_thread()
.enable_all()
Expand Down
30 changes: 29 additions & 1 deletion tokio/tests/rt_threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use tokio_test::{assert_err, assert_ok};
use futures::future::poll_fn;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::task::{Context, Poll, Waker};

Expand Down Expand Up @@ -486,6 +486,34 @@ fn max_blocking_threads_set_to_zero() {
.unwrap();
}

/// Regression test for #6445.
///
/// After #6445, setting `global_queue_interval` to 1 is now technically valid.
/// This test confirms that there is no regression in `multi_thread_runtime`
/// when global_queue_interval is set to 1.
#[test]
fn global_queue_interval_set_to_one() {
let rt = tokio::runtime::Builder::new_multi_thread()
.global_queue_interval(1)
.build()
.unwrap();

// Perform a simple work.
let cnt = Arc::new(AtomicUsize::new(0));
rt.block_on(async {
let mut set = tokio::task::JoinSet::new();
for _ in 0..10 {
let cnt = cnt.clone();
set.spawn(async move { cnt.fetch_add(1, Ordering::Relaxed) });
}

while let Some(res) = set.join_next().await {
res.unwrap();
}
});
assert_eq!(cnt.load(Relaxed), 10);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn hang_on_shutdown() {
let (sync_tx, sync_rx) = std::sync::mpsc::channel::<()>();
Expand Down
27 changes: 27 additions & 0 deletions tokio/tests/rt_threaded_alt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,33 @@ fn max_blocking_threads_set_to_zero() {
.unwrap();
}

/// Regression test for #6445.
///
/// After #6445, setting `global_queue_interval` to 1 is now technically valid.
/// This test confirms that there is no regression in `multi_thread_runtime`
/// when global_queue_interval is set to 1.
#[test]
fn global_queue_interval_set_to_one() {
let rt = tokio::runtime::Builder::new_multi_thread_alt()
.global_queue_interval(1)
.build()
.unwrap();

// Perform a simple work.
let cnt = Arc::new(AtomicUsize::new(0));
rt.block_on(async {
let mut set = tokio::task::JoinSet::new();
for _ in 0..10 {
let cnt = cnt.clone();
set.spawn(async move { cnt.fetch_add(1, Relaxed) });
}
while let Some(res) = set.join_next().await {
res.unwrap();
}
});
assert_eq!(cnt.load(Relaxed), 10);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn hang_on_shutdown() {
let (sync_tx, sync_rx) = std::sync::mpsc::channel::<()>();
Expand Down

0 comments on commit 328a02c

Please sign in to comment.