Skip to content

Commit

Permalink
std: Unsafe-wrap std::sync
Browse files Browse the repository at this point in the history
  • Loading branch information
workingjubilee committed Jul 15, 2024
1 parent e8fa3ef commit 5ff7b40
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 41 deletions.
1 change: 0 additions & 1 deletion std/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@
//! [`RwLock`]: crate::sync::RwLock
#![stable(feature = "rust1", since = "1.0.0")]
#![allow(unsafe_op_in_unsafe_fn)]

#[stable(feature = "rust1", since = "1.0.0")]
pub use alloc_crate::sync::{Arc, Weak};
Expand Down
22 changes: 13 additions & 9 deletions std/src/sync/mpmc/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,12 @@ impl<T> Channel<T> {
return Err(msg);
}

let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);

// Write the message into the slot and update the stamp.
slot.msg.get().write(MaybeUninit::new(msg));
slot.stamp.store(token.array.stamp, Ordering::Release);
unsafe {
let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
slot.msg.get().write(MaybeUninit::new(msg));
slot.stamp.store(token.array.stamp, Ordering::Release);
}

// Wake a sleeping receiver.
self.receivers.notify();
Expand Down Expand Up @@ -291,11 +292,14 @@ impl<T> Channel<T> {
return Err(());
}

let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);

// Read the message from the slot and update the stamp.
let msg = slot.msg.get().read().assume_init();
slot.stamp.store(token.array.stamp, Ordering::Release);
let msg = unsafe {
let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);

let msg = slot.msg.get().read().assume_init();
slot.stamp.store(token.array.stamp, Ordering::Release);
msg
};

// Wake a sleeping sender.
self.senders.notify();
Expand Down Expand Up @@ -471,7 +475,7 @@ impl<T> Channel<T> {
false
};

self.discard_all_messages(tail);
unsafe { self.discard_all_messages(tail) };
disconnected
}

Expand Down
4 changes: 2 additions & 2 deletions std/src/sync/mpmc/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl<C> Sender<C> {
disconnect(&self.counter().chan);

if self.counter().destroy.swap(true, Ordering::AcqRel) {
drop(Box::from_raw(self.counter));
drop(unsafe { Box::from_raw(self.counter) });
}
}
}
Expand Down Expand Up @@ -116,7 +116,7 @@ impl<C> Receiver<C> {
disconnect(&self.counter().chan);

if self.counter().destroy.swap(true, Ordering::AcqRel) {
drop(Box::from_raw(self.counter));
drop(unsafe { Box::from_raw(self.counter) });
}
}
}
Expand Down
38 changes: 21 additions & 17 deletions std/src/sync/mpmc/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl<T> Block<T> {
// It is not necessary to set the `DESTROY` bit in the last slot because that slot has
// begun destruction of the block.
for i in start..BLOCK_CAP - 1 {
let slot = (*this).slots.get_unchecked(i);
let slot = unsafe { (*this).slots.get_unchecked(i) };

// Mark the `DESTROY` bit if a thread is still using the slot.
if slot.state.load(Ordering::Acquire) & READ == 0
Expand All @@ -103,7 +103,7 @@ impl<T> Block<T> {
}

// No thread is using the block, now it is safe to destroy it.
drop(Box::from_raw(this));
drop(unsafe { Box::from_raw(this) });
}
}

Expand Down Expand Up @@ -265,9 +265,11 @@ impl<T> Channel<T> {
// Write the message into the slot.
let block = token.list.block as *mut Block<T>;
let offset = token.list.offset;
let slot = (*block).slots.get_unchecked(offset);
slot.msg.get().write(MaybeUninit::new(msg));
slot.state.fetch_or(WRITE, Ordering::Release);
unsafe {
let slot = (*block).slots.get_unchecked(offset);
slot.msg.get().write(MaybeUninit::new(msg));
slot.state.fetch_or(WRITE, Ordering::Release);
}

// Wake a sleeping receiver.
self.receivers.notify();
Expand Down Expand Up @@ -369,19 +371,21 @@ impl<T> Channel<T> {
// Read the message.
let block = token.list.block as *mut Block<T>;
let offset = token.list.offset;
let slot = (*block).slots.get_unchecked(offset);
slot.wait_write();
let msg = slot.msg.get().read().assume_init();

// Destroy the block if we've reached the end, or if another thread wanted to destroy but
// couldn't because we were busy reading from the slot.
if offset + 1 == BLOCK_CAP {
Block::destroy(block, 0);
} else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
Block::destroy(block, offset + 1);
}
unsafe {
let slot = (*block).slots.get_unchecked(offset);
slot.wait_write();
let msg = slot.msg.get().read().assume_init();

// Destroy the block if we've reached the end, or if another thread wanted to destroy but
// couldn't because we were busy reading from the slot.
if offset + 1 == BLOCK_CAP {
Block::destroy(block, 0);
} else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
Block::destroy(block, offset + 1);
}

Ok(msg)
Ok(msg)
}
}

/// Attempts to send a message into the channel.
Expand Down
20 changes: 12 additions & 8 deletions std/src/sync/mpmc/zero.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,11 @@ impl<T> Channel<T> {
return Err(msg);
}

let packet = &*(token.zero.0 as *const Packet<T>);
packet.msg.get().write(Some(msg));
packet.ready.store(true, Ordering::Release);
unsafe {
let packet = &*(token.zero.0 as *const Packet<T>);
packet.msg.get().write(Some(msg));
packet.ready.store(true, Ordering::Release);
}
Ok(())
}

Expand All @@ -116,22 +118,24 @@ impl<T> Channel<T> {
return Err(());
}

let packet = &*(token.zero.0 as *const Packet<T>);
let packet = unsafe { &*(token.zero.0 as *const Packet<T>) };

if packet.on_stack {
// The message has been in the packet from the beginning, so there is no need to wait
// for it. However, after reading the message, we need to set `ready` to `true` in
// order to signal that the packet can be destroyed.
let msg = packet.msg.get().replace(None).unwrap();
let msg = unsafe { packet.msg.get().replace(None) }.unwrap();
packet.ready.store(true, Ordering::Release);
Ok(msg)
} else {
// Wait until the message becomes available, then read it and destroy the
// heap-allocated packet.
packet.wait_ready();
let msg = packet.msg.get().replace(None).unwrap();
drop(Box::from_raw(token.zero.0 as *mut Packet<T>));
Ok(msg)
unsafe {
let msg = packet.msg.get().replace(None).unwrap();
drop(Box::from_raw(token.zero.0 as *mut Packet<T>));
Ok(msg)
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions std/src/sync/once_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ impl<T> OnceLock<T> {
#[inline]
unsafe fn get_unchecked(&self) -> &T {
debug_assert!(self.is_initialized());
(&*self.value.get()).assume_init_ref()
unsafe { (&*self.value.get()).assume_init_ref() }
}

/// # Safety
Expand All @@ -511,7 +511,7 @@ impl<T> OnceLock<T> {
#[inline]
unsafe fn get_unchecked_mut(&mut self) -> &mut T {
debug_assert!(self.is_initialized());
(&mut *self.value.get()).assume_init_mut()
unsafe { (&mut *self.value.get()).assume_init_mut() }
}
}

Expand Down
4 changes: 3 additions & 1 deletion std/src/sync/reentrant_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,9 @@ impl<T: ?Sized> ReentrantLock<T> {
}

unsafe fn increment_lock_count(&self) -> Option<()> {
*self.lock_count.get() = (*self.lock_count.get()).checked_add(1)?;
unsafe {
*self.lock_count.get() = (*self.lock_count.get()).checked_add(1)?;
}
Some(())
}
}
Expand Down
2 changes: 1 addition & 1 deletion std/src/sync/rwlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ impl<'rwlock, T: ?Sized> RwLockReadGuard<'rwlock, T> {
// successfully called from the same thread before instantiating this object.
unsafe fn new(lock: &'rwlock RwLock<T>) -> LockResult<RwLockReadGuard<'rwlock, T>> {
poison::map_result(lock.poison.borrow(), |()| RwLockReadGuard {
data: NonNull::new_unchecked(lock.data.get()),
data: unsafe { NonNull::new_unchecked(lock.data.get()) },
inner_lock: &lock.inner,
})
}
Expand Down

0 comments on commit 5ff7b40

Please sign in to comment.