Skip to content

Commit

Permalink
feat(logging): Log acquires from connection pool
Browse files Browse the repository at this point in the history
Signed-off-by: Joshua Potts <8704475+iamjpotts@users.noreply.github.com>
  • Loading branch information
iamjpotts committed Mar 7, 2024
1 parent ac42f63 commit e3dd0b8
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 12 deletions.
6 changes: 6 additions & 0 deletions sqlx-core/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ pub fn private_level_filter_to_levels(
tracing_level.zip(filter.to_level())
}

pub(crate) fn private_level_filter_to_trace_level(
filter: log::LevelFilter,
) -> Option<tracing::Level> {
private_level_filter_to_levels(filter).map(|(level, _)| level)
}

pub use sqlformat;

pub struct QueryLogger<'q> {
Expand Down
42 changes: 35 additions & 7 deletions sqlx-core/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::task::Poll;

use crate::logger::private_level_filter_to_trace_level;
use crate::pool::options::PoolConnectionMetadata;
use crate::private_tracing_dynamic_event;
use futures_util::future::{self};
use futures_util::FutureExt;
use std::time::{Duration, Instant};
use tracing::Level;

pub(crate) struct PoolInner<DB: Database> {
pub(super) connect_options: RwLock<Arc<<DB::Connection as Connection>::Options>>,
Expand All @@ -28,6 +31,8 @@ pub(crate) struct PoolInner<DB: Database> {
is_closed: AtomicBool,
pub(super) on_closed: event_listener::Event,
pub(super) options: PoolOptions<DB>,
pub(crate) acquire_time_level: Option<Level>,
pub(crate) acquire_slow_level: Option<Level>,
}

impl<DB: Database> PoolInner<DB> {
Expand All @@ -54,6 +59,8 @@ impl<DB: Database> PoolInner<DB> {
num_idle: AtomicUsize::new(0),
is_closed: AtomicBool::new(false),
on_closed: event_listener::Event::new(),
acquire_time_level: private_level_filter_to_trace_level(options.acquire_time_level),
acquire_slow_level: private_level_filter_to_trace_level(options.acquire_slow_level),
options,
};

Expand Down Expand Up @@ -289,16 +296,37 @@ impl<DB: Database> PoolInner<DB> {
.await
.map_err(|_| Error::PoolTimedOut)??;

let acquired_at = Instant::now();
let acquired_after = acquired_at - acquire_started_at;
let acquired_after = Instant::now() - acquire_started_at;

let acquire_time_level = match self.acquire_slow_level {
Some(level) => {
if acquired_after >= self.options.acquire_slow_after {
private_tracing_dynamic_event!(
target: "sqlx::pool",
level,
aquired_after_secs = acquired_after.as_secs_f64(),
slow_acquire_threshold_secs = self.options.acquire_slow_after.as_secs_f64(),
"acquired connection, but time to acquire exceeded slow threshold"
);

// Suppress non-slow acquire logging
None
} else {
// Fallback on non-slow logging, if configured
self.acquire_time_level
}
}
// Slow logging not configured
None => self.acquire_time_level,
};

if acquired_after >= self.options.acquire_slow_after {
tracing::warn!(
if let Some(level) = acquire_time_level {
private_tracing_dynamic_event!(
target: "sqlx::pool",
level,
aquired_after_secs = acquired_after.as_secs_f64(),
slow_acquire_threshold_secs = self.options.acquire_slow_after.as_secs_f64(),
"slow connection acquire: time exceeded alert threshold"
)
"acquired connection"
);
}

Ok(acquired)
Expand Down
31 changes: 26 additions & 5 deletions sqlx-core/src/pool/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::error::Error;
use crate::pool::inner::PoolInner;
use crate::pool::Pool;
use futures_core::future::BoxFuture;
use log::LevelFilter;
use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -74,7 +75,9 @@ pub struct PoolOptions<DB: Database> {
>,
>,
pub(crate) max_connections: u32,
pub(crate) acquire_time_level: LevelFilter,
pub(crate) acquire_slow_after: Duration,
pub(crate) acquire_slow_level: LevelFilter,
pub(crate) acquire_timeout: Duration,
pub(crate) min_connections: u32,
pub(crate) max_lifetime: Option<Duration>,
Expand All @@ -95,7 +98,9 @@ impl<DB: Database> Clone for PoolOptions<DB> {
before_acquire: self.before_acquire.clone(),
after_release: self.after_release.clone(),
max_connections: self.max_connections,
acquire_time_level: self.acquire_time_level,
acquire_slow_after: self.acquire_slow_after,
acquire_slow_level: self.acquire_slow_level,
acquire_timeout: self.acquire_timeout,
min_connections: self.min_connections,
max_lifetime: self.max_lifetime,
Expand Down Expand Up @@ -145,8 +150,13 @@ impl<DB: Database> PoolOptions<DB> {
// A production application will want to set a higher limit than this.
max_connections: 10,
min_connections: 0,
// Fast enough to catch problems (e.g. a full pool); slow enough to not flag typical time to add a new connection to a pool
// Logging all acquires is opt-in
acquire_time_level: LevelFilter::Off,
// Fast enough to catch problems (e.g. a full pool); slow enough
// to not flag typical time to add a new connection to a pool.
acquire_slow_after: Duration::from_secs(3),
// Default to warning, because an acquire timeout will be an error
acquire_slow_level: LevelFilter::Warn,
acquire_timeout: Duration::from_secs(30),
idle_timeout: Some(Duration::from_secs(10 * 60)),
max_lifetime: Some(Duration::from_secs(30 * 60)),
Expand Down Expand Up @@ -202,17 +212,28 @@ impl<DB: Database> PoolOptions<DB> {
self.min_connections
}

/// Enable logging of time taken to acquire a connection from the connection pool via
/// [`Pool::acquire()`].
///
/// If slow acquire logging is also enabled, this level is used for acquires that are not
/// considered slow.
pub fn log_acquires(mut self, level: LevelFilter) -> Self {
self.acquire_time_level = level;
self
}

/// Set a threshold for reporting excessive time taken to acquire a connection from
/// the connection pool. When the threshold is exceeded, a warning is logged.
/// the connection pool via [`Pool::acquire()`]. When the threshold is exceeded, a warning is logged.
///
/// Defaults to a value that should not typically be exceeded by the pool enlarging
/// itself with an additional new connection.
pub fn acquire_slow_after(mut self, value: Duration) -> Self {
self.acquire_slow_after = value;
pub fn slow_acquire_threshold(mut self, threshold: Duration) -> Self {
self.acquire_slow_after = threshold;
self
}

/// Get the threshold for reporting excessive time taken to acquire a connection.
/// Get the threshold for reporting excessive time taken to acquire a connection via
/// [`Pool::acquire()`].
pub fn get_acquire_slow_after(&self) -> Duration {
self.acquire_slow_after
}
Expand Down

0 comments on commit e3dd0b8

Please sign in to comment.