Skip to content

Commit

Permalink
feat(logging): Log acquires from connection pool
Browse files Browse the repository at this point in the history
Signed-off-by: Joshua Potts <8704475+iamjpotts@users.noreply.github.com>
  • Loading branch information
iamjpotts committed Mar 15, 2024
1 parent 9ba488c commit 49f3f45
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 6 deletions.
6 changes: 6 additions & 0 deletions sqlx-core/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ pub fn private_level_filter_to_levels(
tracing_level.zip(filter.to_level())
}

pub(crate) fn private_level_filter_to_trace_level(
filter: log::LevelFilter,
) -> Option<tracing::Level> {
private_level_filter_to_levels(filter).map(|(level, _)| level)
}

pub use sqlformat;

pub struct QueryLogger<'q> {
Expand Down
41 changes: 37 additions & 4 deletions sqlx-core/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::task::Poll;

use crate::logger::private_level_filter_to_trace_level;
use crate::pool::options::PoolConnectionMetadata;
use crate::private_tracing_dynamic_event;
use futures_util::future::{self};
use futures_util::FutureExt;
use std::time::{Duration, Instant};
use tracing::Level;

pub(crate) struct PoolInner<DB: Database> {
pub(super) connect_options: RwLock<Arc<<DB::Connection as Connection>::Options>>,
Expand All @@ -28,6 +31,8 @@ pub(crate) struct PoolInner<DB: Database> {
is_closed: AtomicBool,
pub(super) on_closed: event_listener::Event,
pub(super) options: PoolOptions<DB>,
pub(crate) acquire_time_level: Option<Level>,
pub(crate) acquire_slow_level: Option<Level>,
}

impl<DB: Database> PoolInner<DB> {
Expand All @@ -54,6 +59,8 @@ impl<DB: Database> PoolInner<DB> {
num_idle: AtomicUsize::new(0),
is_closed: AtomicBool::new(false),
on_closed: event_listener::Event::new(),
acquire_time_level: private_level_filter_to_trace_level(options.acquire_time_level),
acquire_slow_level: private_level_filter_to_trace_level(options.acquire_slow_level),
options,
};

Expand Down Expand Up @@ -241,9 +248,10 @@ impl<DB: Database> PoolInner<DB> {
return Err(Error::PoolClosed);
}

let deadline = Instant::now() + self.options.acquire_timeout;
let acquire_started_at = Instant::now();
let deadline = acquire_started_at + self.options.acquire_timeout;

crate::rt::timeout(
let acquired = crate::rt::timeout(
self.options.acquire_timeout,
async {
loop {
Expand Down Expand Up @@ -272,7 +280,7 @@ impl<DB: Database> PoolInner<DB> {
// or if the pool was closed between `acquire_permit()` and
// `try_increment_size()`.
tracing::debug!("woke but was unable to acquire idle connection or open new one; retrying");
// If so, we're likely in the current-thread runtime if it's Tokio
// If so, we're likely in the current-thread runtime if it's Tokio,
// and so we should yield to let any spawned return_to_pool() tasks
// execute.
crate::rt::yield_now().await;
Expand All @@ -286,7 +294,32 @@ impl<DB: Database> PoolInner<DB> {
}
)
.await
.map_err(|_| Error::PoolTimedOut)?
.map_err(|_| Error::PoolTimedOut)??;

let acquired_after = acquire_started_at.elapsed();

let acquire_slow_level = self
.acquire_slow_level
.filter(|_| acquired_after > self.options.acquire_slow_threshold);

if let Some(level) = acquire_slow_level {
private_tracing_dynamic_event!(
target: "sqlx::pool::acquire",
level,
aquired_after_secs = acquired_after.as_secs_f64(),
slow_acquire_threshold_secs = self.options.acquire_slow_threshold.as_secs_f64(),
"acquired connection, but time to acquire exceeded slow threshold"
);
} else if let Some(level) = self.acquire_time_level {
private_tracing_dynamic_event!(
target: "sqlx::pool::acquire",
level,
aquired_after_secs = acquired_after.as_secs_f64(),
"acquired connection"
);
}

Ok(acquired)
}

pub(super) async fn connect(
Expand Down
2 changes: 1 addition & 1 deletion sqlx-core/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ pub use self::maybe::MaybePoolConnection;
/// scheme. However, in a web context, telling a client "go away, maybe try again later" results in
/// a sub-optimal user experience.
///
/// Instead with a connection pool, clients are made to wait in a fair queue for a connection to
/// Instead, with a connection pool, clients are made to wait in a fair queue for a connection to
/// become available; by using a single connection pool for your whole application, you can ensure
/// that you don't exceed the connection limit of your database server while allowing response
/// time to degrade gracefully at high load.
Expand Down
49 changes: 48 additions & 1 deletion sqlx-core/src/pool/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::error::Error;
use crate::pool::inner::PoolInner;
use crate::pool::Pool;
use futures_core::future::BoxFuture;
use log::LevelFilter;
use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -74,6 +75,9 @@ pub struct PoolOptions<DB: Database> {
>,
>,
pub(crate) max_connections: u32,
pub(crate) acquire_time_level: LevelFilter,
pub(crate) acquire_slow_level: LevelFilter,
pub(crate) acquire_slow_threshold: Duration,
pub(crate) acquire_timeout: Duration,
pub(crate) min_connections: u32,
pub(crate) max_lifetime: Option<Duration>,
Expand All @@ -94,6 +98,9 @@ impl<DB: Database> Clone for PoolOptions<DB> {
before_acquire: self.before_acquire.clone(),
after_release: self.after_release.clone(),
max_connections: self.max_connections,
acquire_time_level: self.acquire_time_level,
acquire_slow_threshold: self.acquire_slow_threshold,
acquire_slow_level: self.acquire_slow_level,
acquire_timeout: self.acquire_timeout,
min_connections: self.min_connections,
max_lifetime: self.max_lifetime,
Expand Down Expand Up @@ -143,6 +150,13 @@ impl<DB: Database> PoolOptions<DB> {
// A production application will want to set a higher limit than this.
max_connections: 10,
min_connections: 0,
// Logging all acquires is opt-in
acquire_time_level: LevelFilter::Off,
// Default to warning, because an acquire timeout will be an error
acquire_slow_level: LevelFilter::Warn,
// Fast enough to catch problems (e.g. a full pool); slow enough
// to not flag typical time to add a new connection to a pool.
acquire_slow_threshold: Duration::from_secs(2),
acquire_timeout: Duration::from_secs(30),
idle_timeout: Some(Duration::from_secs(10 * 60)),
max_lifetime: Some(Duration::from_secs(30 * 60)),
Expand Down Expand Up @@ -198,6 +212,39 @@ impl<DB: Database> PoolOptions<DB> {
self.min_connections
}

/// Enable logging of time taken to acquire a connection from the connection pool via
/// [`Pool::acquire()`].
///
/// If slow acquire logging is also enabled, this level is used for acquires that are not
/// considered slow.
pub fn acquire_time_level(mut self, level: LevelFilter) -> Self {
self.acquire_time_level = level;
self
}

/// Log excessive time taken to acquire a connection at a different log level than time taken
/// for faster connection acquires via [`Pool::acquire()`].
pub fn acquire_slow_level(mut self, level: LevelFilter) -> Self {
self.acquire_slow_level = level;
self
}

/// Set a threshold for reporting excessive time taken to acquire a connection from
/// the connection pool via [`Pool::acquire()`]. When the threshold is exceeded, a warning is logged.
///
/// Defaults to a value that should not typically be exceeded by the pool enlarging
/// itself with an additional new connection.
pub fn acquire_slow_threshold(mut self, threshold: Duration) -> Self {
self.acquire_slow_threshold = threshold;
self
}

/// Get the threshold for reporting excessive time taken to acquire a connection via
/// [`Pool::acquire()`].
pub fn get_acquire_slow_threshold(&self) -> Duration {
self.acquire_slow_threshold
}

/// Set the maximum amount of time to spend waiting for a connection in [`Pool::acquire()`].
///
/// Caps the total amount of time `Pool::acquire()` can spend waiting across multiple phases:
Expand Down Expand Up @@ -269,7 +316,7 @@ impl<DB: Database> PoolOptions<DB> {
self
}

/// Get's whether `test_before_acquire` is currently set.
/// Get whether `test_before_acquire` is currently set.
pub fn get_test_before_acquire(&self) -> bool {
self.test_before_acquire
}
Expand Down

0 comments on commit 49f3f45

Please sign in to comment.