Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Add ability to disable paused clock auto-advance #4523

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 42 additions & 4 deletions tokio-macros/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,38 @@ impl RuntimeFlavor {
}
}

#[derive(Clone, Copy)]
enum StartPaused {
No,
Paused,
PausedNoAdvance,
}

impl StartPaused {
fn from_bool(b: bool) -> Self {
if b {
StartPaused::Paused
} else {
StartPaused::No
}
}

fn from_str(s: &str) -> Result<StartPaused, String> {
match s {
"paused" => Ok(StartPaused::Paused),
"noadvance" => Ok(StartPaused::PausedNoAdvance),
_ => Err(format!(
"No such paused mode `{}`. The paused modes are `paused` and `noadvance`.",
s
)),
}
}
}

struct FinalConfig {
flavor: RuntimeFlavor,
worker_threads: Option<usize>,
start_paused: Option<bool>,
start_paused: Option<StartPaused>,
}

/// Config used in case of the attribute not being able to build a valid config
Expand All @@ -43,7 +71,7 @@ struct Configuration {
default_flavor: RuntimeFlavor,
flavor: Option<RuntimeFlavor>,
worker_threads: Option<(usize, Span)>,
start_paused: Option<(bool, Span)>,
start_paused: Option<(StartPaused, Span)>,
is_test: bool,
}

Expand Down Expand Up @@ -99,7 +127,13 @@ impl Configuration {
return Err(syn::Error::new(span, "`start_paused` set multiple times."));
}

let start_paused = parse_bool(start_paused, span, "start_paused")?;
let start_paused = match parse_bool(start_paused.clone(), span, "start_paused") {
Ok(b) => StartPaused::from_bool(b),
_ => {
let s = parse_string(start_paused, span, "start_paused")?;
StartPaused::from_str(&s).map_err(|err| syn::Error::new(span, err))?
}
};
self.start_paused = Some((start_paused, span));
Ok(())
}
Expand Down Expand Up @@ -325,7 +359,11 @@ fn parse_knobs(mut input: syn::ItemFn, is_test: bool, config: FinalConfig) -> To
rt = quote! { #rt.worker_threads(#v) };
}
if let Some(v) = config.start_paused {
rt = quote! { #rt.start_paused(#v) };
match v {
StartPaused::No => rt = quote! { #rt.start_paused(false) },
StartPaused::Paused => rt = quote! { #rt.start_paused(true) },
StartPaused::PausedNoAdvance => rt = quote! { #rt.start_paused_no_advance(true) },
}
}

let header = if is_test {
Expand Down
20 changes: 17 additions & 3 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub struct Builder {
enable_time: bool,

/// Whether or not the clock should start paused.
start_paused: bool,
start_paused: driver::PauseMode,

/// The number of worker threads, used by Runtime.
///
Expand Down Expand Up @@ -125,7 +125,7 @@ impl Builder {
enable_time: false,

// The clock starts not-paused
start_paused: false,
start_paused: driver::PauseMode::Unpaused,

// Default to lazy auto-detection (one thread per CPU core)
worker_threads: None,
Expand Down Expand Up @@ -652,7 +652,21 @@ cfg_test_util! {
/// .unwrap();
/// ```
pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
self.start_paused = start_paused;
if start_paused {
self.start_paused = driver::PauseMode::Paused;
} else {
self.start_paused = driver::PauseMode::Unpaused;
}
self
}

/// Controls if the runtime's clock starts paused without auto-advance or advancing.
pub fn start_paused_no_advance(&mut self, start_paused_no_advance: bool) -> &mut Self {
if start_paused_no_advance {
self.start_paused = driver::PauseMode::PausedNoAdvance;
} else {
self.start_paused = driver::PauseMode::Unpaused;
}
self
}
}
Expand Down
23 changes: 20 additions & 3 deletions tokio/src/runtime/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,16 @@ cfg_time! {
pub(crate) type Clock = crate::time::Clock;
pub(crate) type TimeHandle = Option<crate::time::driver::Handle>;

fn create_clock(enable_pausing: bool, start_paused: bool) -> Clock {
crate::time::Clock::new(enable_pausing, start_paused)
#[allow(unused_variables)]
fn create_clock(enable_pausing: bool, start_paused: PauseMode) -> Clock {
let clock = crate::time::Clock::new(enable_pausing);
#[cfg(feature = "test-util")]
match start_paused {
PauseMode::Paused => clock.pause(),
PauseMode::PausedNoAdvance => clock.pause_no_advance(),
_ => ()
}
clock
}

fn create_time_driver(
Expand Down Expand Up @@ -158,11 +166,20 @@ pub(crate) struct Resources {
pub(crate) clock: Clock,
}

#[derive(Copy, Clone)]
pub(crate) enum PauseMode {
Unpaused,
#[cfg(feature = "test-util")]
Paused,
#[cfg(feature = "test-util")]
PausedNoAdvance,
}

pub(crate) struct Cfg {
pub(crate) enable_io: bool,
pub(crate) enable_time: bool,
pub(crate) enable_pause_time: bool,
pub(crate) start_paused: bool,
pub(crate) start_paused: PauseMode,
}

impl Driver {
Expand Down
91 changes: 68 additions & 23 deletions tokio/src/time/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ cfg_not_test_util! {
}

impl Clock {
pub(crate) fn new(_enable_pausing: bool, _start_paused: bool) -> Clock {
pub(crate) fn new(_enable_pausing: bool) -> Clock {
Clock {}
}

Expand Down Expand Up @@ -49,6 +49,31 @@ cfg_test_util! {
inner: Arc<Mutex<Inner>>,
}

#[derive(Debug, Clone, PartialEq, Eq)]
enum Frozen {
Thawed(std::time::Instant),
#[allow(clippy::enum_variant_names)]
Frozen,
NoAdvance,
}

impl Frozen {
fn thawed(&self) -> Option<&std::time::Instant> {
match self {
Frozen::Thawed(ref i) => Some(i),
_ => None,
}
}

fn is_frozen(&self) -> bool {
!matches!(self, Frozen::Thawed(_))
}

fn is_frozen_no_advance(&self) -> bool {
matches!(self, Frozen::NoAdvance)
}
}

#[derive(Debug)]
struct Inner {
/// True if the ability to pause time is enabled.
Expand All @@ -57,8 +82,8 @@ cfg_test_util! {
/// Instant to use as the clock's base instant.
base: std::time::Instant,

/// Instant at which the clock was last unfrozen.
unfrozen: Option<std::time::Instant>,
/// Instant at which the clock was last thawed or freeze state.
frozen: Frozen,
}

/// Pauses time.
Expand Down Expand Up @@ -101,6 +126,12 @@ cfg_test_util! {
clock.pause();
}

/// Pauses time and disabled auto advancing
pub fn pause_no_advance() {
let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
clock.pause_no_advance();
}

/// Resumes time.
///
/// Clears the saved `Instant::now()` value. Subsequent calls to
Expand All @@ -114,11 +145,11 @@ cfg_test_util! {
let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
let mut inner = clock.inner.lock();

if inner.unfrozen.is_some() {
if !inner.frozen.is_frozen() {
panic!("time is not frozen");
}

inner.unfrozen = Some(std::time::Instant::now());
inner.frozen = Frozen::Thawed(std::time::Instant::now());
}

/// Advances time.
Expand Down Expand Up @@ -171,47 +202,61 @@ cfg_test_util! {
impl Clock {
/// Returns a new `Clock` instance that uses the current execution context's
/// source of time.
pub(crate) fn new(enable_pausing: bool, start_paused: bool) -> Clock {
pub(crate) fn new(enable_pausing: bool) -> Clock {
let now = std::time::Instant::now();

let clock = Clock {
Clock {
inner: Arc::new(Mutex::new(Inner {
enable_pausing,
base: now,
unfrozen: Some(now),
frozen: Frozen::Thawed(now),
})),
};

if start_paused {
clock.pause();
}

clock
}

pub(crate) fn pause(&self) {
fn freeze(&self, frozen: Frozen) {
let mut inner = self.inner.lock();

if !inner.enable_pausing {
drop(inner); // avoid poisoning the lock
panic!("`time::pause()` requires the `current_thread` Tokio runtime. \
panic!("`time::pause()` or `time::pause_no_advance()` requires the `current_thread` Tokio runtime. \
This is the default Runtime used by `#[tokio::test].");
}

let elapsed = inner.unfrozen.as_ref().expect("time is already frozen").elapsed();
inner.base += elapsed;
inner.unfrozen = None;
match (&inner.frozen, &frozen) {
(Frozen::Thawed(t), _) => {
let elapsed = t.elapsed();
inner.base += elapsed;
},
(a, b) if a != b => (),
_ => panic!("time is already frozen"),
}

inner.frozen = frozen;
}

pub(crate) fn pause(&self) {
self.freeze(Frozen::Frozen);
}

pub(crate) fn pause_no_advance(&self) {
self.freeze(Frozen::NoAdvance);
}

pub(crate) fn is_paused(&self) -> bool {
let inner = self.inner.lock();
inner.unfrozen.is_none()
inner.frozen.is_frozen()
}

pub(crate) fn is_paused_no_advance(&self) -> bool {
let inner = self.inner.lock();
inner.frozen.is_frozen_no_advance()
}

pub(crate) fn advance(&self, duration: Duration) {
let mut inner = self.inner.lock();

if inner.unfrozen.is_some() {
if !inner.frozen.is_frozen() {
panic!("time is not frozen");
}

Expand All @@ -223,8 +268,8 @@ cfg_test_util! {

let mut ret = inner.base;

if let Some(unfrozen) = inner.unfrozen {
ret += unfrozen.elapsed();
if let Some(thawed) = inner.frozen.thawed() {
ret += thawed.elapsed();
}

Instant::from_std(ret)
Expand Down
5 changes: 4 additions & 1 deletion tokio/src/time/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,10 @@ where
fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> {
let clock = &self.time_source.clock;

if clock.is_paused() {
if clock.is_paused_no_advance() {
// As autoadvance is disabled the next timer won't ever be reached
self.park.park_timeout(Duration::from_secs(3600))?;
} else if clock.is_paused() {
self.park.park_timeout(Duration::from_secs(0))?;

// If the time driver was woken, then the park completed
Expand Down
12 changes: 6 additions & 6 deletions tokio/src/time/driver/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ fn model(f: impl Fn() + Send + Sync + 'static) {
#[test]
fn single_timer() {
model(|| {
let clock = crate::time::clock::Clock::new(true, false);
let clock = crate::time::clock::Clock::new(true);
let time_source = super::ClockTime::new(clock.clone());

let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
Expand Down Expand Up @@ -77,7 +77,7 @@ fn single_timer() {
#[test]
fn drop_timer() {
model(|| {
let clock = crate::time::clock::Clock::new(true, false);
let clock = crate::time::clock::Clock::new(true);
let time_source = super::ClockTime::new(clock.clone());

let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
Expand Down Expand Up @@ -108,7 +108,7 @@ fn drop_timer() {
#[test]
fn change_waker() {
model(|| {
let clock = crate::time::clock::Clock::new(true, false);
let clock = crate::time::clock::Clock::new(true);
let time_source = super::ClockTime::new(clock.clone());

let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
Expand Down Expand Up @@ -143,7 +143,7 @@ fn reset_future() {
model(|| {
let finished_early = Arc::new(AtomicBool::new(false));

let clock = crate::time::clock::Clock::new(true, false);
let clock = crate::time::clock::Clock::new(true);
let time_source = super::ClockTime::new(clock.clone());

let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
Expand Down Expand Up @@ -199,7 +199,7 @@ fn normal_or_miri<T>(normal: T, miri: T) -> T {
#[test]
#[cfg(not(loom))]
fn poll_process_levels() {
let clock = crate::time::clock::Clock::new(true, false);
let clock = crate::time::clock::Clock::new(true);
clock.pause();

let time_source = super::ClockTime::new(clock.clone());
Expand Down Expand Up @@ -240,7 +240,7 @@ fn poll_process_levels() {
fn poll_process_levels_targeted() {
let mut context = Context::from_waker(noop_waker_ref());

let clock = crate::time::clock::Clock::new(true, false);
let clock = crate::time::clock::Clock::new(true);
clock.pause();

let time_source = super::ClockTime::new(clock.clone());
Expand Down
Loading