Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 50 additions & 18 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ defaults:
jobs:
fmt:
uses: smol-rs/.github/.github/workflows/fmt.yml@main
clippy:
uses: smol-rs/.github/.github/workflows/clippy.yml@main
with:
# macOS for kqueue, Windows for windows.
additional-targets: aarch64-apple-darwin x86_64-pc-windows-msvc
security_audit:
uses: smol-rs/.github/.github/workflows/security_audit.yml@main
permissions:
Expand All @@ -48,16 +53,13 @@ jobs:
- name: Install Rust
# --no-self-update is necessary because the windows environment cannot self-update rustup.exe.
run: rustup update ${{ matrix.rust }} --no-self-update && rustup default ${{ matrix.rust }}
- name: Install cargo-hack and cargo-minimal-versions
uses: taiki-e/install-action@v2
with:
tool: cargo-hack,cargo-minimal-versions
- run: cargo build --all --all-features --all-targets
- name: Run cargo check (without dev-dependencies to catch missing feature flags)
if: startsWith(matrix.rust, 'nightly')
run: cargo check -Z features=dev_dep
- name: Add rust-src
if: startsWith(matrix.rust, 'nightly') && startsWith(matrix.os, 'ubuntu')
run: rustup component add rust-src
- name: Check ESP-IDF
if: startsWith(matrix.rust, 'nightly') && startsWith(matrix.os, 'ubuntu')
run: cargo check -Z build-std --target riscv32imc-esp-espidf
- run: cargo hack build --feature-powerset --no-dev-deps
- run: cargo minimal-versions build --all --all-features
- run: cargo test

# Copied from: https://github.com/rust-lang/stacker/pull/19/files
Expand All @@ -83,12 +85,16 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest]
rust: [nightly, stable]
steps:
- uses: actions/checkout@v4
- name: Install Rust
run: rustup update stable
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
- name: Install cross
uses: taiki-e/install-action@cross
- name: Add rust-src
if: startsWith(matrix.rust, 'nightly')
run: rustup component add rust-src
# We don't test BSDs, since we already test them in Cirrus.
- name: Android
if: startsWith(matrix.os, 'ubuntu')
Expand All @@ -113,6 +119,36 @@ jobs:
run: |
rustup target add x86_64-unknown-illumos
cargo build --target x86_64-unknown-illumos
- name: Redox
if: startsWith(matrix.rust, 'nightly') && startsWith(matrix.os, 'ubuntu')
run: |
rustup target add x86_64-unknown-redox
cargo check --target x86_64-unknown-redox
# TODO:
# - name: HermitOS
# if: startsWith(matrix.rust, 'nightly') && startsWith(matrix.os, 'ubuntu')
# run: cargo check -Z build-std --target x86_64-unknown-hermit
# TODO:
# - name: Check haiku
# if: startsWith(matrix.rust, 'nightly') && startsWith(matrix.os, 'ubuntu')
# run: cargo check -Z build-std --target x86_64-unknown-haiku
- name: Check vita
if: startsWith(matrix.rust, 'nightly') && startsWith(matrix.os, 'ubuntu')
run: cargo check -Z build-std --target armv7-sony-vita-newlibeabihf
- name: Check ESP-IDF
if: startsWith(matrix.rust, 'nightly') && startsWith(matrix.os, 'ubuntu')
run: cargo check -Z build-std --target riscv32imc-esp-espidf

wine:
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v4
- name: Install Rust
run: rustup update stable
- uses: taiki-e/setup-cross-toolchain-action@v1
with:
target: x86_64-pc-windows-gnu
- run: cargo test --target x86_64-pc-windows-gnu

msrv:
runs-on: ${{ matrix.os }}
Expand All @@ -125,11 +161,7 @@ jobs:
- name: Install cargo-hack
uses: taiki-e/install-action@cargo-hack
- run: cargo hack build --no-dev-deps --rust-version

clippy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install Rust
run: rustup update stable
- run: cargo clippy --all-features --all-targets
- run: cargo hack build --no-dev-deps --rust-version --target x86_64-unknown-freebsd
if: startsWith(matrix.os, 'ubuntu')
- run: cargo hack build --no-dev-deps --rust-version --target x86_64-unknown-netbsd
if: startsWith(matrix.os, 'ubuntu')
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ concurrent-queue = "2.2.0"
futures-io = { version = "0.3.28", default-features = false, features = ["std"] }
futures-lite = { version = "2.0.0", default-features = false }
parking = "2.0.0"
polling = "3.0.0"
polling = "3.4.0"
rustix = { version = "1.0.7", default-features = false, features = ["fs", "net", "std"] }
slab = "0.4.2"
tracing = { version = "0.1.37", default-features = false }
tracing = { version = "0.1.37", default-features = false, optional = true }

[target.'cfg(windows)'.dependencies]
windows-sys = { version = "0.59.0", features = ["Win32_Foundation"] }
Expand Down
12 changes: 6 additions & 6 deletions benches/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ fn read_and_write(b: &mut Criterion) {
(listener, reader, writer)
};

group.bench_function(format!("TcpStream.{}", driver_name), move |b| {
group.bench_function(format!("TcpStream.{driver_name}"), move |b| {
let (_listener, mut reader, mut writer) = init_reader_writer();
let mut buf = vec![0x42; TCP_AMOUNT];

Expand All @@ -55,7 +55,7 @@ fn read_and_write(b: &mut Criterion) {
use std::os::unix::net::UnixStream;
const UNIX_AMOUNT: usize = 1024;

group.bench_function(format!("UnixStream.{}", driver_name), |b| {
group.bench_function(format!("UnixStream.{driver_name}"), |b| {
let (mut reader, mut writer) = Async::<UnixStream>::pair().unwrap();
let mut buf = vec![0x42; UNIX_AMOUNT];

Expand All @@ -79,7 +79,7 @@ fn connect_and_accept(c: &mut Criterion) {

for (driver_name, exec) in [("Undriven", false), ("Driven", true)] {
// Benchmark the TCP streams.
group.bench_function(format!("TcpStream.{}", driver_name), move |b| {
group.bench_function(format!("TcpStream.{driver_name}"), move |b| {
let socket_addr =
SocketAddr::new("127.0.0.1".parse::<Ipv4Addr>().unwrap().into(), 12345);
let listener = Async::<TcpListener>::bind(socket_addr).unwrap();
Expand All @@ -105,10 +105,10 @@ fn connect_and_accept(c: &mut Criterion) {
getrandom::fill(&mut id).unwrap();
let id = u64::from_ne_bytes(id);

let socket_addr = format!("/tmp/async-io-bench-{}.sock", id);
let socket_addr = format!("/tmp/async-io-bench-{id}.sock");
let listener = Async::<UnixListener>::bind(&socket_addr).unwrap();

group.bench_function(format!("UnixStream.{}", driver_name), |b| {
group.bench_function(format!("UnixStream.{driver_name}"), |b| {
b.iter(|| {
block_on(
async {
Expand Down Expand Up @@ -142,7 +142,7 @@ fn udp_send_recv(c: &mut Criterion) {
let mut buf = vec![0x42; UDP_AMOUNT];

for (driver_name, exec) in [("Undriven", false), ("Driven", true)] {
group.bench_function(format!("UdpSocket.{}", driver_name), |b| {
group.bench_function(format!("UdpSocket.{driver_name}"), |b| {
b.iter(|| {
let buf = &mut buf;

Expand Down
2 changes: 1 addition & 1 deletion benches/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn register_timer(c: &mut Criterion) {

// Benchmark registering a timer.
group.bench_function(
format!("register_timer.({} previous timers)", prev_timer_count),
format!("register_timer.({prev_timer_count} previous timers)"),
|b| {
b.iter(|| {
let timer = make_timer();
Expand Down
2 changes: 1 addition & 1 deletion examples/linux-inotify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ fn main() -> std::io::Result<()> {
// Wait for events in a loop and print them on the screen.
loop {
for event in unsafe { inotify.read_with_mut(read_op).await? } {
println!("{:?}", event);
println!("{event:?}");
}
}
})
Expand Down
14 changes: 14 additions & 0 deletions src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ pub(crate) fn init() {

/// The main loop for the "async-io" thread.
fn main_loop(parker: parking::Parker) {
#[cfg(feature = "tracing")]
let span = tracing::trace_span!("async_io::main_loop");
#[cfg(feature = "tracing")]
let _enter = span.enter();

// The last observed reactor tick.
Expand All @@ -65,6 +67,7 @@ fn main_loop(parker: parking::Parker) {
};

if let Some(mut reactor_lock) = reactor_lock {
#[cfg(feature = "tracing")]
tracing::trace!("waiting on I/O");
reactor_lock.react(None).ok();
last_tick = Reactor::get().ticker();
Expand All @@ -80,8 +83,10 @@ fn main_loop(parker: parking::Parker) {
.get(sleeps as usize)
.unwrap_or(&10_000);

#[cfg(feature = "tracing")]
tracing::trace!("sleeping for {} us", delay_us);
if parker.park_timeout(Duration::from_micros(*delay_us)) {
#[cfg(feature = "tracing")]
tracing::trace!("notified");

// If notified before timeout, reset the last tick and the sleep counter.
Expand Down Expand Up @@ -109,7 +114,9 @@ fn main_loop(parker: parking::Parker) {
/// });
/// ```
pub fn block_on<T>(future: impl Future<Output = T>) -> T {
#[cfg(feature = "tracing")]
let span = tracing::trace_span!("async_io::block_on");
#[cfg(feature = "tracing")]
let _enter = span.enter();

// Increment `BLOCK_ON_COUNT` so that the "async-io" thread becomes less aggressive.
Expand Down Expand Up @@ -200,12 +207,14 @@ pub fn block_on<T>(future: impl Future<Output = T>) -> T {
// Ensure the cached parker is reset to the unnotified state for future block_on calls,
// in case this future called wake and then immediately returned Poll::Ready.
p.park_timeout(Duration::from_secs(0));
#[cfg(feature = "tracing")]
tracing::trace!("completed");
return t;
}

// Check if a notification was received.
if p.park_timeout(Duration::from_secs(0)) {
#[cfg(feature = "tracing")]
tracing::trace!("notified");

// Try grabbing a lock on the reactor to process I/O events.
Expand Down Expand Up @@ -239,22 +248,26 @@ pub fn block_on<T>(future: impl Future<Output = T>) -> T {
// Check if a notification has been received before `io_blocked` was updated
// because in that case the reactor won't receive a wakeup.
if p.park_timeout(Duration::from_secs(0)) {
#[cfg(feature = "tracing")]
tracing::trace!("notified");
break;
}

// Wait for I/O events.
#[cfg(feature = "tracing")]
tracing::trace!("waiting on I/O");
reactor_lock.react(None).ok();

// Check if a notification has been received.
if p.park_timeout(Duration::from_secs(0)) {
#[cfg(feature = "tracing")]
tracing::trace!("notified");
break;
}

// Check if this thread been handling I/O events for a long time.
if start.elapsed() > Duration::from_micros(500) {
#[cfg(feature = "tracing")]
tracing::trace!("stops hogging the reactor");

// This thread is clearly processing I/O events for some other threads
Expand All @@ -274,6 +287,7 @@ pub fn block_on<T>(future: impl Future<Output = T>) -> T {
}
} else {
// Wait for an actual notification.
#[cfg(feature = "tracing")]
tracing::trace!("sleep until notification");
p.park();
}
Expand Down
10 changes: 10 additions & 0 deletions src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,9 @@ impl Reactor {
///
/// Returns the duration until the next timer before this method was called.
fn process_timers(&self, wakers: &mut Vec<Waker>) -> Option<Duration> {
#[cfg(feature = "tracing")]
let span = tracing::trace_span!("process_timers");
#[cfg(feature = "tracing")]
let _enter = span.enter();

let mut timers = self.timers.lock().unwrap();
Expand Down Expand Up @@ -235,6 +237,7 @@ impl Reactor {
drop(timers);

// Add wakers to the list.
#[cfg(feature = "tracing")]
tracing::trace!("{} ready wakers", ready.len());

for (_, waker) in ready {
Expand Down Expand Up @@ -271,7 +274,9 @@ pub(crate) struct ReactorLock<'a> {
impl ReactorLock<'_> {
/// Processes new events, blocking until the first event or the timeout.
pub(crate) fn react(&mut self, timeout: Option<Duration>) -> io::Result<()> {
#[cfg(feature = "tracing")]
let span = tracing::trace_span!("react");
#[cfg(feature = "tracing")]
let _enter = span.enter();

let mut wakers = Vec::new();
Expand Down Expand Up @@ -353,6 +358,7 @@ impl ReactorLock<'_> {
};

// Wake up ready tasks.
#[cfg(feature = "tracing")]
tracing::trace!("{} ready wakers", wakers.len());
for waker in wakers {
// Don't let a panicking waker blow everything up.
Expand Down Expand Up @@ -518,6 +524,7 @@ impl<T> Future for Readable<'_, T> {

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(Pin::new(&mut self.0).poll(cx))?;
#[cfg(feature = "tracing")]
tracing::trace!(fd = ?self.0.handle.source.registration, "readable");
Poll::Ready(Ok(()))
}
Expand All @@ -538,6 +545,7 @@ impl<T> Future for ReadableOwned<T> {

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(Pin::new(&mut self.0).poll(cx))?;
#[cfg(feature = "tracing")]
tracing::trace!(fd = ?self.0.handle.source.registration, "readable_owned");
Poll::Ready(Ok(()))
}
Expand All @@ -558,6 +566,7 @@ impl<T> Future for Writable<'_, T> {

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(Pin::new(&mut self.0).poll(cx))?;
#[cfg(feature = "tracing")]
tracing::trace!(fd = ?self.0.handle.source.registration, "writable");
Poll::Ready(Ok(()))
}
Expand All @@ -578,6 +587,7 @@ impl<T> Future for WritableOwned<T> {

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(Pin::new(&mut self.0).poll(cx))?;
#[cfg(feature = "tracing")]
tracing::trace!(fd = ?self.0.handle.source.registration, "writable_owned");
Poll::Ready(Ok(()))
}
Expand Down
Loading