Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,12 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
start_time: Instant::now(),
session_id: backing_storage.next_session_id(),
persisted_task_id_factory: IdFactoryWithReuse::new(
*backing_storage.next_free_task_id() as u64,
(TRANSIENT_TASK_BIT - 1) as u64,
backing_storage.next_free_task_id(),
TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
),
transient_task_id_factory: IdFactoryWithReuse::new(
TRANSIENT_TASK_BIT as u64,
u32::MAX as u64,
TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
TaskId::MAX,
),
persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
task_cache: BiMap::new(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,13 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
}

fn next_free_task_id(&self) -> TaskId {
TaskId::from(get_infra_u32(&self.database, META_KEY_NEXT_FREE_TASK_ID).unwrap_or(1))
TaskId::try_from(get_infra_u32(&self.database, META_KEY_NEXT_FREE_TASK_ID).unwrap_or(1))
.unwrap()
}

fn next_session_id(&self) -> SessionId {
SessionId::from(get_infra_u32(&self.database, META_KEY_SESSION_ID).unwrap_or(0) + 1)
SessionId::try_from(get_infra_u32(&self.database, META_KEY_SESSION_ID).unwrap_or(0) + 1)
.unwrap()
}

fn uncompleted_operations(&self) -> Vec<AnyOperation> {
Expand Down Expand Up @@ -367,7 +369,7 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
return Ok(None);
};
let bytes = bytes.borrow().try_into()?;
let id = TaskId::from(u32::from_le_bytes(bytes));
let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap();
Ok(Some(id))
}
if self.database.is_empty() {
Expand Down
2 changes: 1 addition & 1 deletion turbopack/crates/turbo-tasks-memory/src/memory_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl MemoryBackend {
persistent_tasks: NoMoveVec::new(),
transient_tasks: NoMoveVec::new(),
backend_jobs: NoMoveVec::new(),
backend_job_id_factory: IdFactoryWithReuse::new(1, u32::MAX as u64),
backend_job_id_factory: IdFactoryWithReuse::new(BackendJobId::MIN, BackendJobId::MAX),
task_cache: DashMap::with_hasher_and_shard_amount(Default::default(), shard_amount),
transient_task_cache: DashMap::with_hasher_and_shard_amount(
Default::default(),
Expand Down
4 changes: 2 additions & 2 deletions turbopack/crates/turbo-tasks-testing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl VcStorage {
})));
i
};
let task_id = TaskId::from(i as u32 + 1);
let task_id = TaskId::try_from(u32::try_from(i + 1).unwrap()).unwrap();
handle.spawn(with_turbo_tasks_for_testing(
this.clone(),
task_id,
Expand Down Expand Up @@ -321,7 +321,7 @@ impl VcStorage {
this: weak.clone(),
..Default::default()
}),
TaskId::from(u32::MAX),
TaskId::MAX,
f,
)
}
Expand Down
57 changes: 45 additions & 12 deletions turbopack/crates/turbo-tasks/src/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ macro_rules! define_id {
}

impl $name {
pub const MIN: Self = Self { id: NonZero::<$primitive>::MIN };
pub const MAX: Self = Self { id: NonZero::<$primitive>::MAX };

/// Constructs a wrapper type from the numeric identifier.
///
/// # Safety
Expand All @@ -37,6 +40,15 @@ macro_rules! define_id {
pub const unsafe fn new_unchecked(id: $primitive) -> Self {
Self { id: unsafe { NonZero::<$primitive>::new_unchecked(id) } }
}

/// Allows `const` conversion to a [`NonZeroU64`], useful with
/// [`crate::id_factory::IdFactory::new_const`].
pub const fn to_non_zero_u64(self) -> NonZeroU64 {
const {
assert!(<$primitive>::BITS <= u64::BITS);
}
unsafe { NonZeroU64::new_unchecked(self.id.get() as u64) }
}
}

impl Display for $name {
Expand All @@ -53,30 +65,51 @@ macro_rules! define_id {
}
}

/// Converts a numeric identifier to the wrapper type.
///
/// Panics if the given id value is zero.
impl From<$primitive> for $name {
fn from(id: $primitive) -> Self {
define_id!(@impl_try_from_primitive_conversion $name $primitive);

impl From<NonZero<$primitive>> for $name {
fn from(id: NonZero::<$primitive>) -> Self {
Self {
id: NonZero::<$primitive>::new(id)
.expect("Ids can only be created from non zero values")
id,
}
}
}

/// Converts a numeric identifier to the wrapper type.
impl From<$name> for NonZeroU64 {
fn from(id: $name) -> Self {
id.to_non_zero_u64()
}
}

impl TraceRawVcs for $name {
fn trace_raw_vcs(&self, _trace_context: &mut TraceRawVcsContext) {}
}
};
(
@impl_try_from_primitive_conversion $name:ident u64
) => {
// we get a `TryFrom` blanket impl for free via the `From` impl
};
(
@impl_try_from_primitive_conversion $name:ident $primitive:ty
) => {
impl TryFrom<$primitive> for $name {
type Error = TryFromIntError;

fn try_from(id: $primitive) -> Result<Self, Self::Error> {
Ok(Self {
id: NonZero::try_from(id)?
})
}
}

impl TryFrom<NonZeroU64> for $name {
type Error = TryFromIntError;

fn try_from(id: NonZeroU64) -> Result<Self, Self::Error> {
Ok(Self { id: NonZero::try_from(id)? })
}
}

impl TraceRawVcs for $name {
fn trace_raw_vcs(&self, _trace_context: &mut TraceRawVcsContext) {}
}
};
}

Expand Down
120 changes: 91 additions & 29 deletions turbopack/crates/turbo-tasks/src/id_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,36 @@ use concurrent_queue::ConcurrentQueue;
///
/// For ids that may be re-used, see [`IdFactoryWithReuse`].
pub struct IdFactory<T> {
next_id: AtomicU64,
max_id: u64,
/// A value starting at 0 and incremented each time a new id is allocated. Regardless of the
/// underlying type, a u64 is used to cheaply detect overflows.
counter: AtomicU64,
/// We've overflowed if the `counter > max_count`.
max_count: u64,
id_offset: u64, // added to the value received from `counter`
_phantom_data: PhantomData<T>,
}

impl<T> IdFactory<T> {
pub const fn new(start: u64, max: u64) -> Self {
/// Create a factory for ids in the range `start..=max`.
pub fn new(start: T, max: T) -> Self
where
T: Into<NonZeroU64> + Ord,
{
Self::new_const(start.into(), max.into())
}

/// Create a factory for ids in the range `start..=max`.
///
/// Provides a less convenient API than [`IdFactory::new`], but skips a type conversion that
/// would make the function non-const.
pub const fn new_const(start: NonZeroU64, max: NonZeroU64) -> Self {
assert!(start.get() < max.get());
Self {
next_id: AtomicU64::new(start),
max_id: max,
// Always start `counter` at 0, don't use the value of `start` because `start` could be
// close to `u64::MAX`.
counter: AtomicU64::new(0),
max_count: max.get() - start.get(),
id_offset: start.get(),
_phantom_data: PhantomData,
}
}
Expand All @@ -32,47 +52,80 @@ where
{
/// Return a unique new id.
///
/// Panics (best-effort) if the id type overflows.
/// Panics if the id type overflows.
pub fn get(&self) -> T {
let new_id = self.next_id.fetch_add(1, Ordering::Relaxed);
let count = self.counter.fetch_add(1, Ordering::Relaxed);

#[cfg(debug_assertions)]
{
if count == u64::MAX {
// u64 counter is about to overflow -- this should never happen! A `u64` counter
// starting at 0 should take decades to overflow on a single machine.
//
// This is unrecoverable because other threads may have already read the overflowed
// value, so abort the entire process.
std::process::abort()
}
}

if new_id > self.max_id {
// `max_count` might be something like `u32::MAX`. The extra bits of `u64` are useful to
// detect overflows in that case. We assume the u64 counter is large enough to never
// overflow.
if count > self.max_count {
panic!(
"Max id limit hit while attempting to generate a unique {}",
"Max id limit (overflow) hit while attempting to generate a unique {}",
type_name::<T>(),
)
}

// Safety: u64 will not overflow. This is *very* unlikely to happen (would take
// decades).
let new_id = unsafe { NonZeroU64::new_unchecked(new_id) };
let new_id_u64 = count + self.id_offset;
// Safety:
// - `count` is assumed not to overflow.
// - `id_offset` is a non-zero value.
// - `id_offset + count < u64::MAX`.
let new_id = unsafe { NonZeroU64::new_unchecked(new_id_u64) };

// Use the extra bits of the AtomicU64 as cheap overflow detection when the
// value is less than 64 bits.
match new_id.try_into() {
Ok(id) => id,
// With any sane implementation of `TryFrom`, this shouldn't happen, as we've already
// checked the `max_count` bound. (Could happen with the `new_const` constructor)
Err(_) => panic!(
"Overflow detected while attempting to generate a unique {}",
type_name::<T>(),
"Failed to convert NonZeroU64 value of {} into {}",
new_id,
type_name::<T>()
),
}
}
}

/// An [`IdFactory`], but extended with a free list to allow for id reuse for
/// ids such as [`BackendJobId`][crate::backend::BackendJobId].
/// An [`IdFactory`], but extended with a free list to allow for id reuse for ids such as
/// [`BackendJobId`][crate::backend::BackendJobId].
pub struct IdFactoryWithReuse<T> {
factory: IdFactory<T>,
free_ids: ConcurrentQueue<T>,
}

impl<T> IdFactoryWithReuse<T> {
pub const fn new(start: u64, max: u64) -> Self {
impl<T> IdFactoryWithReuse<T>
where
T: Into<NonZeroU64> + Ord,
{
/// Create a factory for ids in the range `start..=max`.
pub fn new(start: T, max: T) -> Self {
Self {
factory: IdFactory::new(start, max),
free_ids: ConcurrentQueue::unbounded(),
}
}

/// Create a factory for ids in the range `start..=max`. Provides a less convenient API than
/// [`IdFactoryWithReuse::new`], but skips a type conversion that would make the function
/// non-const.
pub const fn new_const(start: NonZeroU64, max: NonZeroU64) -> Self {
Self {
factory: IdFactory::new_const(start, max),
free_ids: ConcurrentQueue::unbounded(),
}
}
}

impl<T> IdFactoryWithReuse<T>
Expand All @@ -81,18 +134,18 @@ where
{
/// Return a new or potentially reused id.
///
/// Panics (best-effort) if the id type overflows.
/// Panics if the id type overflows.
pub fn get(&self) -> T {
self.free_ids.pop().unwrap_or_else(|_| self.factory.get())
}

/// Add an id to the free list, allowing it to be re-used on a subsequent
/// call to [`IdFactoryWithReuse::get`].
/// Add an id to the free list, allowing it to be re-used on a subsequent call to
/// [`IdFactoryWithReuse::get`].
///
/// # Safety
///
/// It must be ensured that the id is no longer used. Id must be a valid id
/// that was previously returned by `get`.
/// The id must no longer be used. Must be a valid id that was previously returned by
/// [`IdFactoryWithReuse::get`].
pub unsafe fn reuse(&self, id: T) {
let _ = self.free_ids.push(id);
}
Expand All @@ -105,12 +158,21 @@ mod tests {
use super::*;

#[test]
#[should_panic(expected = "Overflow detected")]
fn test_overflow() {
let factory = IdFactory::<NonZeroU8>::new(1, u16::MAX as u64);
#[should_panic(expected = "Max id limit (overflow)")]
fn test_overflow_detection() {
let factory = IdFactory::new(NonZeroU8::MIN, NonZeroU8::MAX);
assert_eq!(factory.get(), NonZeroU8::new(1).unwrap());
assert_eq!(factory.get(), NonZeroU8::new(2).unwrap());
for _i in 2..256 {
for _ in 2..256 {
factory.get();
}
}

#[test]
#[should_panic(expected = "Max id limit (overflow)")]
fn test_overflow_detection_near_u64_max() {
let factory = IdFactory::new(NonZeroU64::try_from(u64::MAX - 5).unwrap(), NonZeroU64::MAX);
for _ in 0..=6 {
factory.get();
}
}
Expand Down
11 changes: 7 additions & 4 deletions turbopack/crates/turbo-tasks/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,9 +419,9 @@ impl CurrentTaskState {

fn create_local_task(&mut self, local_task: LocalTask) -> LocalTaskId {
self.local_tasks.push(local_task);
// generate a one-indexed id
// generate a one-indexed id from len() -- we just pushed so len() is >= 1
if cfg!(debug_assertions) {
LocalTaskId::from(u32::try_from(self.local_tasks.len()).unwrap())
LocalTaskId::try_from(u32::try_from(self.local_tasks.len()).unwrap()).unwrap()
} else {
unsafe { LocalTaskId::new_unchecked(self.local_tasks.len() as u32) }
}
Expand Down Expand Up @@ -452,9 +452,12 @@ impl<B: Backend + 'static> TurboTasks<B> {
// so we probably want to make sure that all tasks are joined
// when trying to drop turbo tasks
pub fn new(backend: B) -> Arc<Self> {
let task_id_factory = IdFactoryWithReuse::new(1, (TRANSIENT_TASK_BIT - 1) as u64);
let task_id_factory = IdFactoryWithReuse::new(
TaskId::MIN,
TaskId::try_from(TRANSIENT_TASK_BIT - 1).unwrap(),
);
let transient_task_id_factory =
IdFactoryWithReuse::new(TRANSIENT_TASK_BIT as u64, u32::MAX as u64);
IdFactoryWithReuse::new(TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(), TaskId::MAX);
let this = Arc::new_cyclic(|this| Self {
this: this.clone(),
backend,
Expand Down
Loading
Loading