Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement blocking eventfd #3939

Open
wants to merge 31 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
03fb922
Add test for blocking eventfd read
tiif Oct 4, 2024
0802154
Add eventfd blocking test
tiif Oct 4, 2024
a5cb910
Remove fail-dep test
tiif Oct 4, 2024
e7ffec4
Remove previously fail-dep comment
tiif Oct 4, 2024
3fef8b5
Fix test error
tiif Oct 5, 2024
079cd35
Add blocking support for eventfd
tiif Oct 5, 2024
e0485e4
Add and delete comments
tiif Oct 5, 2024
70356b2
Fix rustfmt?
tiif Oct 5, 2024
88d2ecd
Add test for two threads blocked on eventfd, currently deadlocked
tiif Oct 13, 2024
37328f8
Fix rebase error
tiif Oct 13, 2024
4574a11
Split check counter then block out of eventfd::write
tiif Oct 13, 2024
cffaf49
Split check value and block thread operation out of eventfd::read
tiif Oct 13, 2024
73f5e46
Make blocking on several thread works
tiif Oct 13, 2024
128b68c
Apply suggestion: do refcell borrow and push together
tiif Oct 13, 2024
4bcc55c
Add currently failed test
tiif Oct 13, 2024
9f9813e
Make block then unblock then block again works
tiif Oct 14, 2024
00aea80
Add and remove comments
tiif Oct 14, 2024
69543c5
Bless the test
tiif Oct 14, 2024
213885b
Change name and comment
tiif Oct 14, 2024
18f9e84
Add one more error annotation
tiif Oct 14, 2024
e5a7771
Try to make the test execution clearer by using thread::park
tiif Oct 14, 2024
0d76514
Add test for read block unblock then block behaviour
tiif Oct 14, 2024
a9a244e
Improve tests comment
tiif Oct 14, 2024
42890a9
Failed attempt to get rid of reblock
tiif Oct 14, 2024
d9dc134
Revert "Failed attempt to get rid of reblock"
tiif Oct 14, 2024
38707e4
rustfmt
tiif Oct 14, 2024
733cebc
Remove the reblock guard as it is incorrect
tiif Oct 14, 2024
c4abda9
Fix the BorrowMutError ICE
tiif Oct 17, 2024
41ba16e
Bless the tests
tiif Oct 17, 2024
64d9cfc
Remove todo
tiif Oct 17, 2024
e14d77e
Update comments
tiif Oct 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/concurrency/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ pub enum BlockReason {
InitOnce(InitOnceId),
/// Blocked on epoll.
Epoll,
/// Blocked on eventfd.
Eventfd,
}

/// The state of a thread.
Expand Down
255 changes: 207 additions & 48 deletions src/shims/unix/linux/eventfd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::io;
use std::io::ErrorKind;

use crate::concurrency::VClock;
use crate::shims::unix::fd::FileDescriptionRef;
use crate::shims::unix::fd::{FileDescriptionRef, WeakFileDescriptionRef};
use crate::shims::unix::linux::epoll::{EpollReadyEvents, EvalContextExt as _};
use crate::shims::unix::*;
use crate::*;
Expand All @@ -26,6 +26,10 @@ struct Event {
counter: Cell<u64>,
is_nonblock: bool,
clock: RefCell<VClock>,
/// A list of thread ids blocked on eventfd::read.
blocked_read_tid: RefCell<Vec<ThreadId>>,
/// A list of thread ids blocked on eventfd::write.
blocked_write_tid: RefCell<Vec<ThreadId>>,
}

impl FileDescription for Event {
Expand Down Expand Up @@ -72,31 +76,8 @@ impl FileDescription for Event {
// eventfd read at the size of u64.
let buf_place = ecx.ptr_to_mplace_unaligned(ptr, ty);

// Block when counter == 0.
let counter = self.counter.get();
if counter == 0 {
if self.is_nonblock {
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
}

throw_unsup_format!("eventfd: blocking is unsupported");
} else {
// Synchronize with all prior `write` calls to this FD.
ecx.acquire_clock(&self.clock.borrow());

// Give old counter value to userspace, and set counter value to 0.
ecx.write_int(counter, &buf_place)?;
self.counter.set(0);

// When any of the event happened, we check and update the status of all supported event
// types for current file description.
ecx.check_and_update_readiness(self_ref)?;

// Tell userspace how many bytes we wrote.
ecx.write_int(buf_place.layout.size.bytes(), dest)?;
}

interp_ok(())
let weak_eventfd = self_ref.downgrade();
check_read_value_and_block_thread(buf_place, dest, weak_eventfd, ecx)
}

/// A write call adds the 8-byte integer value supplied in
Expand Down Expand Up @@ -127,7 +108,7 @@ impl FileDescription for Event {
return ecx.set_last_error_and_return(ErrorKind::InvalidInput, dest);
}

// Read the user supplied value from the pointer.
// Read the user-supplied value from the pointer.
let buf_place = ecx.ptr_to_mplace_unaligned(ptr, ty);
let num = ecx.read_scalar(&buf_place)?.to_u64()?;

Expand All @@ -137,27 +118,8 @@ impl FileDescription for Event {
}
// If the addition does not let the counter to exceed the maximum value, update the counter.
// Else, block.
match self.counter.get().checked_add(num) {
Some(new_count @ 0..=MAX_COUNTER) => {
// Future `read` calls will synchronize with this write, so update the FD clock.
ecx.release_clock(|clock| {
self.clock.borrow_mut().join(clock);
});
self.counter.set(new_count);
}
None | Some(u64::MAX) =>
if self.is_nonblock {
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
} else {
throw_unsup_format!("eventfd: blocking is unsupported");
},
};
// When any of the event happened, we check and update the status of all supported event
// types for current file description.
ecx.check_and_update_readiness(self_ref)?;

// Return how many bytes we read.
ecx.write_int(buf_place.layout.size.bytes(), dest)
let weak_eventfd = self_ref.downgrade();
check_write_value_and_block_thread(num, buf_place, dest, weak_eventfd, ecx)
}
}

Expand Down Expand Up @@ -217,8 +179,205 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
counter: Cell::new(val.into()),
is_nonblock,
clock: RefCell::new(VClock::default()),
blocked_read_tid: RefCell::new(Vec::new()),
blocked_write_tid: RefCell::new(Vec::new()),
});

interp_ok(Scalar::from_i32(fd_value))
}
}

/// The function returns the current counter value to the caller, and set the counter value to 0.
fn eventfd_read<'tcx>(
buf_place: &MPlaceTy<'tcx>,
dest: &MPlaceTy<'tcx>,
weak_eventfd: WeakFileDescriptionRef,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let Some(eventfd_ref) = weak_eventfd.upgrade() else {
throw_unsup_format!("eventfd FD got closed while blocking.")
};

// Since we pass the weak file description ref to the function, it is guaranteed to be
// an eventfd file description.
let eventfd = eventfd_ref.downcast::<Event>().unwrap();

// Synchronize with all prior `write` calls to this FD.
ecx.acquire_clock(&eventfd.clock.borrow());

// Give old counter value to userspace, and set counter value to 0.
let counter = eventfd.counter.get();
// When the counter value is 0, the read either blocks, or fails with error.
assert_ne!(counter, 0);
ecx.write_int(counter, buf_place)?;
eventfd.counter.set(0);

// When any of the event happened, we check and update the status of all supported event
// types for current file description.
ecx.check_and_update_readiness(&eventfd_ref)?;

// Unblock *all* threads previously blocked on `write`.
// We need to store the blocked thread ids and unblock them together to prevent BorrowMutError
// panic because when unblock_thread is called, blocked_write_tid will be mutably borrowed again.
let mut waiter = Vec::new();
let mut blocked_write_tid = eventfd.blocked_write_tid.borrow_mut();
while let Some(tid) = blocked_write_tid.pop() {
waiter.push(tid);
}
drop(blocked_write_tid);

waiter.sort();
waiter.dedup();
for thread_id in waiter {
ecx.unblock_thread(thread_id, BlockReason::Eventfd)?;
}
Comment on lines +222 to +233
Copy link
Contributor Author

@tiif tiif Oct 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't think of a testcase that can actually produce thread id duplication in blocked_write/read_id, but I will just keep the dedup here first.


// Tell userspace how many bytes we read.
ecx.write_int(buf_place.layout.size.bytes(), dest)
}

/// Add the user-supplied value to the current counter.
fn eventfd_write<'tcx>(
num: u64,
buf_place: &MPlaceTy<'tcx>,
dest: &MPlaceTy<'tcx>,
weak_eventfd: WeakFileDescriptionRef,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let Some(eventfd_ref) = weak_eventfd.upgrade() else {
throw_unsup_format!("eventfd FD got closed while blocking.")
};

// Since we pass the weak file description ref to the function, it is guaranteed to be
// an eventfd file description.
let eventfd = eventfd_ref.downcast::<Event>().unwrap();

// Future `read` calls will synchronize with this write, so update the FD clock.
ecx.release_clock(|clock| {
eventfd.clock.borrow_mut().join(clock);
});

// When this function is called, the addition is guaranteed to not exceed u64::MAX - 1.
let new_count = eventfd.counter.get().checked_add(num).unwrap();
eventfd.counter.set(new_count);

// When any of the event happened, we check and update the status of all supported event
// types for current file description.
ecx.check_and_update_readiness(&eventfd_ref)?;

// Unblock *all* threads previously blocked on `read`.
// We need to store the blocked thread ids and unblock them together to prevent BorrowMutError
// panic because when unblock_thread is called, blocked_read_tid will be mutably borrowed again.
let mut waiter = Vec::new();
let mut blocked_read_tid = eventfd.blocked_read_tid.borrow_mut();
while let Some(tid) = blocked_read_tid.pop() {
waiter.push(tid);
}
drop(blocked_read_tid);

waiter.sort();
waiter.dedup();
for thread_id in waiter {
ecx.unblock_thread(thread_id, BlockReason::Eventfd)?;
}

// Return how many bytes we wrote.
ecx.write_int(buf_place.layout.size.bytes(), dest)
}

/// Block thread if the value addition will exceed u64::MAX -1,
/// else just add the user-supplied value to current counter.
fn check_write_value_and_block_thread<'tcx>(
num: u64,
buf_place: MPlaceTy<'tcx>,
dest: &MPlaceTy<'tcx>,
weak_eventfd: WeakFileDescriptionRef,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let Some(eventfd_ref) = weak_eventfd.upgrade() else {
throw_unsup_format!("eventfd FD got closed while blocking.")
};

// Since we pass the weak file description ref, it is guaranteed to be
// an eventfd file description.
let eventfd = eventfd_ref.downcast::<Event>().unwrap();

match eventfd.counter.get().checked_add(num) {
Some(_new_count @ 0..=MAX_COUNTER) => {
return eventfd_write(num, &buf_place, dest, weak_eventfd, ecx);
}
None | Some(u64::MAX) => {
if eventfd.is_nonblock {
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
}

let dest = dest.clone();

eventfd.blocked_write_tid.borrow_mut().push(ecx.active_thread());

ecx.block_thread(
BlockReason::Eventfd,
None,
callback!(
@capture<'tcx> {
num: u64,
buf_place: MPlaceTy<'tcx>,
dest: MPlaceTy<'tcx>,
weak_eventfd: WeakFileDescriptionRef,
}
@unblock = |this| {
check_write_value_and_block_thread(num, buf_place, &dest, weak_eventfd, this)
}
),
);
}
};
interp_ok(())
}

/// Block thread if the current counter is 0,
/// else just return the current counter value to the caller and set the counter to 0.
fn check_read_value_and_block_thread<'tcx>(
buf_place: MPlaceTy<'tcx>,
dest: &MPlaceTy<'tcx>,
weak_eventfd: WeakFileDescriptionRef,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let Some(eventfd_ref) = weak_eventfd.upgrade() else {
throw_unsup_format!("eventfd FD got closed while blocking.")
};

// Since we pass the weak file description ref to the callback function, it is guaranteed to be
// an eventfd file description.
let eventfd = eventfd_ref.downcast::<Event>().unwrap();

// Block when counter == 0.
let counter = eventfd.counter.get();

if counter == 0 {
if eventfd.is_nonblock {
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
}
let dest = dest.clone();

eventfd.blocked_read_tid.borrow_mut().push(ecx.active_thread());

ecx.block_thread(
BlockReason::Eventfd,
None,
callback!(
@capture<'tcx> {
buf_place: MPlaceTy<'tcx>,
dest: MPlaceTy<'tcx>,
weak_eventfd: WeakFileDescriptionRef,
}
@unblock = |this| {
check_read_value_and_block_thread(buf_place, &dest, weak_eventfd, this)
}
),
);
} else {
eventfd_read(&buf_place, dest, weak_eventfd, ecx)?;
}
interp_ok(())
}
65 changes: 65 additions & 0 deletions tests/fail-dep/libc/eventfd_block_read_twice.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
//@only-target: linux
//~^ERROR: deadlocked
//~^^ERROR: deadlocked
//@compile-flags: -Zmiri-preemption-rate=0
//@error-in-other-file: deadlock

use std::thread;

// Test the behaviour of a thread being blocked on an eventfd read, get unblocked, and then
// get blocked again.

// The expected execution is
// 1. Thread 1 blocks.
// 2. Thread 2 blocks.
// 3. Thread 3 unblocks both thread 1 and thread 2.
// 4. Either thread 1 or thread 2 reads.
// 5. The next `read` deadlocked.

fn main() {
// eventfd write will block when EFD_NONBLOCK flag is clear
// and the addition caused counter to exceed u64::MAX - 1.
let flags = libc::EFD_CLOEXEC;
let fd = unsafe { libc::eventfd(0, flags) };

let thread1 = thread::spawn(move || {
thread::park();
let mut buf: [u8; 8] = [0; 8];
// This read will block initially.
let res: i64 = unsafe { libc::read(fd, buf.as_mut_ptr().cast(), 8).try_into().unwrap() };
assert_eq!(res, 8);
let counter = u64::from_ne_bytes(buf);
assert_eq!(counter, 1_u64);
});

let thread2 = thread::spawn(move || {
thread::park();
let mut buf: [u8; 8] = [0; 8];
// This read will block initially, then get unblocked by thread3, then get blocked again
// because the `read` in thread1 executes first and set the counter to 0 again.
let res: i64 = unsafe { libc::read(fd, buf.as_mut_ptr().cast(), 8).try_into().unwrap() };
//~^ERROR: deadlocked
assert_eq!(res, 8);
let counter = u64::from_ne_bytes(buf);
assert_eq!(counter, 1_u64);
});

let thread3 = thread::spawn(move || {
thread::park();
let sized_8_data = 1_u64.to_ne_bytes();
// Write 1 to the counter, so both thread1 and thread2 will unblock.
let res: i64 = unsafe {
libc::write(fd, sized_8_data.as_ptr() as *const libc::c_void, 8).try_into().unwrap()
};
// Make sure that write is successful.
assert_eq!(res, 8);
});

thread1.thread().unpark();
thread2.thread().unpark();
thread3.thread().unpark();

thread1.join().unwrap();
thread2.join().unwrap();
thread3.join().unwrap();
}
Loading