diff --git a/tokio-timer/Cargo.toml b/tokio-timer/Cargo.toml index 32ef2e88bff..d0c4970394d 100644 --- a/tokio-timer/Cargo.toml +++ b/tokio-timer/Cargo.toml @@ -18,6 +18,7 @@ Timer facilities for Tokio [dependencies] futures = "0.1.19" tokio-executor = { version = "0.1.1", path = "../tokio-executor" } +crossbeam-utils = "0.5.0" # Backs `DelayQueue` slab = "0.4.1" diff --git a/tokio-timer/src/delay.rs b/tokio-timer/src/delay.rs index 104fecff076..5d84dcbd8af 100644 --- a/tokio-timer/src/delay.rs +++ b/tokio-timer/src/delay.rs @@ -1,5 +1,5 @@ use Error; -use timer::Registration; +use timer::{Registration, HandlePriv}; use futures::{Future, Poll}; @@ -16,19 +16,10 @@ use std::time::Instant; /// [`new`]: #method.new #[derive(Debug)] pub struct Delay { - /// The instant at which the future completes. - deadline: Instant, - /// The link between the `Delay` instance at the timer that drives it. /// - /// When `Delay` is created with `new`, this is initialized to `None` and is - /// lazily set in `poll`. When `poll` is called, the default for the current - /// execution context is used (obtained via `Handle::current`). - /// - /// When `delay` is created with `new_with_registration`, the value is set. - /// - /// Once `registration` is set to `Some`, it is never changed. - registration: Option, + /// This also stores the `deadline` value. + registration: Registration, } impl Delay { @@ -38,34 +29,28 @@ impl Delay { /// as to how the sub-millisecond portion of `deadline` will be handled. /// `Delay` should not be used for high-resolution timer use cases. pub fn new(deadline: Instant) -> Delay { - Delay { - deadline, - registration: None, - } + let registration = Registration::new(deadline); + + Delay { registration } } - pub(crate) fn new_with_registration( - deadline: Instant, - registration: Registration) -> Delay - { - Delay { - deadline, - registration: Some(registration), - } + pub(crate) fn new_with_handle(deadline: Instant, handle: HandlePriv) -> Delay { + let mut registration = Registration::new(deadline); + registration.register_with(handle); + + Delay { registration } } /// Returns the instant at which the future will complete. pub fn deadline(&self) -> Instant { - self.deadline + self.registration.deadline() } /// Returns true if the `Delay` has elapsed /// /// A `Delay` is elapsed when the requested duration has elapsed. pub fn is_elapsed(&self) -> bool { - self.registration.as_ref() - .map(|r| r.is_elapsed()) - .unwrap_or(false) + self.registration.is_elapsed() } /// Reset the `Delay` instance to a new deadline. @@ -76,21 +61,13 @@ impl Delay { /// This function can be called both before and after the future has /// completed. pub fn reset(&mut self, deadline: Instant) { - self.deadline = deadline; - - if let Some(registration) = self.registration.as_ref() { - registration.reset(deadline); - } + self.registration.reset(deadline); } /// Register the delay with the timer instance for the current execution /// context. fn register(&mut self) { - if self.registration.is_some() { - return; - } - - self.registration = Some(Registration::new(self.deadline)); + self.registration.register(); } } @@ -102,7 +79,6 @@ impl Future for Delay { // Ensure the `Delay` instance is associated with a timer. self.register(); - self.registration.as_ref().unwrap() - .poll_elapsed() + self.registration.poll_elapsed() } } diff --git a/tokio-timer/src/lib.rs b/tokio-timer/src/lib.rs index b07c5c6d7b9..ee84073d2fb 100644 --- a/tokio-timer/src/lib.rs +++ b/tokio-timer/src/lib.rs @@ -26,6 +26,7 @@ extern crate tokio_executor; +extern crate crossbeam_utils; #[macro_use] extern crate futures; extern crate slab; diff --git a/tokio-timer/src/timer/entry.rs b/tokio-timer/src/timer/entry.rs index 7b2bac4959f..7830b777686 100644 --- a/tokio-timer/src/timer/entry.rs +++ b/tokio-timer/src/timer/entry.rs @@ -2,6 +2,7 @@ use Error; use atomic::AtomicU64; use timer::{HandlePriv, Inner}; +use crossbeam_utils::CachePadded; use futures::Poll; use futures::task::AtomicTask; @@ -9,7 +10,7 @@ use std::cell::UnsafeCell; use std::ptr; use std::sync::{Arc, Weak}; use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering::SeqCst; +use std::sync::atomic::Ordering::{SeqCst, Relaxed}; use std::time::Instant; use std::u64; @@ -26,12 +27,14 @@ use std::u64; /// processed during that timer tick. #[derive(Debug)] pub(crate) struct Entry { + /// Only accessed from `Registration`. + time: CachePadded>, + /// Timer internals. Using a weak pointer allows the timer to shutdown /// without all `Delay` instances having completed. - inner: Weak, - - /// Task to notify once the deadline is reached. - task: AtomicTask, + /// + /// When `None`, the entry has not yet been linked with a timer instance. + inner: Option>, /// Tracks the entry state. This value contains the following information: /// @@ -44,22 +47,19 @@ pub(crate) struct Entry { /// instant, this value is changed. state: AtomicU64, - /// When true, the entry is counted by `Inner` towards the max outstanding - /// timeouts. The drop fn uses this to know if it should decrement the - /// counter. - /// - /// One might think that it would be easier to just not create the `Entry`. - /// The problem is that `Delay` expects creating a `Registration` to always - /// return a `Registration` instance. This simplifying factor allows it to - /// improve the struct layout. To do this, we must always allocate the node. - counted: bool, + /// Task to notify once the deadline is reached. + task: AtomicTask, /// True when the entry is queued in the "process" stack. This value /// is set before pushing the value and unset after popping the value. + /// + /// TODO: This could possibly be rolled up into `state`. pub(super) queued: AtomicBool, /// Next entry in the "process" linked list. /// + /// Access to this field is coordinated by the `queued` flag. + /// /// Represents a strong Arc ref. pub(super) next_atomic: UnsafeCell<*mut Entry>, @@ -91,6 +91,12 @@ pub(crate) struct Entry { pub(super) prev_stack: UnsafeCell<*const Entry>, } +/// Stores the info for `Delay`. +#[derive(Debug)] +pub(crate) struct Time { + pub(crate) deadline: Instant, +} + /// Flag indicating a timer entry has elapsed const ELAPSED: u64 = 1 << 63; @@ -100,14 +106,14 @@ const ERROR: u64 = u64::MAX; // ===== impl Entry ===== impl Entry { - pub fn new(when: u64, handle: HandlePriv) -> Entry { - assert!(when > 0 && when < u64::MAX); - + pub fn new(deadline: Instant) -> Entry { Entry { - inner: handle.into_inner(), + time: CachePadded::new(UnsafeCell::new(Time { + deadline, + })), + inner: None, task: AtomicTask::new(), - state: AtomicU64::new(when), - counted: true, + state: AtomicU64::new(0), queued: AtomicBool::new(false), next_atomic: UnsafeCell::new(ptr::null_mut()), when: UnsafeCell::new(None), @@ -116,34 +122,89 @@ impl Entry { } } - pub fn new_elapsed(handle: HandlePriv) -> Entry { - Entry { - inner: handle.into_inner(), - task: AtomicTask::new(), - state: AtomicU64::new(ELAPSED), - counted: true, - queued: AtomicBool::new(false), - next_atomic: UnsafeCell::new(ptr::null_mut()), - when: UnsafeCell::new(None), - next_stack: UnsafeCell::new(None), - prev_stack: UnsafeCell::new(ptr::null_mut()), - } + /// Only called by `Registration` + pub fn time_ref(&self) -> &Time { + unsafe { &*self.time.get() } } - /// Create a new `Entry` that is in the error state. Calling `poll_elapsed` on - /// this `Entry` will always result in `Err` being returned. - pub fn new_error() -> Entry { - Entry { - inner: Weak::new(), - task: AtomicTask::new(), - state: AtomicU64::new(ERROR), - counted: false, - queued: AtomicBool::new(false), - next_atomic: UnsafeCell::new(ptr::null_mut()), - when: UnsafeCell::new(None), - next_stack: UnsafeCell::new(None), - prev_stack: UnsafeCell::new(ptr::null_mut()), + /// Only called by `Registration` + pub fn time_mut(&self) -> &mut Time { + unsafe { &mut *self.time.get() } + } + + /// Returns `true` if the `Entry` is currently associated with a timer + /// instance. + pub fn is_registered(&self) -> bool { + self.inner.is_some() + } + + /// Only called by `Registration` + pub fn register(me: &mut Arc) { + let handle = match HandlePriv::try_current() { + Ok(handle) => handle, + Err(_) => { + // Could not associate the entry with a timer, transition the + // state to error + Arc::get_mut(me).unwrap() + .transition_to_error(); + + return; + } + }; + + Entry::register_with(me, handle) + } + + /// Only called by `Registration` + pub fn register_with(me: &mut Arc, handle: HandlePriv) { + assert!(!me.is_registered(), "only register an entry once"); + + let deadline = me.time_ref().deadline; + + let inner = match handle.inner() { + Some(inner) => inner, + None => { + // Could not associate the entry with a timer, transition the + // state to error + Arc::get_mut(me).unwrap() + .transition_to_error(); + + return; + } + }; + + // Increment the number of active timeouts + if inner.increment().is_err() { + Arc::get_mut(me).unwrap() + .transition_to_error(); + + return; + } + + // Associate the entry with the timer + Arc::get_mut(me).unwrap() + .inner = Some(handle.into_inner()); + + let when = inner.normalize_deadline(deadline); + + // Relaxed OK: At this point, there are no other threads that have + // access to this entry. + if when <= inner.elapsed() { + me.state.store(ELAPSED, Relaxed); + return; + } else { + me.state.store(when, Relaxed); } + + if inner.queue(me).is_err() { + // The timer has shutdown, transition the entry to the error state. + me.error(); + } + } + + fn transition_to_error(&mut self) { + self.inner = Some(Weak::new()); + self.state = AtomicU64::new(ERROR); } /// The current entry state as known by the timer. This is not the value of @@ -224,7 +285,8 @@ impl Entry { return; } - let inner = match entry.inner.upgrade() { + // If registered with a timer instance, try to upgrade the Arc. + let inner = match entry.upgrade_inner() { Some(inner) => inner, None => return, }; @@ -260,12 +322,18 @@ impl Entry { Ok(NotReady) } - pub fn reset(entry: &Arc, deadline: Instant) { - let inner = match entry.inner.upgrade() { + /// Only called by `Registration` + pub fn reset(entry: &mut Arc) { + if !entry.is_registered() { + return; + } + + let inner = match entry.upgrade_inner() { Some(inner) => inner, None => return, }; + let deadline = entry.time_ref().deadline; let when = inner.normalize_deadline(deadline); let elapsed = inner.elapsed(); @@ -305,6 +373,11 @@ impl Entry { let _ = inner.queue(entry); } } + + fn upgrade_inner(&self) -> Option> { + self.inner.as_ref() + .and_then(|inner| inner.upgrade()) + } } fn is_elapsed(state: u64) -> bool { @@ -313,11 +386,7 @@ fn is_elapsed(state: u64) -> bool { impl Drop for Entry { fn drop(&mut self) { - if !self.counted { - return; - } - - let inner = match self.inner.upgrade() { + let inner = match self.upgrade_inner() { Some(inner) => inner, None => return, }; diff --git a/tokio-timer/src/timer/handle.rs b/tokio-timer/src/timer/handle.rs index 99de611e1de..323e39e108c 100644 --- a/tokio-timer/src/timer/handle.rs +++ b/tokio-timer/src/timer/handle.rs @@ -1,5 +1,5 @@ use {Error, Delay, Deadline, Interval}; -use timer::{Registration, Inner}; +use timer::Inner; use tokio_executor::Enter; @@ -128,11 +128,7 @@ impl Handle { pub fn delay(&self, deadline: Instant) -> Delay { match self.inner { Some(ref handle_priv) => { - let registration = Registration::new_with_handle( - deadline, - handle_priv.clone()); - - Delay::new_with_registration(deadline, registration) + Delay::new_with_handle(deadline, handle_priv.clone()) } None => { Delay::new(deadline) diff --git a/tokio-timer/src/timer/mod.rs b/tokio-timer/src/timer/mod.rs index a1529213698..3be5ba3d1a9 100644 --- a/tokio-timer/src/timer/mod.rs +++ b/tokio-timer/src/timer/mod.rs @@ -41,9 +41,9 @@ mod stack; use self::atomic_stack::AtomicStack; use self::entry::Entry; use self::stack::Stack; -use self::handle::HandlePriv; pub use self::handle::{Handle, with_default}; +pub(crate) use self::handle::HandlePriv; pub use self::now::{Now, SystemNow}; pub(crate) use self::registration::Registration; diff --git a/tokio-timer/src/timer/registration.rs b/tokio-timer/src/timer/registration.rs index 3ff8a4c9281..3a1414950f6 100644 --- a/tokio-timer/src/timer/registration.rs +++ b/tokio-timer/src/timer/registration.rs @@ -20,50 +20,26 @@ impl Registration { fn is_send() {} is_send::(); - match HandlePriv::try_current() { - Ok(handle) => Registration::new_with_handle(deadline, handle), - Err(_) => Registration::new_error(), - } + Registration { entry: Arc::new(Entry::new(deadline)) } } - pub fn new_with_handle(deadline: Instant, handle: HandlePriv) -> Registration { - let inner = match handle.inner() { - Some(inner) => inner, - None => return Registration::new_error(), - }; - - // Increment the number of active timeouts - if inner.increment().is_err() { - return Registration::new_error(); - } - - let when = inner.normalize_deadline(deadline); - - if when <= inner.elapsed() { - // The deadline has already elapsed, there is no point creating the - // structures. - return Registration { - entry: Arc::new(Entry::new_elapsed(handle)), - }; - } - - let entry = Arc::new(Entry::new(when, handle)); + pub fn deadline(&self) -> Instant { + self.entry.time_ref().deadline + } - if inner.queue(&entry).is_err() { - // The timer has shutdown, transition the entry to the error state. - entry.error(); + pub fn register(&mut self) { + if !self.entry.is_registered() { + Entry::register(&mut self.entry) } - - Registration { entry } } - pub fn reset(&self, deadline: Instant) { - Entry::reset(&self.entry, deadline); + pub fn register_with(&mut self, handle: HandlePriv) { + Entry::register_with(&mut self.entry, handle) } - fn new_error() -> Registration { - let entry = Arc::new(Entry::new_error()); - Registration { entry } + pub fn reset(&mut self, deadline: Instant) { + self.entry.time_mut().deadline = deadline; + Entry::reset(&mut self.entry); } pub fn is_elapsed(&self) -> bool {