Skip to content

Commit

Permalink
feat: Replace log with tracing (#140)
Browse files Browse the repository at this point in the history
Replaces the `log` crate with `tracing`. See this comment for more info:
smol-rs/blocking#31 (comment)
  • Loading branch information
notgull authored Jun 14, 2023
1 parent 6ec079e commit a1f4c0a
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 18 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ cfg-if = "1"
concurrent-queue = "2.2.0"
futures-io = { version = "0.3.28", default-features = false, features = ["std"] }
futures-lite = { version = "1.11.0", default-features = false }
log = "0.4.11"
parking = "2.0.0"
polling = "2.6.0"
rustix = { version = "0.37.1", default-features = false, features = ["std", "fs"] }
slab = "0.4.2"
socket2 = { version = "0.5.3", features = ["all"] }
tracing = { version = "0.1.37", default-features = false }
waker-fn = "1.1.0"

[build-dependencies]
Expand Down
26 changes: 15 additions & 11 deletions src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ pub(crate) fn init() {

/// The main loop for the "async-io" thread.
fn main_loop(parker: parking::Parker) {
let span = tracing::trace_span!("async_io::main_loop");
let _enter = span.enter();

// The last observed reactor tick.
let mut last_tick = 0;
// Number of sleeps since this thread has called `react()`.
Expand All @@ -61,7 +64,7 @@ fn main_loop(parker: parking::Parker) {
};

if let Some(mut reactor_lock) = reactor_lock {
log::trace!("main_loop: waiting on I/O");
tracing::trace!("waiting on I/O");
reactor_lock.react(None).ok();
last_tick = Reactor::get().ticker();
sleeps = 0;
Expand All @@ -76,9 +79,9 @@ fn main_loop(parker: parking::Parker) {
.get(sleeps as usize)
.unwrap_or(&10_000);

log::trace!("main_loop: sleeping for {} us", delay_us);
tracing::trace!("sleeping for {} us", delay_us);
if parker.park_timeout(Duration::from_micros(*delay_us)) {
log::trace!("main_loop: notified");
tracing::trace!("notified");

// If notified before timeout, reset the last tick and the sleep counter.
last_tick = Reactor::get().ticker();
Expand All @@ -105,7 +108,8 @@ fn main_loop(parker: parking::Parker) {
/// });
/// ```
pub fn block_on<T>(future: impl Future<Output = T>) -> T {
log::trace!("block_on()");
let span = tracing::trace_span!("async_io::block_on");
let _enter = span.enter();

// Increment `BLOCK_ON_COUNT` so that the "async-io" thread becomes less aggressive.
BLOCK_ON_COUNT.fetch_add(1, Ordering::SeqCst);
Expand Down Expand Up @@ -144,13 +148,13 @@ pub fn block_on<T>(future: impl Future<Output = T>) -> T {
loop {
// Poll the future.
if let Poll::Ready(t) = future.as_mut().poll(cx) {
log::trace!("block_on: completed");
tracing::trace!("completed");
return t;
}

// Check if a notification was received.
if p.park_timeout(Duration::from_secs(0)) {
log::trace!("block_on: notified");
tracing::trace!("notified");

// Try grabbing a lock on the reactor to process I/O events.
if let Some(mut reactor_lock) = Reactor::get().try_lock() {
Expand Down Expand Up @@ -183,23 +187,23 @@ pub fn block_on<T>(future: impl Future<Output = T>) -> T {
// Check if a notification has been received before `io_blocked` was updated
// because in that case the reactor won't receive a wakeup.
if p.park_timeout(Duration::from_secs(0)) {
log::trace!("block_on: notified");
tracing::trace!("notified");
break;
}

// Wait for I/O events.
log::trace!("block_on: waiting on I/O");
tracing::trace!("waiting on I/O");
reactor_lock.react(None).ok();

// Check if a notification has been received.
if p.park_timeout(Duration::from_secs(0)) {
log::trace!("block_on: notified");
tracing::trace!("notified");
break;
}

// Check if this thread been handling I/O events for a long time.
if start.elapsed() > Duration::from_micros(500) {
log::trace!("block_on: stops hogging the reactor");
tracing::trace!("stops hogging the reactor");

// This thread is clearly processing I/O events for some other threads
// because it didn't get a notification yet. It's best to stop hogging the
Expand All @@ -218,7 +222,7 @@ pub fn block_on<T>(future: impl Future<Output = T>) -> T {
}
} else {
// Wait for an actual notification.
log::trace!("block_on: sleep until notification");
tracing::trace!("sleep until notification");
p.park();
}
}
Expand Down
19 changes: 13 additions & 6 deletions src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ impl Reactor {
///
/// Returns the duration until the next timer before this method was called.
fn process_timers(&self, wakers: &mut Vec<Waker>) -> Option<Duration> {
let span = tracing::trace_span!("process_timers");
let _enter = span.enter();

let mut timers = self.timers.lock().unwrap();
self.process_timer_ops(&mut timers);

Expand Down Expand Up @@ -227,7 +230,8 @@ impl Reactor {
drop(timers);

// Add wakers to the list.
log::trace!("process_timers: {} ready wakers", ready.len());
tracing::trace!("{} ready wakers", ready.len());

for (_, waker) in ready {
wakers.push(waker);
}
Expand Down Expand Up @@ -262,6 +266,9 @@ pub(crate) struct ReactorLock<'a> {
impl ReactorLock<'_> {
/// Processes new events, blocking until the first event or the timeout.
pub(crate) fn react(&mut self, timeout: Option<Duration>) -> io::Result<()> {
let span = tracing::trace_span!("react");
let _enter = span.enter();

let mut wakers = Vec::new();

// Process ready timers.
Expand Down Expand Up @@ -339,7 +346,7 @@ impl ReactorLock<'_> {
};

// Wake up ready tasks.
log::trace!("react: {} ready wakers", wakers.len());
tracing::trace!("{} ready wakers", wakers.len());
for waker in wakers {
// Don't let a panicking waker blow everything up.
panic::catch_unwind(|| waker.wake()).ok();
Expand Down Expand Up @@ -502,7 +509,7 @@ impl<T> Future for Readable<'_, T> {

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(Pin::new(&mut self.0).poll(cx))?;
log::trace!("readable: fd={:?}", self.0.handle.source.registration);
tracing::trace!(fd = ?self.0.handle.source.registration, "readable");
Poll::Ready(Ok(()))
}
}
Expand All @@ -522,7 +529,7 @@ impl<T> Future for ReadableOwned<T> {

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(Pin::new(&mut self.0).poll(cx))?;
log::trace!("readable_owned: fd={:?}", self.0.handle.source.registration);
tracing::trace!(fd = ?self.0.handle.source.registration, "readable_owned");
Poll::Ready(Ok(()))
}
}
Expand All @@ -542,7 +549,7 @@ impl<T> Future for Writable<'_, T> {

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(Pin::new(&mut self.0).poll(cx))?;
log::trace!("writable: fd={:?}", self.0.handle.source.registration);
tracing::trace!(fd = ?self.0.handle.source.registration, "writable");
Poll::Ready(Ok(()))
}
}
Expand All @@ -562,7 +569,7 @@ impl<T> Future for WritableOwned<T> {

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(Pin::new(&mut self.0).poll(cx))?;
log::trace!("writable_owned: fd={:?}", self.0.handle.source.registration);
tracing::trace!(fd = ?self.0.handle.source.registration, "writable_owned");
Poll::Ready(Ok(()))
}
}
Expand Down

0 comments on commit a1f4c0a

Please sign in to comment.