Skip to content

Commit

Permalink
io: update to Mio 0.7 (#2893)
Browse files Browse the repository at this point in the history
This also makes Mio an implementation detail, removing it from the
public API.

This is based on #1767.
  • Loading branch information
carllerche authored Oct 2, 2020
1 parent 7ec6d88 commit 1e585cc
Show file tree
Hide file tree
Showing 19 changed files with 476 additions and 588 deletions.
22 changes: 9 additions & 13 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ net = ["dns", "tcp", "udp", "uds"]
process = [
"lazy_static",
"libc",
"mio",
"mio-named-pipes",
"mio-uds",
"mio/os-poll",
"mio/os-util",
"mio/uds",
"signal-hook-registry",
"winapi/threadpoollegacyapiset",
]
Expand All @@ -74,18 +74,18 @@ rt-threaded = [
signal = [
"lazy_static",
"libc",
"mio",
"mio-uds",
"mio/os-poll",
"mio/uds",
"signal-hook-registry",
"winapi/consoleapi",
]
stream = ["futures-core"]
sync = ["fnv"]
test-util = []
tcp = ["lazy_static", "mio"]
tcp = ["lazy_static", "mio/tcp", "mio/os-poll"]
time = ["slab"]
udp = ["lazy_static", "mio"]
uds = ["lazy_static", "libc", "mio", "mio-uds"]
udp = ["lazy_static", "mio/udp", "mio/os-poll"]
uds = ["lazy_static", "libc", "mio/uds", "mio/os-poll"]

[dependencies]
tokio-macros = { version = "0.3.0", path = "../tokio-macros", optional = true }
Expand All @@ -98,20 +98,16 @@ fnv = { version = "1.0.6", optional = true }
futures-core = { version = "0.3.0", optional = true }
lazy_static = { version = "1.0.2", optional = true }
memchr = { version = "2.2", optional = true }
mio = { version = "0.6.20", optional = true }
mio = { version = "0.7.2", optional = true }
num_cpus = { version = "1.8.0", optional = true }
parking_lot = { version = "0.11.0", optional = true } # Not in full
slab = { version = "0.4.1", optional = true } # Backs `DelayQueue`
tracing = { version = "0.1.16", default-features = false, features = ["std"], optional = true } # Not in full

[target.'cfg(unix)'.dependencies]
mio-uds = { version = "0.6.5", optional = true }
libc = { version = "0.2.42", optional = true }
signal-hook-registry = { version = "1.1.1", optional = true }

[target.'cfg(windows)'.dependencies]
mio-named-pipes = { version = "0.1.6", optional = true }

[target.'cfg(windows)'.dependencies.winapi]
version = "0.3.8"
default-features = false
Expand Down
83 changes: 35 additions & 48 deletions tokio/src/io/driver/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub(crate) mod platform;
mod ready;
use ready::Ready;

mod scheduled_io;
pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests
Expand All @@ -8,7 +9,6 @@ use crate::runtime::context;
use crate::util::bit;
use crate::util::slab::{self, Slab};

use mio::event::Evented;
use std::fmt;
use std::io;
use std::sync::{Arc, Weak};
Expand All @@ -27,10 +27,11 @@ pub(crate) struct Driver {
/// with this driver.
resources: Slab<ScheduledIo>,

/// The system event queue
poll: mio::Poll,

/// State shared between the reactor and the handles.
inner: Arc<Inner>,

_wakeup_registration: mio::Registration,
}

/// A reference to an I/O driver
Expand All @@ -41,18 +42,18 @@ pub(crate) struct Handle {

pub(crate) struct ReadyEvent {
tick: u8,
readiness: mio::Ready,
ready: Ready,
}

pub(super) struct Inner {
/// The underlying system event queue.
io: mio::Poll,
/// Registers I/O resources
registry: mio::Registry,

/// Allocates `ScheduledIo` handles when creating new resources.
pub(super) io_dispatch: slab::Allocator<ScheduledIo>,

/// Used to wake up the reactor from a call to `turn`
wakeup: mio::SetReadiness,
waker: mio::Waker,
}

#[derive(Debug, Eq, PartialEq, Clone, Copy)]
Expand Down Expand Up @@ -92,27 +93,22 @@ impl Driver {
/// Creates a new event loop, returning any error that happened during the
/// creation.
pub(crate) fn new() -> io::Result<Driver> {
let io = mio::Poll::new()?;
let wakeup_pair = mio::Registration::new2();
let poll = mio::Poll::new()?;
let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
let registry = poll.registry().try_clone()?;

let slab = Slab::new();
let allocator = slab.allocator();

io.register(
&wakeup_pair.0,
TOKEN_WAKEUP,
mio::Ready::readable(),
mio::PollOpt::level(),
)?;

Ok(Driver {
tick: 0,
events: Some(mio::Events::with_capacity(1024)),
resources: slab,
_wakeup_registration: wakeup_pair.0,
poll,
inner: Arc::new(Inner {
io,
registry,
io_dispatch: allocator,
wakeup: wakeup_pair.1,
waker,
}),
})
}
Expand Down Expand Up @@ -143,23 +139,18 @@ impl Driver {

// Block waiting for an event to happen, peeling out how many events
// happened.
match self.inner.io.poll(&mut events, max_wait) {
match self.poll.poll(&mut events, max_wait) {
Ok(_) => {}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}

// Process all the events that came in, dispatching appropriately

for event in events.iter() {
let token = event.token();

if token == TOKEN_WAKEUP {
self.inner
.wakeup
.set_readiness(mio::Ready::empty())
.unwrap();
} else {
self.dispatch(token, event.readiness());
if token != TOKEN_WAKEUP {
self.dispatch(token, Ready::from_mio(event));
}
}

Expand All @@ -168,18 +159,17 @@ impl Driver {
Ok(())
}

fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) {
fn dispatch(&mut self, token: mio::Token, ready: Ready) {
let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));

let io = match self.resources.get(addr) {
Some(io) => io,
None => return,
};

let set = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| {
curr | ready.as_usize()
});
if set.is_err() {
let res = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| curr | ready);

if res.is_err() {
// token no longer valid!
return;
}
Expand All @@ -194,7 +184,7 @@ impl Drop for Driver {
// If a task is waiting on the I/O resource, notify it. The task
// will then attempt to use the I/O resource and fail due to the
// driver being shutdown.
io.wake(mio::Ready::all());
io.wake(Ready::ALL);
})
}
}
Expand Down Expand Up @@ -250,7 +240,7 @@ impl Handle {
/// return immediately.
fn wakeup(&self) {
if let Some(inner) = self.inner() {
inner.wakeup.set_readiness(mio::Ready::readable()).unwrap();
inner.waker.wake().expect("failed to wake I/O driver");
}
}

Expand Down Expand Up @@ -279,8 +269,8 @@ impl Inner {
/// The registration token is returned.
pub(super) fn add_source(
&self,
source: &dyn Evented,
ready: mio::Ready,
source: &mut impl mio::event::Source,
interest: mio::Interest,
) -> io::Result<slab::Ref<ScheduledIo>> {
let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| {
io::Error::new(
Expand All @@ -291,26 +281,23 @@ impl Inner {

let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0));

self.io
.register(source, mio::Token(token), ready, mio::PollOpt::edge())?;
self.registry
.register(source, mio::Token(token), interest)?;

Ok(shared)
}

/// Deregisters an I/O resource from the reactor.
pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> {
self.io.deregister(source)
pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> {
self.registry.deregister(source)
}
}

impl Direction {
pub(super) fn mask(self) -> mio::Ready {
pub(super) fn mask(self) -> Ready {
match self {
Direction::Read => {
// Everything except writable is signaled through read.
mio::Ready::all() - mio::Ready::writable()
}
Direction::Write => mio::Ready::writable() | platform::hup() | platform::error(),
Direction::Read => Ready::READABLE | Ready::READ_CLOSED,
Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED,
}
}
}
Loading

0 comments on commit 1e585cc

Please sign in to comment.