Skip to content

Commit

Permalink
fix(pool): don't leak connections if drop task doesn't run (#1799)
Browse files Browse the repository at this point in the history
fixes #1396
  • Loading branch information
abonander authored Apr 13, 2022
1 parent 973f3d1 commit 6c7006c
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 60 deletions.
49 changes: 27 additions & 22 deletions sqlx-core/src/pool/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ pub(super) struct Idle<DB: Database> {
}

/// RAII wrapper for connections being handled by functions that may drop them
pub(super) struct Floating<'p, C> {
pub(super) struct Floating<DB: Database, C> {
pub(super) inner: C,
pub(super) guard: DecrementSizeGuard<'p>,
pub(super) guard: DecrementSizeGuard<DB>,
}

const DEREF_ERR: &str = "(bug) connection already released to pool";
Expand Down Expand Up @@ -89,7 +89,7 @@ impl<DB: Database> PoolConnection<DB> {
self.live
.take()
.expect("PoolConnection double-dropped")
.float(&self.pool)
.float(self.pool.clone())
.detach()
}

Expand All @@ -106,19 +106,20 @@ impl<DB: Database> PoolConnection<DB> {
///
/// This effectively runs the drop handler eagerly instead of spawning a task to do it.
pub(crate) fn return_to_pool(&mut self) -> impl Future<Output = ()> + Send + 'static {
// we want these to happen synchronously so the drop handler doesn't try to spawn a task anyway
// this also makes the returned future `'static`
let live = self.live.take();
let pool = self.pool.clone();
// float the connection in the pool before we move into the task
// in case the returned `Future` isn't executed, like if it's spawned into a dying runtime
// https://github.com/launchbadge/sqlx/issues/1396
let floating = self.live.take().map(|live| live.float(self.pool.clone()));

async move {
let mut floating = if let Some(live) = live {
live.float(&pool)
let mut floating = if let Some(floating) = floating {
floating
} else {
return;
};

// test the connection on-release to ensure it is still viable
// test the connection on-release to ensure it is still viable,
// and flush anything time-sensitive like transaction rollbacks
// if an Executor future/stream is dropped during an `.await` call, the connection
// is likely to be left in an inconsistent state, in which case it should not be
// returned to the pool; also of course, if it was dropped due to an error
Expand All @@ -135,7 +136,7 @@ impl<DB: Database> PoolConnection<DB> {
drop(floating);
} else {
// if the connection is still viable, release it to the pool
pool.release(floating);
floating.release();
}
}
}
Expand All @@ -157,7 +158,7 @@ impl<DB: Database> Drop for PoolConnection<DB> {
}

impl<DB: Database> Live<DB> {
pub fn float(self, pool: &SharedPool<DB>) -> Floating<'_, Self> {
pub fn float(self, pool: Arc<SharedPool<DB>>) -> Floating<DB, Self> {
Floating {
inner: self,
// create a new guard from a previously leaked permit
Expand Down Expand Up @@ -187,8 +188,8 @@ impl<DB: Database> DerefMut for Idle<DB> {
}
}

impl<'s, DB: Database> Floating<'s, Live<DB>> {
pub fn new_live(conn: DB::Connection, guard: DecrementSizeGuard<'s>) -> Self {
impl<DB: Database> Floating<DB, Live<DB>> {
pub fn new_live(conn: DB::Connection, guard: DecrementSizeGuard<DB>) -> Self {
Self {
inner: Live {
raw: conn,
Expand All @@ -213,6 +214,10 @@ impl<'s, DB: Database> Floating<'s, Live<DB>> {
}
}

pub fn release(self) {
self.guard.pool.clone().release(self);
}

pub async fn close(self) -> Result<(), Error> {
// `guard` is dropped as intended
self.inner.raw.close().await
Expand All @@ -222,19 +227,19 @@ impl<'s, DB: Database> Floating<'s, Live<DB>> {
self.inner.raw
}

pub fn into_idle(self) -> Floating<'s, Idle<DB>> {
pub fn into_idle(self) -> Floating<DB, Idle<DB>> {
Floating {
inner: self.inner.into_idle(),
guard: self.guard,
}
}
}

impl<'s, DB: Database> Floating<'s, Idle<DB>> {
impl<DB: Database> Floating<DB, Idle<DB>> {
pub fn from_idle(
idle: Idle<DB>,
pool: &'s SharedPool<DB>,
permit: SemaphoreReleaser<'s>,
pool: Arc<SharedPool<DB>>,
permit: SemaphoreReleaser<'_>,
) -> Self {
Self {
inner: idle,
Expand All @@ -246,14 +251,14 @@ impl<'s, DB: Database> Floating<'s, Idle<DB>> {
self.live.raw.ping().await
}

pub fn into_live(self) -> Floating<'s, Live<DB>> {
pub fn into_live(self) -> Floating<DB, Live<DB>> {
Floating {
inner: self.inner.live,
guard: self.guard,
}
}

pub async fn close(self) -> DecrementSizeGuard<'s> {
pub async fn close(self) -> DecrementSizeGuard<DB> {
// `guard` is dropped as intended
if let Err(e) = self.inner.live.raw.close().await {
log::debug!("error occurred while closing the pool connection: {}", e);
Expand All @@ -262,15 +267,15 @@ impl<'s, DB: Database> Floating<'s, Idle<DB>> {
}
}

impl<C> Deref for Floating<'_, C> {
impl<DB: Database, C> Deref for Floating<DB, C> {
type Target = C;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl<C> DerefMut for Floating<'_, C> {
impl<DB: Database, C> DerefMut for Floating<DB, C> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
Expand Down
69 changes: 32 additions & 37 deletions sqlx-core/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl<DB: Database> SharedPool<DB> {
self.is_closed.load(Ordering::Acquire)
}

pub(super) async fn close(&self) {
pub(super) async fn close(self: &Arc<Self>) {
let already_closed = self.is_closed.swap(true, Ordering::AcqRel);

if !already_closed {
Expand All @@ -93,12 +93,12 @@ impl<DB: Database> SharedPool<DB> {
.await;

while let Some(idle) = self.idle_conns.pop() {
let _ = idle.live.float(self).close().await;
let _ = idle.live.float((*self).clone()).close().await;
}
}

#[inline]
pub(super) fn try_acquire(&self) -> Option<Floating<'_, Idle<DB>>> {
pub(super) fn try_acquire(self: &Arc<Self>) -> Option<Floating<DB, Idle<DB>>> {
if self.is_closed() {
return None;
}
Expand All @@ -108,17 +108,17 @@ impl<DB: Database> SharedPool<DB> {
}

fn pop_idle<'a>(
&'a self,
self: &'a Arc<Self>,
permit: SemaphoreReleaser<'a>,
) -> Result<Floating<'a, Idle<DB>>, SemaphoreReleaser<'a>> {
) -> Result<Floating<DB, Idle<DB>>, SemaphoreReleaser<'a>> {
if let Some(idle) = self.idle_conns.pop() {
Ok(Floating::from_idle(idle, self, permit))
Ok(Floating::from_idle(idle, (*self).clone(), permit))
} else {
Err(permit)
}
}

pub(super) fn release(&self, mut floating: Floating<'_, Live<DB>>) {
pub(super) fn release(&self, mut floating: Floating<DB, Live<DB>>) {
if let Some(test) = &self.options.after_release {
if !test(&mut floating.raw) {
// drop the connection and do not return it to the pool
Expand All @@ -141,24 +141,24 @@ impl<DB: Database> SharedPool<DB> {
///
/// Returns `None` if we are at max_connections or if the pool is closed.
pub(super) fn try_increment_size<'a>(
&'a self,
self: &'a Arc<Self>,
permit: SemaphoreReleaser<'a>,
) -> Result<DecrementSizeGuard<'a>, SemaphoreReleaser<'a>> {
) -> Result<DecrementSizeGuard<DB>, SemaphoreReleaser<'a>> {
match self
.size
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |size| {
size.checked_add(1)
.filter(|size| size <= &self.options.max_connections)
}) {
// we successfully incremented the size
Ok(_) => Ok(DecrementSizeGuard::from_permit(self, permit)),
Ok(_) => Ok(DecrementSizeGuard::from_permit((*self).clone(), permit)),
// the pool is at max capacity
Err(_) => Err(permit),
}
}

#[allow(clippy::needless_lifetimes)]
pub(super) async fn acquire<'s>(&'s self) -> Result<Floating<'s, Live<DB>>, Error> {
pub(super) async fn acquire(self: &Arc<Self>) -> Result<Floating<DB, Live<DB>>, Error> {
if self.is_closed() {
return Err(Error::PoolClosed);
}
Expand Down Expand Up @@ -206,11 +206,11 @@ impl<DB: Database> SharedPool<DB> {
.map_err(|_| Error::PoolTimedOut)?
}

pub(super) async fn connection<'s>(
&'s self,
pub(super) async fn connection(
self: &Arc<Self>,
deadline: Instant,
guard: DecrementSizeGuard<'s>,
) -> Result<Floating<'s, Live<DB>>, Error> {
guard: DecrementSizeGuard<DB>,
) -> Result<Floating<DB, Live<DB>>, Error> {
if self.is_closed() {
return Err(Error::PoolClosed);
}
Expand Down Expand Up @@ -275,10 +275,10 @@ fn is_beyond_idle<DB: Database>(idle: &Idle<DB>, options: &PoolOptions<DB>) -> b
.map_or(false, |timeout| idle.since.elapsed() > timeout)
}

async fn check_conn<'s: 'p, 'p, DB: Database>(
mut conn: Floating<'s, Idle<DB>>,
options: &'p PoolOptions<DB>,
) -> Result<Floating<'s, Live<DB>>, DecrementSizeGuard<'s>> {
async fn check_conn<DB: Database>(
mut conn: Floating<DB, Idle<DB>>,
options: &PoolOptions<DB>,
) -> Result<Floating<DB, Live<DB>>, DecrementSizeGuard<DB>> {
// If the connection we pulled has expired, close the connection and
// immediately create a new connection
if is_beyond_lifetime(&conn, options) {
Expand Down Expand Up @@ -337,7 +337,7 @@ fn spawn_reaper<DB: Database>(pool: &Arc<SharedPool<DB>>) {
});
}

async fn do_reap<DB: Database>(pool: &SharedPool<DB>) {
async fn do_reap<DB: Database>(pool: &Arc<SharedPool<DB>>) {
// reap at most the current size minus the minimum idle
let max_reaped = pool.size().saturating_sub(pool.options.min_connections);

Expand All @@ -363,39 +363,34 @@ async fn do_reap<DB: Database>(pool: &SharedPool<DB>) {
///
/// Will decrement the pool size if dropped, to avoid semantically "leaking" connections
/// (where the pool thinks it has more connections than it does).
pub(in crate::pool) struct DecrementSizeGuard<'a> {
size: &'a AtomicU32,
semaphore: &'a Semaphore,
pub(in crate::pool) struct DecrementSizeGuard<DB: Database> {
pub(crate) pool: Arc<SharedPool<DB>>,
dropped: bool,
}

impl<'a> DecrementSizeGuard<'a> {
impl<DB: Database> DecrementSizeGuard<DB> {
/// Create a new guard that will release a semaphore permit on-drop.
pub fn new_permit<DB: Database>(pool: &'a SharedPool<DB>) -> Self {
pub fn new_permit(pool: Arc<SharedPool<DB>>) -> Self {
Self {
size: &pool.size,
semaphore: &pool.semaphore,
pool,
dropped: false,
}
}

pub fn from_permit<DB: Database>(
pool: &'a SharedPool<DB>,
mut permit: SemaphoreReleaser<'a>,
) -> Self {
pub fn from_permit(pool: Arc<SharedPool<DB>>, mut permit: SemaphoreReleaser<'_>) -> Self {
// here we effectively take ownership of the permit
permit.disarm();
Self::new_permit(pool)
}

/// Return `true` if the internal references point to the same fields in `SharedPool`.
pub fn same_pool<DB: Database>(&self, pool: &'a SharedPool<DB>) -> bool {
ptr::eq(self.size, &pool.size)
pub fn same_pool(&self, pool: &SharedPool<DB>) -> bool {
ptr::eq(&*self.pool, pool)
}

/// Release the semaphore permit without decreasing the pool size.
fn release_permit(self) {
self.semaphore.release(1);
self.pool.semaphore.release(1);
self.cancel();
}

Expand All @@ -404,13 +399,13 @@ impl<'a> DecrementSizeGuard<'a> {
}
}

impl Drop for DecrementSizeGuard<'_> {
impl<DB: Database> Drop for DecrementSizeGuard<DB> {
fn drop(&mut self) {
assert!(!self.dropped, "double-dropped!");
self.dropped = true;
self.size.fetch_sub(1, Ordering::SeqCst);
self.pool.size.fetch_sub(1, Ordering::SeqCst);

// and here we release the permit we got on construction
self.semaphore.release(1);
self.pool.semaphore.release(1);
}
}
2 changes: 1 addition & 1 deletion sqlx-core/src/pool/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ impl<DB: Database> PoolOptions<DB> {
}
}

async fn init_min_connections<DB: Database>(pool: &SharedPool<DB>) -> Result<(), Error> {
async fn init_min_connections<DB: Database>(pool: &Arc<SharedPool<DB>>) -> Result<(), Error> {
for _ in 0..cmp::max(pool.options.min_connections, 1) {
let deadline = Instant::now() + pool.options.connect_timeout;
let permit = pool.semaphore.acquire(1).await;
Expand Down

0 comments on commit 6c7006c

Please sign in to comment.