Skip to content

Commit

Permalink
Merge pull request blackbeam#228 from cloneable/issue223-get_conn-and…
Browse files Browse the repository at this point in the history
…-conn-queries

Don't emit INFO spans for administrative queries
  • Loading branch information
blackbeam authored Feb 13, 2023
2 parents ca61d55 + 72dded0 commit 043d81a
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 17 deletions.
8 changes: 4 additions & 4 deletions src/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -887,8 +887,8 @@ impl Conn {
/// Do nothing if socket address is already in [`Opts`] or if `prefer_socket` is `false`.
async fn read_socket(&mut self) -> Result<()> {
if self.inner.opts.prefer_socket() && self.inner.socket.is_none() {
let row_opt = self.query_first("SELECT @@socket").await?;
self.inner.socket = row_opt.unwrap_or((None,)).0;
let row_opt = self.query_internal("SELECT @@socket").await?;
self.inner.socket = row_opt.unwrap_or(None);
}
Ok(())
}
Expand All @@ -898,7 +898,7 @@ impl Conn {
let max_allowed_packet = if let Some(value) = self.opts().max_allowed_packet() {
Some(value)
} else {
self.query_first("SELECT @@max_allowed_packet").await?
self.query_internal("SELECT @@max_allowed_packet").await?
};
if let Some(stream) = self.inner.stream.as_mut() {
stream.set_max_allowed_packet(max_allowed_packet.unwrap_or(DEFAULT_MAX_ALLOWED_PACKET));
Expand All @@ -911,7 +911,7 @@ impl Conn {
let wait_timeout = if let Some(value) = self.opts().wait_timeout() {
Some(value)
} else {
self.query_first("SELECT @@wait_timeout").await?
self.query_internal("SELECT @@wait_timeout").await?
};
self.inner.wait_timeout = Duration::from_secs(wait_timeout.unwrap_or(28800) as u64);
Ok(())
Expand Down
25 changes: 17 additions & 8 deletions src/conn/routines/query.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,44 @@
use std::marker::PhantomData;

use futures_core::future::BoxFuture;
use futures_util::FutureExt;
use mysql_common::constants::Command;
#[cfg(feature = "tracing")]
use tracing::{field, info_span, Instrument, Level};
use tracing::{field, span_enabled, Instrument, Level};

use crate::tracing_utils::TracingLevel;
use crate::{Conn, TextProtocol};

use super::Routine;

/// A routine that performs `COM_QUERY`.
#[derive(Debug, Copy, Clone)]
pub struct QueryRoutine<'a> {
pub struct QueryRoutine<'a, L: TracingLevel> {
data: &'a [u8],
_phantom: PhantomData<L>,
}

impl<'a> QueryRoutine<'a> {
impl<'a, L: TracingLevel> QueryRoutine<'a, L> {
pub fn new(data: &'a [u8]) -> Self {
Self { data }
Self {
data,
_phantom: PhantomData,
}
}
}

impl Routine<()> for QueryRoutine<'_> {
impl<L: TracingLevel> Routine<()> for QueryRoutine<'_, L> {
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
#[cfg(feature = "tracing")]
let span = info_span!(
let span = create_span!(
L::LEVEL,
"mysql_async::query",
mysql_async.connection.id = conn.id(),
mysql_async.query.sql = field::Empty
mysql_async.query.sql = field::Empty,
);

#[cfg(feature = "tracing")]
if tracing::span_enabled!(Level::DEBUG) {
if span_enabled!(Level::DEBUG) {
// The statement may contain sensitive data. Restrict to DEBUG.
span.record(
"mysql_async.query.sql",
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,9 @@ use std::sync::Arc;

mod buffer_pool;

#[macro_use]
mod tracing_utils;

#[macro_use]
mod macros;
mod conn;
Expand Down
3 changes: 2 additions & 1 deletion src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::{
connection_like::ToConnectionResult,
from_row,
prelude::{FromRow, StatementLike, ToConnection},
tracing_utils::LevelInfo,
BinaryProtocol, BoxFuture, Params, QueryResult, ResultSetStream, TextProtocol,
};

Expand Down Expand Up @@ -220,7 +221,7 @@ impl<Q: AsQuery> Query for Q {
ToConnectionResult::Immediate(conn) => conn,
ToConnectionResult::Mediate(fut) => fut.await?,
};
conn.raw_query(self).await?;
conn.raw_query::<'_, _, LevelInfo>(self).await?;
Ok(QueryResult::new(conn))
}
.boxed()
Expand Down
27 changes: 23 additions & 4 deletions src/queryable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::{
prelude::{FromRow, StatementLike},
query::AsQuery,
queryable::query_result::ResultSetMeta,
tracing_utils::{LevelInfo, LevelTrace, TracingLevel},
BoxFuture, Column, Conn, Params, ResultSetStream, Row,
};

Expand Down Expand Up @@ -102,13 +103,32 @@ impl Conn {
}

/// Low level function that performs a text query.
pub(crate) async fn raw_query<'a, Q>(&'a mut self, query: Q) -> Result<()>
pub(crate) async fn raw_query<'a, Q, L: TracingLevel>(&'a mut self, query: Q) -> Result<()>
where
Q: AsQuery + 'a,
{
self.routine(QueryRoutine::new(query.as_query().as_ref()))
self.routine(QueryRoutine::<'_, L>::new(query.as_query().as_ref()))
.await
}

/// Used for internal querying of connection settings,
/// bypassing instrumentation meant for user queries.
// This is a merge of `Queryable::query_first` and `Conn::query_iter`.
// TODO: find a cleaner way without duplicating code.
pub(crate) fn query_internal<'a, T, Q>(&'a mut self, query: Q) -> BoxFuture<'a, Option<T>>
where
Q: AsQuery + 'a,
T: FromRow + Send + 'static,
{
async move {
self.raw_query::<'_, _, LevelTrace>(query).await?;
Ok(QueryResult::<'_, '_, TextProtocol>::new(self)
.collect_and_drop::<T>()
.await?
.pop())
}
.boxed()
}
}

/// Methods of this trait are used to execute database queries.
Expand Down Expand Up @@ -456,8 +476,7 @@ impl Queryable for Conn {
Q: AsQuery + 'a,
{
async move {
self.routine(QueryRoutine::new(query.as_query().as_ref()))
.await?;
self.raw_query::<'_, _, LevelInfo>(query).await?;
Ok(QueryResult::new(self))
}
.boxed()
Expand Down
40 changes: 40 additions & 0 deletions src/tracing_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/// Compile-time tracing level.
pub trait TracingLevel: Send + Sync + 'static {
#[cfg(feature = "tracing")]
const LEVEL: tracing::Level;
}

/// INFO tracing level.
pub struct LevelInfo;

impl TracingLevel for LevelInfo {
#[cfg(feature = "tracing")]
const LEVEL: tracing::Level = tracing::Level::INFO;
}

/// TRACE tracing level.
pub struct LevelTrace;

impl TracingLevel for LevelTrace {
#[cfg(feature = "tracing")]
const LEVEL: tracing::Level = tracing::Level::TRACE;
}

#[cfg(feature = "tracing")]
macro_rules! create_span {
($s:expr, $($field:tt)*) => {
if $s == tracing::Level::TRACE {
tracing::trace_span!($($field)*)
} else if $s == tracing::Level::DEBUG {
tracing::debug_span!($($field)*)
} else if $s == tracing::Level::INFO {
tracing::info_span!($($field)*)
} else if $s == tracing::Level::WARN {
tracing::warn_span!($($field)*)
} else if $s == tracing::Level::ERROR {
tracing::error_span!($($field)*)
} else {
unreachable!();
}
}
}

0 comments on commit 043d81a

Please sign in to comment.