Skip to content

Commit

Permalink
Rollup merge of #97140 - joboet:solid_parker, r=m-ou-se
Browse files Browse the repository at this point in the history
std: use an event-flag-based thread parker on SOLID

`Mutex` and `Condvar` are being replaced by more efficient implementations, which need thread parking themselves (see #93740). Therefore, the generic `Parker` needs to be replaced on all platforms where the new lock implementation will be used, which, after #96393, are SOLID, SGX and Hermit (more PRs coming soon).

SOLID, conforming to the [μITRON specification](http://www.ertl.jp/ITRON/SPEC/FILE/mitron-400e.pdf), has event flags, which are a thread parking primitive very similar to `Parker`. However, they do not make any atomic ordering guarantees (even though those can probably be assumed) and necessitate a system call even when the thread token is already available. Hence, this `Parker`, like the Windows parker, uses an extra atomic state variable.

I future-proofed the code by wrapping the event flag in a `WaitFlag` structure, as both SGX and Hermit can share the Parker implementation, they just have slightly different primitives (SGX uses signals and Hermit has a thread blocking API).

`````@kawadakk````` I assume you are the target maintainer? Could you test this for me?
  • Loading branch information
matthiaskrgr authored Jun 26, 2022
2 parents 788dded + caff723 commit c348bea
Show file tree
Hide file tree
Showing 5 changed files with 225 additions and 6 deletions.
48 changes: 45 additions & 3 deletions library/std/src/sys/itron/abi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,32 @@ pub type ER = int_t;
/// Error code type, `ID` on success
pub type ER_ID = int_t;

/// Service call operational mode
pub type MODE = uint_t;

/// OR waiting condition for an eventflag
pub const TWF_ORW: MODE = 0x01;

/// Object attributes
pub type ATR = uint_t;

/// FIFO wait order
pub const TA_FIFO: ATR = 0;
/// Only one task is allowed to be in the waiting state for the eventflag
pub const TA_WSGL: ATR = 0;
/// The eventflag’s bit pattern is cleared when a task is released from the
/// waiting state for that eventflag.
pub const TA_CLR: ATR = 0x04;

/// Bit pattern of an eventflag
pub type FLGPTN = uint_t;

/// Task or interrupt priority
pub type PRI = int_t;

/// The special value of `PRI` representing the current task's priority.
pub const TPRI_SELF: PRI = 0;

/// Object attributes
pub type ATR = uint_t;

/// Use the priority inheritance protocol
#[cfg(target_os = "solid_asp3")]
pub const TA_INHERIT: ATR = 0x02;
Expand Down Expand Up @@ -90,6 +107,13 @@ pub struct T_CSEM {
pub maxsem: uint_t,
}

#[derive(Clone, Copy)]
#[repr(C)]
pub struct T_CFLG {
pub flgatr: ATR,
pub iflgptn: FLGPTN,
}

#[derive(Clone, Copy)]
#[repr(C)]
pub struct T_CMTX {
Expand Down Expand Up @@ -139,6 +163,24 @@ extern "C" {
pub fn sns_dsp() -> bool_t;
#[link_name = "__asp3_get_tim"]
pub fn get_tim(p_systim: *mut SYSTIM) -> ER;
#[link_name = "__asp3_acre_flg"]
pub fn acre_flg(pk_cflg: *const T_CFLG) -> ER_ID;
#[link_name = "__asp3_del_flg"]
pub fn del_flg(flgid: ID) -> ER;
#[link_name = "__asp3_set_flg"]
pub fn set_flg(flgid: ID, setptn: FLGPTN) -> ER;
#[link_name = "__asp3_clr_flg"]
pub fn clr_flg(flgid: ID, clrptn: FLGPTN) -> ER;
#[link_name = "__asp3_wai_flg"]
pub fn wai_flg(flgid: ID, waiptn: FLGPTN, wfmode: MODE, p_flgptn: *mut FLGPTN) -> ER;
#[link_name = "__asp3_twai_flg"]
pub fn twai_flg(
flgid: ID,
waiptn: FLGPTN,
wfmode: MODE,
p_flgptn: *mut FLGPTN,
tmout: TMO,
) -> ER;
#[link_name = "__asp3_acre_mtx"]
pub fn acre_mtx(pk_cmtx: *const T_CMTX) -> ER_ID;
#[link_name = "__asp3_del_mtx"]
Expand Down
72 changes: 72 additions & 0 deletions library/std/src/sys/itron/wait_flag.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use crate::mem::MaybeUninit;
use crate::time::Duration;

use super::{
abi,
error::{expect_success, fail},
time::with_tmos,
};

const CLEAR: abi::FLGPTN = 0;
const RAISED: abi::FLGPTN = 1;

/// A thread parking primitive that is not susceptible to race conditions,
/// but provides no atomic ordering guarantees and allows only one `raise` per wait.
pub struct WaitFlag {
flag: abi::ID,
}

impl WaitFlag {
/// Creates a new wait flag.
pub fn new() -> WaitFlag {
let flag = expect_success(
unsafe {
abi::acre_flg(&abi::T_CFLG {
flgatr: abi::TA_FIFO | abi::TA_WSGL | abi::TA_CLR,
iflgptn: CLEAR,
})
},
&"acre_flg",
);

WaitFlag { flag }
}

/// Wait for the wait flag to be raised.
pub fn wait(&self) {
let mut token = MaybeUninit::uninit();
expect_success(
unsafe { abi::wai_flg(self.flag, RAISED, abi::TWF_ORW, token.as_mut_ptr()) },
&"wai_flg",
);
}

/// Wait for the wait flag to be raised or the timeout to occur.
///
/// Returns whether the flag was raised (`true`) or the operation timed out (`false`).
pub fn wait_timeout(&self, dur: Duration) -> bool {
let mut token = MaybeUninit::uninit();
let res = with_tmos(dur, |tmout| unsafe {
abi::twai_flg(self.flag, RAISED, abi::TWF_ORW, token.as_mut_ptr(), tmout)
});

match res {
abi::E_OK => true,
abi::E_TMOUT => false,
error => fail(error, &"twai_flg"),
}
}

/// Raise the wait flag.
///
/// Calls to this function should be balanced with the number of successful waits.
pub fn raise(&self) {
expect_success(unsafe { abi::set_flg(self.flag, RAISED) }, &"set_flg");
}
}

impl Drop for WaitFlag {
fn drop(&mut self) {
expect_success(unsafe { abi::del_flg(self.flag) }, &"del_flg");
}
}
2 changes: 2 additions & 0 deletions library/std/src/sys/solid/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ mod itron {
pub mod thread;
pub(super) mod time;
use super::unsupported;
pub mod wait_flag;
}

pub mod alloc;
Expand Down Expand Up @@ -43,6 +44,7 @@ pub mod memchr;
pub mod thread_local_dtor;
pub mod thread_local_key;
pub mod time;
pub use self::itron::wait_flag;

mod rwlock;

Expand Down
7 changes: 4 additions & 3 deletions library/std/src/sys_common/thread_parker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ cfg_if::cfg_if! {
))] {
mod futex;
pub use futex::Parker;
} else if #[cfg(windows)] {
pub use crate::sys::thread_parker::Parker;
} else if #[cfg(target_family = "unix")] {
} else if #[cfg(target_os = "solid_asp3")] {
mod wait_flag;
pub use wait_flag::Parker;
} else if #[cfg(any(windows, target_family = "unix"))] {
pub use crate::sys::thread_parker::Parker;
} else {
mod generic;
Expand Down
102 changes: 102 additions & 0 deletions library/std/src/sys_common/thread_parker/wait_flag.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
//! A wait-flag-based thread parker.
//!
//! Some operating systems provide low-level parking primitives like wait counts,
//! event flags or semaphores which are not susceptible to race conditions (meaning
//! the wakeup can occur before the wait operation). To implement the `std` thread
//! parker on top of these primitives, we only have to ensure that parking is fast
//! when the thread token is available, the atomic ordering guarantees are maintained
//! and spurious wakeups are minimized.
//!
//! To achieve this, this parker uses an atomic variable with three states: `EMPTY`,
//! `PARKED` and `NOTIFIED`:
//! * `EMPTY` means the token has not been made available, but the thread is not
//! currently waiting on it.
//! * `PARKED` means the token is not available and the thread is parked.
//! * `NOTIFIED` means the token is available.
//!
//! `park` and `park_timeout` change the state from `EMPTY` to `PARKED` and from
//! `NOTIFIED` to `EMPTY`. If the state was `NOTIFIED`, the thread was unparked and
//! execution can continue without calling into the OS. If the state was `EMPTY`,
//! the token is not available and the thread waits on the primitive (here called
//! "wait flag").
//!
//! `unpark` changes the state to `NOTIFIED`. If the state was `PARKED`, the thread
//! is or will be sleeping on the wait flag, so we raise it.

use crate::pin::Pin;
use crate::sync::atomic::AtomicI8;
use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use crate::sys::wait_flag::WaitFlag;
use crate::time::Duration;

const EMPTY: i8 = 0;
const PARKED: i8 = -1;
const NOTIFIED: i8 = 1;

pub struct Parker {
state: AtomicI8,
wait_flag: WaitFlag,
}

impl Parker {
/// Construct a parker for the current thread. The UNIX parker
/// implementation requires this to happen in-place.
pub unsafe fn new(parker: *mut Parker) {
parker.write(Parker { state: AtomicI8::new(EMPTY), wait_flag: WaitFlag::new() })
}

// This implementation doesn't require `unsafe` and `Pin`, but other implementations do.
pub unsafe fn park(self: Pin<&Self>) {
match self.state.fetch_sub(1, Acquire) {
// NOTIFIED => EMPTY
NOTIFIED => return,
// EMPTY => PARKED
EMPTY => (),
_ => panic!("inconsistent park state"),
}

// Avoid waking up from spurious wakeups (these are quite likely, see below).
loop {
self.wait_flag.wait();

match self.state.compare_exchange(NOTIFIED, EMPTY, Acquire, Relaxed) {
Ok(_) => return,
Err(PARKED) => (),
Err(_) => panic!("inconsistent park state"),
}
}
}

// This implementation doesn't require `unsafe` and `Pin`, but other implementations do.
pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
match self.state.fetch_sub(1, Acquire) {
NOTIFIED => return,
EMPTY => (),
_ => panic!("inconsistent park state"),
}

self.wait_flag.wait_timeout(dur);

// Either a wakeup or a timeout occurred. Wakeups may be spurious, as there can be
// a race condition when `unpark` is performed between receiving the timeout and
// resetting the state, resulting in the eventflag being set unnecessarily. `park`
// is protected against this by looping until the token is actually given, but
// here we cannot easily tell.

// Use `swap` to provide acquire ordering.
match self.state.swap(EMPTY, Acquire) {
NOTIFIED => (),
PARKED => (),
_ => panic!("inconsistent park state"),
}
}

// This implementation doesn't require `Pin`, but other implementations do.
pub fn unpark(self: Pin<&Self>) {
let state = self.state.swap(NOTIFIED, Release);

if state == PARKED {
self.wait_flag.raise();
}
}
}

0 comments on commit c348bea

Please sign in to comment.