Skip to content

Commit

Permalink
timer: Reduce size of Delay struct (#554)
Browse files Browse the repository at this point in the history
* Remove `counted` field on `timer::Entry`.

It turns out that a better indicator of whether or not the number of
active timeouts should be decremented is if the `Entry` has been
associated with a timer. In other words, if `Entry::inner` can be
upgraded, then the count should be decremented on drop.

* timer: Tweak link between `Delay` and the driver

This tweaks the struct layout / details regarding how a `Delay` instance
is linked to a driver (timer instance). Instead of lazily allocating the
`Entry` (node shared between `Delay` and the timer), `Entry` is
allocated immediately when `Delay` is created. This allows using the
entry store data used by `Delay`.

This is in anticipation of further timer improvements that would
otherwise require the size of `Delay` to grow further. Since an
allocation is already made, the idea is to shrink the size of the
`Delay` struct.
  • Loading branch information
carllerche authored Aug 22, 2018
1 parent d822b72 commit cf184eb
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 136 deletions.
1 change: 1 addition & 0 deletions tokio-timer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Timer facilities for Tokio
[dependencies]
futures = "0.1.19"
tokio-executor = { version = "0.1.1", path = "../tokio-executor" }
crossbeam-utils = "0.5.0"

# Backs `DelayQueue`
slab = "0.4.1"
Expand Down
56 changes: 16 additions & 40 deletions tokio-timer/src/delay.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use Error;
use timer::Registration;
use timer::{Registration, HandlePriv};

use futures::{Future, Poll};

Expand All @@ -16,19 +16,10 @@ use std::time::Instant;
/// [`new`]: #method.new
#[derive(Debug)]
pub struct Delay {
/// The instant at which the future completes.
deadline: Instant,

/// The link between the `Delay` instance at the timer that drives it.
///
/// When `Delay` is created with `new`, this is initialized to `None` and is
/// lazily set in `poll`. When `poll` is called, the default for the current
/// execution context is used (obtained via `Handle::current`).
///
/// When `delay` is created with `new_with_registration`, the value is set.
///
/// Once `registration` is set to `Some`, it is never changed.
registration: Option<Registration>,
/// This also stores the `deadline` value.
registration: Registration,
}

impl Delay {
Expand All @@ -38,34 +29,28 @@ impl Delay {
/// as to how the sub-millisecond portion of `deadline` will be handled.
/// `Delay` should not be used for high-resolution timer use cases.
pub fn new(deadline: Instant) -> Delay {
Delay {
deadline,
registration: None,
}
let registration = Registration::new(deadline);

Delay { registration }
}

pub(crate) fn new_with_registration(
deadline: Instant,
registration: Registration) -> Delay
{
Delay {
deadline,
registration: Some(registration),
}
pub(crate) fn new_with_handle(deadline: Instant, handle: HandlePriv) -> Delay {
let mut registration = Registration::new(deadline);
registration.register_with(handle);

Delay { registration }
}

/// Returns the instant at which the future will complete.
pub fn deadline(&self) -> Instant {
self.deadline
self.registration.deadline()
}

/// Returns true if the `Delay` has elapsed
///
/// A `Delay` is elapsed when the requested duration has elapsed.
pub fn is_elapsed(&self) -> bool {
self.registration.as_ref()
.map(|r| r.is_elapsed())
.unwrap_or(false)
self.registration.is_elapsed()
}

/// Reset the `Delay` instance to a new deadline.
Expand All @@ -76,21 +61,13 @@ impl Delay {
/// This function can be called both before and after the future has
/// completed.
pub fn reset(&mut self, deadline: Instant) {
self.deadline = deadline;

if let Some(registration) = self.registration.as_ref() {
registration.reset(deadline);
}
self.registration.reset(deadline);
}

/// Register the delay with the timer instance for the current execution
/// context.
fn register(&mut self) {
if self.registration.is_some() {
return;
}

self.registration = Some(Registration::new(self.deadline));
self.registration.register();
}
}

Expand All @@ -102,7 +79,6 @@ impl Future for Delay {
// Ensure the `Delay` instance is associated with a timer.
self.register();

self.registration.as_ref().unwrap()
.poll_elapsed()
self.registration.poll_elapsed()
}
}
1 change: 1 addition & 0 deletions tokio-timer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

extern crate tokio_executor;

extern crate crossbeam_utils;
#[macro_use]
extern crate futures;
extern crate slab;
Expand Down
175 changes: 122 additions & 53 deletions tokio-timer/src/timer/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use Error;
use atomic::AtomicU64;
use timer::{HandlePriv, Inner};

use crossbeam_utils::CachePadded;
use futures::Poll;
use futures::task::AtomicTask;

use std::cell::UnsafeCell;
use std::ptr;
use std::sync::{Arc, Weak};
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::atomic::Ordering::{SeqCst, Relaxed};
use std::time::Instant;
use std::u64;

Expand All @@ -26,12 +27,14 @@ use std::u64;
/// processed during that timer tick.
#[derive(Debug)]
pub(crate) struct Entry {
/// Only accessed from `Registration`.
time: CachePadded<UnsafeCell<Time>>,

/// Timer internals. Using a weak pointer allows the timer to shutdown
/// without all `Delay` instances having completed.
inner: Weak<Inner>,

/// Task to notify once the deadline is reached.
task: AtomicTask,
///
/// When `None`, the entry has not yet been linked with a timer instance.
inner: Option<Weak<Inner>>,

/// Tracks the entry state. This value contains the following information:
///
Expand All @@ -44,22 +47,19 @@ pub(crate) struct Entry {
/// instant, this value is changed.
state: AtomicU64,

/// When true, the entry is counted by `Inner` towards the max outstanding
/// timeouts. The drop fn uses this to know if it should decrement the
/// counter.
///
/// One might think that it would be easier to just not create the `Entry`.
/// The problem is that `Delay` expects creating a `Registration` to always
/// return a `Registration` instance. This simplifying factor allows it to
/// improve the struct layout. To do this, we must always allocate the node.
counted: bool,
/// Task to notify once the deadline is reached.
task: AtomicTask,

/// True when the entry is queued in the "process" stack. This value
/// is set before pushing the value and unset after popping the value.
///
/// TODO: This could possibly be rolled up into `state`.
pub(super) queued: AtomicBool,

/// Next entry in the "process" linked list.
///
/// Access to this field is coordinated by the `queued` flag.
///
/// Represents a strong Arc ref.
pub(super) next_atomic: UnsafeCell<*mut Entry>,

Expand Down Expand Up @@ -91,6 +91,12 @@ pub(crate) struct Entry {
pub(super) prev_stack: UnsafeCell<*const Entry>,
}

/// Stores the info for `Delay`.
#[derive(Debug)]
pub(crate) struct Time {
pub(crate) deadline: Instant,
}

/// Flag indicating a timer entry has elapsed
const ELAPSED: u64 = 1 << 63;

Expand All @@ -100,14 +106,14 @@ const ERROR: u64 = u64::MAX;
// ===== impl Entry =====

impl Entry {
pub fn new(when: u64, handle: HandlePriv) -> Entry {
assert!(when > 0 && when < u64::MAX);

pub fn new(deadline: Instant) -> Entry {
Entry {
inner: handle.into_inner(),
time: CachePadded::new(UnsafeCell::new(Time {
deadline,
})),
inner: None,
task: AtomicTask::new(),
state: AtomicU64::new(when),
counted: true,
state: AtomicU64::new(0),
queued: AtomicBool::new(false),
next_atomic: UnsafeCell::new(ptr::null_mut()),
when: UnsafeCell::new(None),
Expand All @@ -116,34 +122,89 @@ impl Entry {
}
}

pub fn new_elapsed(handle: HandlePriv) -> Entry {
Entry {
inner: handle.into_inner(),
task: AtomicTask::new(),
state: AtomicU64::new(ELAPSED),
counted: true,
queued: AtomicBool::new(false),
next_atomic: UnsafeCell::new(ptr::null_mut()),
when: UnsafeCell::new(None),
next_stack: UnsafeCell::new(None),
prev_stack: UnsafeCell::new(ptr::null_mut()),
}
/// Only called by `Registration`
pub fn time_ref(&self) -> &Time {
unsafe { &*self.time.get() }
}

/// Create a new `Entry` that is in the error state. Calling `poll_elapsed` on
/// this `Entry` will always result in `Err` being returned.
pub fn new_error() -> Entry {
Entry {
inner: Weak::new(),
task: AtomicTask::new(),
state: AtomicU64::new(ERROR),
counted: false,
queued: AtomicBool::new(false),
next_atomic: UnsafeCell::new(ptr::null_mut()),
when: UnsafeCell::new(None),
next_stack: UnsafeCell::new(None),
prev_stack: UnsafeCell::new(ptr::null_mut()),
/// Only called by `Registration`
pub fn time_mut(&self) -> &mut Time {
unsafe { &mut *self.time.get() }
}

/// Returns `true` if the `Entry` is currently associated with a timer
/// instance.
pub fn is_registered(&self) -> bool {
self.inner.is_some()
}

/// Only called by `Registration`
pub fn register(me: &mut Arc<Self>) {
let handle = match HandlePriv::try_current() {
Ok(handle) => handle,
Err(_) => {
// Could not associate the entry with a timer, transition the
// state to error
Arc::get_mut(me).unwrap()
.transition_to_error();

return;
}
};

Entry::register_with(me, handle)
}

/// Only called by `Registration`
pub fn register_with(me: &mut Arc<Self>, handle: HandlePriv) {
assert!(!me.is_registered(), "only register an entry once");

let deadline = me.time_ref().deadline;

let inner = match handle.inner() {
Some(inner) => inner,
None => {
// Could not associate the entry with a timer, transition the
// state to error
Arc::get_mut(me).unwrap()
.transition_to_error();

return;
}
};

// Increment the number of active timeouts
if inner.increment().is_err() {
Arc::get_mut(me).unwrap()
.transition_to_error();

return;
}

// Associate the entry with the timer
Arc::get_mut(me).unwrap()
.inner = Some(handle.into_inner());

let when = inner.normalize_deadline(deadline);

// Relaxed OK: At this point, there are no other threads that have
// access to this entry.
if when <= inner.elapsed() {
me.state.store(ELAPSED, Relaxed);
return;
} else {
me.state.store(when, Relaxed);
}

if inner.queue(me).is_err() {
// The timer has shutdown, transition the entry to the error state.
me.error();
}
}

fn transition_to_error(&mut self) {
self.inner = Some(Weak::new());
self.state = AtomicU64::new(ERROR);
}

/// The current entry state as known by the timer. This is not the value of
Expand Down Expand Up @@ -224,7 +285,8 @@ impl Entry {
return;
}

let inner = match entry.inner.upgrade() {
// If registered with a timer instance, try to upgrade the Arc.
let inner = match entry.upgrade_inner() {
Some(inner) => inner,
None => return,
};
Expand Down Expand Up @@ -260,12 +322,18 @@ impl Entry {
Ok(NotReady)
}

pub fn reset(entry: &Arc<Entry>, deadline: Instant) {
let inner = match entry.inner.upgrade() {
/// Only called by `Registration`
pub fn reset(entry: &mut Arc<Entry>) {
if !entry.is_registered() {
return;
}

let inner = match entry.upgrade_inner() {
Some(inner) => inner,
None => return,
};

let deadline = entry.time_ref().deadline;
let when = inner.normalize_deadline(deadline);
let elapsed = inner.elapsed();

Expand Down Expand Up @@ -305,6 +373,11 @@ impl Entry {
let _ = inner.queue(entry);
}
}

fn upgrade_inner(&self) -> Option<Arc<Inner>> {
self.inner.as_ref()
.and_then(|inner| inner.upgrade())
}
}

fn is_elapsed(state: u64) -> bool {
Expand All @@ -313,11 +386,7 @@ fn is_elapsed(state: u64) -> bool {

impl Drop for Entry {
fn drop(&mut self) {
if !self.counted {
return;
}

let inner = match self.inner.upgrade() {
let inner = match self.upgrade_inner() {
Some(inner) => inner,
None => return,
};
Expand Down
Loading

0 comments on commit cf184eb

Please sign in to comment.