Skip to content

Commit

Permalink
sync: add Semaphore::close (#3065)
Browse files Browse the repository at this point in the history
## Motivation
The need to expose `Semaphore::close` as explained in #3061. 

## Solution
Expose `Semaphore::close`

Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
  • Loading branch information
zaharidichev authored Dec 15, 2020
1 parent 79d25b0 commit fcce78b
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 37 deletions.
25 changes: 18 additions & 7 deletions tokio/src/sync/batch_semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,28 @@ struct Waitlist {
closed: bool,
}

/// Error returned by `Semaphore::try_acquire`.
#[derive(Debug)]
pub(crate) enum TryAcquireError {
/// Error returned from the [`Semaphore::try_acquire`] function.
///
/// [`Semaphore::try_acquire`]: crate::sync::Semaphore::try_acquire
#[derive(Debug, PartialEq)]
pub enum TryAcquireError {
/// The semaphore has been [closed] and cannot issue new permits.
///
/// [closed]: crate::sync::Semaphore::close
Closed,

/// The semaphore has no available permits.
NoPermits,
}
/// Error returned by `Semaphore::acquire`.
/// Error returned from the [`Semaphore::acquire`] function.
///
/// An `acquire` operation can only fail if the semaphore has been
/// [closed].
///
/// [closed]: crate::sync::Semaphore::close
/// [`Semaphore::acquire`]: crate::sync::Semaphore::acquire
#[derive(Debug)]
pub(crate) struct AcquireError(());
pub struct AcquireError(());

pub(crate) struct Acquire<'a> {
node: Waiter,
Expand Down Expand Up @@ -164,8 +177,6 @@ impl Semaphore {

/// Closes the semaphore. This prevents the semaphore from issuing new
/// permits and notifies all pending waiters.
// This will be used once the bounded MPSC is updated to use the new
// semaphore implementation.
pub(crate) fn close(&self) {
let mut waiters = self.waiters.lock();
// If the semaphore's permits counter has enough permits for an
Expand Down
4 changes: 3 additions & 1 deletion tokio/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,8 +443,10 @@ cfg_sync! {
pub mod oneshot;

pub(crate) mod batch_semaphore;
pub use batch_semaphore::{AcquireError, TryAcquireError};

mod semaphore;
pub use semaphore::{Semaphore, SemaphorePermit, OwnedSemaphorePermit, TryAcquireError};
pub use semaphore::{Semaphore, SemaphorePermit, OwnedSemaphorePermit};

mod rwlock;
pub use rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
Expand Down
125 changes: 96 additions & 29 deletions tokio/src/sync/semaphore.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::batch_semaphore as ll; // low level implementation
use super::{AcquireError, TryAcquireError};
use std::sync::Arc;

/// Counting semaphore performing asynchronous permit acquisition.
Expand Down Expand Up @@ -42,15 +43,6 @@ pub struct OwnedSemaphorePermit {
permits: u32,
}

/// Error returned from the [`Semaphore::try_acquire`] function.
///
/// A `try_acquire` operation can only fail if the semaphore has no available
/// permits.
///
/// [`Semaphore::try_acquire`]: Semaphore::try_acquire
#[derive(Debug)]
pub struct TryAcquireError(());

#[test]
#[cfg(not(loom))]
fn bounds() {
Expand Down Expand Up @@ -95,73 +87,148 @@ impl Semaphore {
self.ll_sem.release(n);
}

/// Acquires permit from the semaphore.
pub async fn acquire(&self) -> SemaphorePermit<'_> {
self.ll_sem.acquire(1).await.unwrap();
SemaphorePermit {
/// Acquires a permit from the semaphore.
///
/// If the semaphore has been closed, this returns an [`AcquireError`].
/// Otherwise, this returns a [`SemaphorePermit`] representing the
/// acquired permit.
///
/// [`AcquireError`]: crate::sync::AcquireError
/// [`SemaphorePermit`]: crate::sync::SemaphorePermit
pub async fn acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError> {
self.ll_sem.acquire(1).await?;
Ok(SemaphorePermit {
sem: &self,
permits: 1,
}
})
}

/// Acquires `n` permits from the semaphore
pub async fn acquire_many(&self, n: u32) -> SemaphorePermit<'_> {
self.ll_sem.acquire(n).await.unwrap();
SemaphorePermit {
/// Acquires `n` permits from the semaphore.
///
/// If the semaphore has been closed, this returns an [`AcquireError`].
/// Otherwise, this returns a [`SemaphorePermit`] representing the
/// acquired permits.
///
/// [`AcquireError`]: crate::sync::AcquireError
/// [`SemaphorePermit`]: crate::sync::SemaphorePermit
pub async fn acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, AcquireError> {
self.ll_sem.acquire(n).await?;
Ok(SemaphorePermit {
sem: &self,
permits: n,
}
})
}

/// Tries to acquire a permit from the semaphore.
///
/// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
/// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise,
/// this returns a [`SemaphorePermit`] representing the acquired permits.
///
/// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
/// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
/// [`SemaphorePermit`]: crate::sync::SemaphorePermit
pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError> {
match self.ll_sem.try_acquire(1) {
Ok(_) => Ok(SemaphorePermit {
sem: self,
permits: 1,
}),
Err(_) => Err(TryAcquireError(())),
Err(e) => Err(e),
}
}

/// Tries to acquire `n` permits from the semaphore.
/// Tries to acquire n permits from the semaphore.
///
/// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
/// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise,
/// this returns a [`SemaphorePermit`] representing the acquired permits.
///
/// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
/// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
/// [`SemaphorePermit`]: crate::sync::SemaphorePermit
pub fn try_acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, TryAcquireError> {
match self.ll_sem.try_acquire(n) {
Ok(_) => Ok(SemaphorePermit {
sem: self,
permits: n,
}),
Err(_) => Err(TryAcquireError(())),
Err(e) => Err(e),
}
}

/// Acquires permit from the semaphore.
/// Acquires a permit from the semaphore.
///
/// The semaphore must be wrapped in an [`Arc`] to call this method.
/// If the semaphore has been closed, this returns an [`AcquireError`].
/// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
/// acquired permit.
///
/// [`Arc`]: std::sync::Arc
pub async fn acquire_owned(self: Arc<Self>) -> OwnedSemaphorePermit {
self.ll_sem.acquire(1).await.unwrap();
OwnedSemaphorePermit {
/// [`AcquireError`]: crate::sync::AcquireError
/// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
pub async fn acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, AcquireError> {
self.ll_sem.acquire(1).await?;
Ok(OwnedSemaphorePermit {
sem: self,
permits: 1,
}
})
}

/// Tries to acquire a permit from the semaphore.
///
/// The semaphore must be wrapped in an [`Arc`] to call this method.
/// The semaphore must be wrapped in an [`Arc`] to call this method. If
/// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
/// and a [`TryAcquireError::NoPermits`] if there are no permits left.
/// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
/// acquired permit.
///
/// [`Arc`]: std::sync::Arc
/// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
/// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
/// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
pub fn try_acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, TryAcquireError> {
match self.ll_sem.try_acquire(1) {
Ok(_) => Ok(OwnedSemaphorePermit {
sem: self,
permits: 1,
}),
Err(_) => Err(TryAcquireError(())),
Err(e) => Err(e),
}
}

/// Closes the semaphore.
///
/// This prevents the semaphore from issuing new permits and notifies all pending waiters.
///
/// # Examples
///
/// ```
/// use tokio::sync::Semaphore;
/// use std::sync::Arc;
/// use tokio::sync::TryAcquireError;
///
/// #[tokio::main]
/// async fn main() {
/// let semaphore = Arc::new(Semaphore::new(1));
/// let semaphore2 = semaphore.clone();
///
/// tokio::spawn(async move {
/// let permit = semaphore.acquire_many(2).await;
/// assert!(permit.is_err());
/// println!("waiter received error");
/// });
///
/// println!("closing semaphore");
/// semaphore2.close();
///
/// // Cannot obtain more permits
/// assert_eq!(semaphore2.try_acquire().err(), Some(TryAcquireError::Closed))
/// }
/// ```
pub fn close(&self) {
self.ll_sem.close();
}
}

impl<'a> SemaphorePermit<'a> {
Expand Down

0 comments on commit fcce78b

Please sign in to comment.