Store one fiber stack in a Store<T> (#9604)
* Store one fiber stack in a `Store<T>`

This commit stores a single fiber stack in `Store<T>` as a cache to be
reused throughout the lifetime of the `Store`. This should help amortize
the cost of allocating a stack because the same stack can be reused
across successive fibers instead of being allocated anew each time.
Notably, this reduces contention on the lock used to manage the pooling
allocator, when that allocator is in use.

* Fix non-async build
alexcrichton authored Nov 14, 2024
1 parent 0e6c711 commit 7bd09e6
Showing 3 changed files with 67 additions and 13 deletions.
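
As a rough, editor-added sketch of the caching idea described in the commit message above (not the actual wasmtime code; `Stack`, `Allocator`, and `StoreState` are hypothetical stand-ins for the fiber-stack type, the instance allocator, and the store internals):

// Hypothetical stand-ins for wasmtime's fiber stack and instance allocator.
struct Stack(Vec<u8>);

struct Allocator;

impl Allocator {
    fn allocate_stack(&self) -> Stack {
        // In the real pooling allocator this path takes a lock.
        Stack(vec![0; 64 * 1024])
    }

    fn deallocate_stack(&self, _stack: Stack) {
        // Return the stack to the pool (no-op in this sketch).
    }
}

struct StoreState {
    allocator: Allocator,
    // The single cached stack, reused while the store is alive.
    last_stack: Option<Stack>,
}

impl StoreState {
    // Take the cached stack if present, otherwise fall back to the allocator.
    fn allocate_stack(&mut self) -> Stack {
        match self.last_stack.take() {
            Some(stack) => stack,
            None => self.allocator.allocate_stack(),
        }
    }

    // Cache a finished stack, releasing any stack that was already cached.
    fn recycle_stack(&mut self, stack: Stack) {
        if let Some(old) = self.last_stack.take() {
            self.allocator.deallocate_stack(old);
        }
        self.last_stack = Some(stack);
    }
}

impl Drop for StoreState {
    fn drop(&mut self) {
        // Flush the cache so the allocator gets its stack back.
        if let Some(stack) = self.last_stack.take() {
            self.allocator.deallocate_stack(stack);
        }
    }
}

fn main() {
    let mut store = StoreState { allocator: Allocator, last_stack: None };
    let s1 = store.allocate_stack();  // freshly allocated
    store.recycle_stack(s1);          // cached in the store
    let _s2 = store.allocate_stack(); // reused without touching the allocator
}

The real change wires the same idea through `StoreOpaque::allocate_fiber_stack` and `deallocate_fiber_stack` in the diff below, with `flush_fiber_stack` returning the cached stack to the instance allocator when the `Store` is dropped.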
8 changes: 8 additions & 0 deletions crates/fiber/src/lib.rs
@@ -21,6 +21,14 @@ cfg_if::cfg_if! {
 /// Represents an execution stack to use for a fiber.
 pub struct FiberStack(imp::FiberStack);
 
+fn _assert_send_sync() {
+    fn _assert_send<T: Send>() {}
+    fn _assert_sync<T: Sync>() {}
+
+    _assert_send::<FiberStack>();
+    _assert_sync::<FiberStack>();
+}
+
 impl FiberStack {
     /// Creates a new fiber stack of the given size.
     pub fn new(size: usize) -> io::Result<Self> {
23 changes: 14 additions & 9 deletions crates/fiber/src/unix.rs
@@ -36,14 +36,19 @@ use std::ops::Range;
 use std::ptr;
 
 pub struct FiberStack {
-    base: *mut u8,
+    base: BasePtr,
     len: usize,
 
     /// Stored here to ensure that when this `FiberStack` is dropped the
     /// backing storage, if any, is additionally dropped.
     storage: FiberStackStorage,
 }
 
+struct BasePtr(*mut u8);
+
+unsafe impl Send for BasePtr {}
+unsafe impl Sync for BasePtr {}
+
 enum FiberStackStorage {
     Mmap(#[allow(dead_code)] MmapFiberStack),
     Unmanaged(usize),
@@ -64,7 +69,7 @@ impl FiberStack {
         // region so the base and length of our stack are both offset by a
         // single page.
         Ok(FiberStack {
-            base: stack.mapping_base.wrapping_byte_add(page_size),
+            base: BasePtr(stack.mapping_base.wrapping_byte_add(page_size)),
             len: stack.mapping_len - page_size,
             storage: FiberStackStorage::Mmap(stack),
         })
@@ -77,7 +82,7 @@ impl FiberStack {
             return Self::from_custom(asan::new_fiber_stack(len)?);
         }
         Ok(FiberStack {
-            base: base.add(guard_size),
+            base: BasePtr(base.add(guard_size)),
             len,
             storage: FiberStackStorage::Unmanaged(guard_size),
         })
@@ -101,28 +106,28 @@ impl FiberStack {
             "expected fiber stack end ({end_ptr:?}) to be page aligned ({page_size:#x})",
         );
         Ok(FiberStack {
-            base: start_ptr,
+            base: BasePtr(start_ptr),
             len: range.len(),
             storage: FiberStackStorage::Custom(custom),
         })
     }
 
     pub fn top(&self) -> Option<*mut u8> {
-        Some(self.base.wrapping_byte_add(self.len))
+        Some(self.base.0.wrapping_byte_add(self.len))
     }
 
     pub fn range(&self) -> Option<Range<usize>> {
-        let base = self.base as usize;
+        let base = self.base.0 as usize;
         Some(base..base + self.len)
     }
 
     pub fn guard_range(&self) -> Option<Range<*mut u8>> {
         match &self.storage {
             FiberStackStorage::Unmanaged(guard_size) => unsafe {
-                let start = self.base.sub(*guard_size);
-                Some(start..self.base)
+                let start = self.base.0.sub(*guard_size);
+                Some(start..self.base.0)
             },
-            FiberStackStorage::Mmap(mmap) => Some(mmap.mapping_base..self.base),
+            FiberStackStorage::Mmap(mmap) => Some(mmap.mapping_base..self.base.0),
             FiberStackStorage::Custom(custom) => Some(custom.guard_range()),
         }
     }
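
A side note on the `BasePtr` newtype introduced above (editor's illustration, not part of the commit): raw pointers are neither `Send` nor `Sync`, so a struct holding a bare `*mut u8` would fail the new `_assert_send_sync` check added to `lib.rs`. Wrapping the pointer in a newtype with explicit `unsafe impl`s is the standard way to opt back in after auditing how the pointer is used. A minimal sketch of the pattern:

// Sketch of the newtype-plus-unsafe-impl pattern, not wasmtime code.
struct BasePtr(*mut u8);

// SAFETY (assumption of this sketch): the pointer is only used as an address
// describing a memory region; all accesses through it are synchronized
// elsewhere.
unsafe impl Send for BasePtr {}
unsafe impl Sync for BasePtr {}

fn assert_send_sync<T: Send + Sync>() {}

fn main() {
    // Compiles only because of the unsafe impls above; a bare `*mut u8`
    // would not satisfy these bounds.
    assert_send_sync::<BasePtr>();
}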
49 changes: 45 additions & 4 deletions crates/wasmtime/src/runtime/store.rs
@@ -335,6 +335,7 @@ pub struct StoreOpaque {
     table_limit: usize,
     #[cfg(feature = "async")]
     async_state: AsyncState,
+
     // If fuel_yield_interval is enabled, then we store the remaining fuel (that isn't in
     // runtime_limits) here. The total amount of fuel is the runtime limits and reserve added
     // together. Then when we run out of gas, we inject the yield amount from the reserve
@@ -392,6 +393,8 @@ pub struct StoreOpaque {
 struct AsyncState {
     current_suspend: UnsafeCell<*mut wasmtime_fiber::Suspend<Result<()>, (), Result<()>>>,
     current_poll_cx: UnsafeCell<PollContext>,
+    /// The last fiber stack that was in use by this store.
+    last_fiber_stack: Option<wasmtime_fiber::FiberStack>,
 }
 
 #[cfg(feature = "async")]
@@ -556,6 +559,7 @@ impl<T> Store<T> {
             async_state: AsyncState {
                 current_suspend: UnsafeCell::new(ptr::null_mut()),
                 current_poll_cx: UnsafeCell::new(PollContext::default()),
+                last_fiber_stack: None,
             },
             fuel_reserve: 0,
             fuel_yield_interval: None,
@@ -2099,6 +2103,31 @@ at https://bytecodealliance.org/security.
             core::ptr::null_mut()..core::ptr::null_mut()
         }
     }
+
+    #[cfg(feature = "async")]
+    fn allocate_fiber_stack(&mut self) -> Result<wasmtime_fiber::FiberStack> {
+        if let Some(stack) = self.async_state.last_fiber_stack.take() {
+            return Ok(stack);
+        }
+        self.engine().allocator().allocate_fiber_stack()
+    }
+
+    #[cfg(feature = "async")]
+    fn deallocate_fiber_stack(&mut self, stack: wasmtime_fiber::FiberStack) {
+        self.flush_fiber_stack();
+        self.async_state.last_fiber_stack = Some(stack);
+    }
+
+    /// Releases the last fiber stack to the underlying instance allocator, if
+    /// present.
+    fn flush_fiber_stack(&mut self) {
+        #[cfg(feature = "async")]
+        if let Some(stack) = self.async_state.last_fiber_stack.take() {
+            unsafe {
+                self.engine.allocator().deallocate_fiber_stack(stack);
+            }
+        }
+    }
 }
 
 impl<T> StoreContextMut<'_, T> {
@@ -2124,13 +2153,14 @@ impl<T> StoreContextMut<'_, T> {
         debug_assert!(config.async_stack_size > 0);
 
         let mut slot = None;
-        let future = {
+        let mut future = {
             let current_poll_cx = self.0.async_state.current_poll_cx.get();
             let current_suspend = self.0.async_state.current_suspend.get();
-            let stack = self.engine().allocator().allocate_fiber_stack()?;
+            let stack = self.0.allocate_fiber_stack()?;
 
             let engine = self.engine().clone();
             let slot = &mut slot;
+            let this = &mut *self;
             let fiber = wasmtime_fiber::Fiber::new(stack, move |keep_going, suspend| {
                 // First check and see if we were interrupted/dropped, and only
                 // continue if we haven't been.
@@ -2148,7 +2178,7 @@ impl<T> StoreContextMut<'_, T> {
                     let _reset = Reset(current_suspend, *current_suspend);
                     *current_suspend = suspend;
 
-                    *slot = Some(func(self));
+                    *slot = Some(func(this));
                     Ok(())
                 }
             })?;
@@ -2163,7 +2193,12 @@ impl<T> StoreContextMut<'_, T> {
                 state: Some(crate::runtime::vm::AsyncWasmCallState::new()),
             }
         };
-        future.await?;
+        (&mut future).await?;
+        let stack = future.fiber.take().map(|f| f.into_stack());
+        drop(future);
+        if let Some(stack) = stack {
+            self.0.deallocate_fiber_stack(stack);
+        }
 
         return Ok(slot.unwrap());
 
@@ -2373,6 +2408,10 @@ impl<T> StoreContextMut<'_, T> {
         // completion.
         impl Drop for FiberFuture<'_> {
             fn drop(&mut self) {
+                if self.fiber.is_none() {
+                    return;
+                }
+
                 if !self.fiber().done() {
                     let result = self.resume(Err(anyhow!("future dropped")));
                     // This resumption with an error should always complete the
@@ -2737,6 +2776,8 @@ impl<T: fmt::Debug> fmt::Debug for Store<T> {
 
 impl<T> Drop for Store<T> {
     fn drop(&mut self) {
+        self.inner.flush_fiber_stack();
+
         // for documentation on this `unsafe`, see `into_data`.
         unsafe {
             ManuallyDrop::drop(&mut self.inner.data);
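
One detail worth calling out from the `on_fiber` changes above: the fiber future is now awaited through a mutable reference (`(&mut future).await?`) rather than by value, so that after completion the fiber, and hence its stack, can still be taken out of the future and handed to `deallocate_fiber_stack`. Below is a small, editor-added sketch of that borrowing pattern using plain futures; `futures::executor::block_on` is an assumed dependency, not part of the commit:

use futures::executor::block_on;

async fn reuse_after_completion() -> i32 {
    // `Pin<Box<_>>` is `Unpin`, so `&mut fut` itself implements `Future`.
    let mut fut = Box::pin(async { 42 });

    // Drive the future to completion without consuming the binding.
    let out = (&mut fut).await;

    // `fut` is still live here, so state held inside it can be recovered
    // before it is dropped (the commit pulls the fiber stack out of the
    // completed `FiberFuture` at the analogous point).
    drop(fut);
    out
}

fn main() {
    assert_eq!(block_on(reuse_after_completion()), 42);
}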
