Skip to content
Merged
2 changes: 2 additions & 0 deletions src/concurrency/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ pub enum BlockReason {
Futex { addr: u64 },
/// Blocked on an InitOnce.
InitOnce(InitOnceId),
/// Blocked on epoll.
Epoll,
}

/// The state of a thread.
Expand Down
8 changes: 8 additions & 0 deletions src/shims/unix/fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,14 @@ impl WeakFileDescriptionRef {
}
}

/// Provenance visiting for weak file description references is deliberately a no-op.
impl VisitProvenance for WeakFileDescriptionRef {
/// Visits nothing: the `_visit` callback is intentionally left unused.
fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {
// A weak reference can never be the only reference to some pointer or place.
// Since the actual file description is tracked by a strong ref somewhere,
// whatever it contains is expected to be visited through that strong ref,
// so it is ok to make this a NOP operation.
}
}

/// A unique id for file descriptions. While we could use the address, considering that
/// is definitely unique, the address would expose interpreter internal state when used
/// for sorting things. So instead we generate a unique id per file description that stays
Expand Down
207 changes: 164 additions & 43 deletions src/shims/unix/linux/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use std::cell::RefCell;
use std::collections::BTreeMap;
use std::io;
use std::rc::{Rc, Weak};
use std::time::Duration;

use crate::shims::unix::fd::{FdId, FileDescriptionRef};
use crate::shims::unix::fd::{FdId, FileDescriptionRef, WeakFileDescriptionRef};
use crate::shims::unix::*;
use crate::*;

Expand All @@ -19,6 +20,8 @@ struct Epoll {
// This is an Rc because each EpollEventInterest needs to hold a reference to it
// in order to update it.
ready_list: Rc<RefCell<BTreeMap<(FdId, i32), EpollEventInstance>>>,
/// A list of thread ids blocked on this epoll instance.
thread_id: RefCell<Vec<ThreadId>>,
}

/// EpollEventInstance contains information that will be returned by epoll_wait.
Expand Down Expand Up @@ -58,6 +61,8 @@ pub struct EpollEventInterest {
data: u64,
/// Ready list of the epoll instance under which this EpollEventInterest is registered.
ready_list: Rc<RefCell<BTreeMap<(FdId, i32), EpollEventInstance>>>,
/// The file descriptor value that this EpollEventInterest is registered under.
epfd: i32,
}

/// EpollReadyEvents reflects the readiness of a file description.
Expand Down Expand Up @@ -338,6 +343,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
events,
data,
ready_list: Rc::clone(ready_list),
epfd: epfd_value,
}));

if op == epoll_ctl_add {
Expand Down Expand Up @@ -395,7 +401,10 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {

/// The `timeout` argument specifies the number of milliseconds that
/// `epoll_wait()` will block. Time is measured against the
/// CLOCK_MONOTONIC clock.
/// CLOCK_MONOTONIC clock. If the timeout is zero, the function will not block,
/// while if the timeout is -1, the function will block
/// until at least one event has been retrieved (or an error
/// occurred).

/// A call to `epoll_wait()` will block until either:
/// • a file descriptor delivers an event;
Expand All @@ -421,59 +430,100 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
events_op: &OpTy<'tcx>,
maxevents: &OpTy<'tcx>,
timeout: &OpTy<'tcx>,
) -> InterpResult<'tcx, Scalar> {
dest: &MPlaceTy<'tcx>,
) -> InterpResult<'tcx> {
let this = self.eval_context_mut();

let epfd = this.read_scalar(epfd)?.to_i32()?;
let epfd_value = this.read_scalar(epfd)?.to_i32()?;
let events = this.read_immediate(events_op)?;
let maxevents = this.read_scalar(maxevents)?.to_i32()?;
let timeout = this.read_scalar(timeout)?.to_i32()?;

if epfd <= 0 || maxevents <= 0 {
if epfd_value <= 0 || maxevents <= 0 {
let einval = this.eval_libc("EINVAL");
this.set_last_error(einval)?;
return Ok(Scalar::from_i32(-1));
this.write_int(-1, dest)?;
return Ok(());
}

// This needs to come after the maxevents value check, or else maxevents.try_into().unwrap()
// will fail.
let events = this.deref_pointer_as(
let event = this.deref_pointer_as(
&events,
this.libc_array_ty_layout("epoll_event", maxevents.try_into().unwrap()),
)?;

// FIXME: Implement blocking support
if timeout != 0 {
throw_unsup_format!("epoll_wait: timeout value can only be 0");
}

let Some(epfd) = this.machine.fds.get(epfd) else {
return Ok(Scalar::from_i32(this.fd_not_found()?));
let Some(epfd) = this.machine.fds.get(epfd_value) else {
let result_value: i32 = this.fd_not_found()?;
this.write_int(result_value, dest)?;
return Ok(());
};
let epoll_file_description = epfd
.downcast::<Epoll>()
.ok_or_else(|| err_unsup_format!("non-epoll FD passed to `epoll_wait`"))?;

let ready_list = epoll_file_description.get_ready_list();
let mut ready_list = ready_list.borrow_mut();
let mut num_of_events: i32 = 0;
let mut array_iter = this.project_array_fields(&events)?;

while let Some(des) = array_iter.next(this)? {
if let Some(epoll_event_instance) = ready_list_next(this, &mut ready_list) {
this.write_int_fields_named(
&[
("events", epoll_event_instance.events.into()),
("u64", epoll_event_instance.data.into()),
],
&des.1,
)?;
num_of_events = num_of_events.strict_add(1);
} else {
break;
}
// Create a weak ref of epfd and pass it to the callback so that we can check that epfd
// has not been closed by the time the thread unblocks.
let weak_epfd = epfd.downgrade();

// We just need to know if the ready list is empty and borrow the thread_ids out.
// The whole logic is wrapped inside a block so we don't need to manually drop epfd later.
let ready_list_empty;
let mut thread_ids;
{
let epoll_file_description = epfd
.downcast::<Epoll>()
.ok_or_else(|| err_unsup_format!("non-epoll FD passed to `epoll_wait`"))?;
let binding = epoll_file_description.get_ready_list();
ready_list_empty = binding.borrow_mut().is_empty();
thread_ids = epoll_file_description.thread_id.borrow_mut();
}
Ok(Scalar::from_i32(num_of_events))
if timeout == 0 || !ready_list_empty {
// If the ready list is not empty, or the timeout is 0, we can return immediately.
blocking_epoll_callback(epfd_value, weak_epfd, dest, &event, this)?;
} else {
// Blocking
let timeout = match timeout {
0.. => {
let duration = Duration::from_millis(timeout.try_into().unwrap());
Some((TimeoutClock::Monotonic, TimeoutAnchor::Relative, duration))
}
-1 => None,
..-1 => {
throw_unsup_format!(
"epoll_wait: Only timeout values greater than or equal to -1 are supported."
);
}
};
thread_ids.push(this.active_thread());
let dest = dest.clone();
this.block_thread(
BlockReason::Epoll,
timeout,
callback!(
@capture<'tcx> {
epfd_value: i32,
weak_epfd: WeakFileDescriptionRef,
dest: MPlaceTy<'tcx>,
event: MPlaceTy<'tcx>,
}
@unblock = |this| {
blocking_epoll_callback(epfd_value, weak_epfd, &dest, &event, this)?;
Ok(())
}
@timeout = |this| {
// No notification after blocking timeout.
let Some(epfd) = weak_epfd.upgrade() else {
throw_unsup_format!("epoll FD {epfd_value} got closed while blocking.")
};
// Remove the current active thread_id from the blocked thread_id list.
epfd.downcast::<Epoll>()
.ok_or_else(|| err_unsup_format!("non-epoll FD passed to `epoll_wait`"))?
.thread_id.borrow_mut()
.retain(|&id| id != this.active_thread());
this.write_int(0, &dest)?;
Ok(())
}
),
);
}
Ok(())
}

/// For a specific file description, get its ready events and update the corresponding ready
Expand All @@ -483,17 +533,47 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
///
/// This *will* report an event if anyone is subscribed to it, without any further filtering, so
/// do not call this function when an FD didn't have anything happen to it!
fn check_and_update_readiness(&self, fd_ref: &FileDescriptionRef) -> InterpResult<'tcx, ()> {
let this = self.eval_context_ref();
fn check_and_update_readiness(
&mut self,
fd_ref: &FileDescriptionRef,
) -> InterpResult<'tcx, ()> {
let this = self.eval_context_mut();
let id = fd_ref.get_id();
let mut waiter = Vec::new();
// Get a list of EpollEventInterest that is associated to a specific file description.
if let Some(epoll_interests) = this.machine.epoll_interests.get_epoll_interest(id) {
for weak_epoll_interest in epoll_interests {
if let Some(epoll_interest) = weak_epoll_interest.upgrade() {
check_and_update_one_event_interest(fd_ref, epoll_interest, id, this)?;
let is_updated = check_and_update_one_event_interest(
fd_ref,
epoll_interest.clone(),
id,
this,
)?;
if is_updated {
// Edge-triggered notification only notifies one thread, even if there are
// multiple threads blocked on the same epfd.
let epfd = this.machine.fds.get(epoll_interest.borrow().epfd).unwrap();

// This unwrap can never fail because if the current epoll instance were
// closed and its epfd value reused, the upgrade of weak_epoll_interest
// above would fail. This guarantee holds because only the epoll instance
// holds a strong ref to epoll_interest.
// FIXME: We can randomly pick a thread to unblock.
if let Some(thread_id) =
epfd.downcast::<Epoll>().unwrap().thread_id.borrow_mut().pop()
{
waiter.push(thread_id);
};
}
}
}
}
waiter.sort();
waiter.dedup();
for thread_id in waiter {
this.unblock_thread(thread_id, BlockReason::Epoll)?;
}
Ok(())
}
}
Expand All @@ -517,14 +597,15 @@ fn ready_list_next(
}

/// This helper function checks whether an epoll notification should be triggered for a specific
/// epoll_interest and, if necessary, triggers the notification. Unlike check_and_update_readiness,
/// this function sends a notification to only one epoll instance.
/// epoll_interest and, if necessary, triggers the notification, and returns whether the
/// notification was added/updated. Unlike check_and_update_readiness, this function sends a
/// notification to only one epoll instance.
fn check_and_update_one_event_interest<'tcx>(
fd_ref: &FileDescriptionRef,
interest: Rc<RefCell<EpollEventInterest>>,
id: FdId,
ecx: &MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
) -> InterpResult<'tcx, bool> {
// Get the bitmask of ready events for a file description.
let ready_events_bitmask = fd_ref.get_epoll_ready_events()?.get_event_bitmask(ecx);
let epoll_event_interest = interest.borrow();
Expand All @@ -539,6 +620,46 @@ fn check_and_update_one_event_interest<'tcx>(
let event_instance = EpollEventInstance::new(flags, epoll_event_interest.data);
// Triggers the notification by inserting it to the ready list.
ready_list.insert(epoll_key, event_instance);
return Ok(true);
}
return Ok(false);
}

/// Delivers pending epoll events to the caller of `epoll_wait`.
///
/// Used both when `epoll_wait` can return immediately and as the unblock callback of a
/// thread that was blocked in `epoll_wait`.
///
/// * `epfd_value` - the epoll file descriptor number; only used in the error message.
/// * `weak_epfd` - weak reference to the epoll file description; if it can no longer be
///   upgraded, the call fails with an "unsupported" error.
/// * `dest` - place that receives the number of events written.
/// * `events` - the `epoll_event` array to fill; at most one ready-list entry is written
///   per array element.
fn blocking_epoll_callback<'tcx>(
    epfd_value: i32,
    weak_epfd: WeakFileDescriptionRef,
    dest: &MPlaceTy<'tcx>,
    events: &MPlaceTy<'tcx>,
    ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
    // The weak reference fails to upgrade once the file description is gone.
    let Some(epfd) = weak_epfd.upgrade() else {
        throw_unsup_format!("epoll FD {epfd_value} got closed while blocking.")
    };
    let Some(epoll) = epfd.downcast::<Epoll>() else {
        throw_unsup_format!("non-epoll FD passed to `epoll_wait`")
    };

    // Keep the shared handle alive in its own binding for as long as it is mutably borrowed.
    let shared_ready_list = epoll.get_ready_list();
    let mut pending = shared_ready_list.borrow_mut();

    let mut delivered: i32 = 0;
    let mut slots = ecx.project_array_fields(events)?;
    while let Some(slot) = slots.next(ecx)? {
        // Stop as soon as the ready list has nothing more to hand out.
        let Some(instance) = ready_list_next(ecx, &mut pending) else {
            break;
        };
        ecx.write_int_fields_named(
            &[("events", instance.events.into()), ("u64", instance.data.into())],
            &slot.1,
        )?;
        delivered = delivered.strict_add(1);
    }

    // The return value of `epoll_wait` is the number of events that were written.
    ecx.write_int(delivered, dest)?;
    Ok(())
}
3 changes: 1 addition & 2 deletions src/shims/unix/linux/foreign_items.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
"epoll_wait" => {
let [epfd, events, maxevents, timeout] =
this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
let result = this.epoll_wait(epfd, events, maxevents, timeout)?;
this.write_scalar(result, dest)?;
this.epoll_wait(epfd, events, maxevents, timeout, dest)?;
}
"eventfd" => {
let [val, flag] =
Expand Down
15 changes: 0 additions & 15 deletions tests/fail-dep/tokio/sleep.stderr

This file was deleted.

Loading