Skip to content

Commit 7e2e3a2

Browse files
committed
add tracing to connect
1 parent 124ed8e commit 7e2e3a2

File tree

8 files changed

+138
-43
lines changed

8 files changed

+138
-43
lines changed

tokio-postgres/src/client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::simple_query::SimpleQueryStream;
99
#[cfg(feature = "runtime")]
1010
use crate::tls::MakeTlsConnect;
1111
use crate::tls::TlsConnect;
12-
use crate::trace::{make_span, SpanOperation};
12+
use crate::trace::{make_span_for_client, SpanOperation};
1313
use crate::types::{Oid, ToSql, Type};
1414
#[cfg(feature = "runtime")]
1515
use crate::Socket;
@@ -258,7 +258,7 @@ impl Client {
258258
query: &str,
259259
parameter_types: &[Type],
260260
) -> Result<Statement, Error> {
261-
let span = make_span(&self.inner, SpanOperation::Prepare);
261+
let span = make_span_for_client(&self.inner, SpanOperation::Prepare);
262262

263263
prepare::prepare(&self.inner, query, parameter_types)
264264
.instrument(span)

tokio-postgres/src/connect.rs

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@ use crate::config::{Host, LoadBalanceHosts, TargetSessionAttrs};
33
use crate::connect_raw::connect_raw;
44
use crate::connect_socket::connect_socket;
55
use crate::tls::MakeTlsConnect;
6+
use crate::trace::{make_span, record_connect_info, record_error, record_ok, SpanOperation};
67
use crate::{Client, Config, Connection, Error, SimpleQueryMessage, Socket};
78
use futures_util::{future, pin_mut, Future, FutureExt, Stream};
89
use rand::seq::SliceRandom;
910
use std::task::Poll;
11+
use std::time::Instant;
1012
use std::{cmp, io};
1113
use tokio::net;
14+
use tracing::Instrument as _;
1215

1316
pub async fn connect<T>(
1417
mut tls: T,
@@ -93,24 +96,51 @@ async fn connect_host<T>(
9396
where
9497
T: MakeTlsConnect<Socket>,
9598
{
99+
let mut span = make_span(
100+
SpanOperation::Connect,
101+
config.user.as_deref().unwrap_or_default(),
102+
config.dbname.as_deref().unwrap_or_default(),
103+
);
96104
match host {
97105
Host::Tcp(host) => {
106+
let dns_start = Instant::now();
98107
let mut addrs = net::lookup_host((&*host, port))
108+
.instrument(span.clone())
99109
.await
100110
.map_err(Error::connect)?
101111
.collect::<Vec<_>>();
112+
span.record(
113+
"db.connect.timing.dns_lookup_ns",
114+
dns_start.elapsed().as_nanos() as u64,
115+
);
102116

103117
if config.load_balance_hosts == LoadBalanceHosts::Random {
104118
addrs.shuffle(&mut rand::thread_rng());
105119
}
106120

107121
let mut last_err = None;
108-
for addr in addrs {
109-
match connect_once(Addr::Tcp(addr.ip()), hostname.as_deref(), port, tls, config)
122+
for (i, addr) in addrs.iter().enumerate() {
123+
if i > 0 {
124+
span = make_span(
125+
SpanOperation::Connect,
126+
config.user.as_deref().unwrap_or_default(),
127+
config.dbname.as_deref().unwrap_or_default(),
128+
);
129+
}
130+
131+
let addr = Addr::Tcp(addr.ip());
132+
span.record("db.connect.attempt", i);
133+
record_connect_info(&span, &addr, hostname.as_deref(), port);
134+
match connect_once(addr, hostname.as_deref(), port, tls, config)
135+
.instrument(span.clone())
110136
.await
111137
{
112-
Ok(stream) => return Ok(stream),
138+
Ok((client, conn)) => {
139+
record_ok(&span);
140+
return Ok((client, conn));
141+
}
113142
Err(e) => {
143+
record_error(&span, &e);
114144
last_err = Some(e);
115145
continue;
116146
}
@@ -126,7 +156,21 @@ where
126156
}
127157
#[cfg(unix)]
128158
Host::Unix(path) => {
129-
connect_once(Addr::Unix(path), hostname.as_deref(), port, tls, config).await
159+
let addr = Addr::Unix(path);
160+
record_connect_info(&span, &addr, hostname.as_deref(), port);
161+
match connect_once(addr, hostname.as_deref(), port, tls, config)
162+
.instrument(span.clone())
163+
.await
164+
{
165+
Ok((client, conn)) => {
166+
record_ok(&span);
167+
Ok((client, conn))
168+
}
169+
Err(e) => {
170+
record_error(&span, &e);
171+
Err(e)
172+
}
173+
}
130174
}
131175
}
132176
}

tokio-postgres/src/connect_raw.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ use std::collections::{HashMap, VecDeque};
1717
use std::io;
1818
use std::pin::Pin;
1919
use std::task::{Context, Poll};
20+
use std::time::Instant;
2021
use tokio::io::{AsyncRead, AsyncWrite};
2122
use tokio_util::codec::Framed;
23+
use tracing::Span;
2224

2325
pub struct StartupStream<S, T> {
2426
inner: Framed<MaybeTlsStream<S, T>, PostgresCodec>,
@@ -96,9 +98,14 @@ where
9698
delayed: VecDeque::new(),
9799
};
98100

101+
let start_time = Instant::now();
99102
startup(&mut stream, config).await?;
100103
authenticate(&mut stream, config).await?;
101104
let (process_id, secret_key, parameters) = read_info(&mut stream).await?;
105+
Span::current().record(
106+
"db.connect.timing.auth_ns",
107+
start_time.elapsed().as_nanos() as u64,
108+
);
102109

103110
let (sender, receiver) = mpsc::unbounded();
104111
let client = Client::new(sender, config.ssl_mode, process_id, secret_key, config);

tokio-postgres/src/connect_socket.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ use crate::{Error, Socket};
44
use socket2::{SockRef, TcpKeepalive};
55
use std::future::Future;
66
use std::io;
7-
use std::time::Duration;
7+
use std::time::{Duration, Instant};
88
use tokio::net::TcpStream;
99
#[cfg(unix)]
1010
use tokio::net::UnixStream;
1111
use tokio::time;
12+
use tracing::Span;
1213

1314
pub(crate) async fn connect_socket(
1415
addr: &Addr,
@@ -21,8 +22,13 @@ pub(crate) async fn connect_socket(
2122
) -> Result<Socket, Error> {
2223
match addr {
2324
Addr::Tcp(ip) => {
25+
let connect_start = Instant::now();
2426
let stream =
2527
connect_with_timeout(TcpStream::connect((*ip, port)), connect_timeout).await?;
28+
Span::current().record(
29+
"db.connect.timing.tcp_handshake_ns",
30+
connect_start.elapsed().as_nanos() as u64,
31+
);
2632

2733
stream.set_nodelay(true).map_err(Error::connect)?;
2834

tokio-postgres/src/connect_tls.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ use crate::tls::TlsConnect;
55
use crate::Error;
66
use bytes::BytesMut;
77
use postgres_protocol::message::frontend;
8+
use std::time::Instant;
89
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
10+
use tracing::Span;
911

1012
pub async fn connect_tls<S, T>(
1113
mut stream: S,
@@ -25,6 +27,7 @@ where
2527
SslMode::Prefer | SslMode::Require => {}
2628
}
2729

30+
let start_time = Instant::now();
2831
let mut buf = BytesMut::new();
2932
frontend::ssl_request(&mut buf);
3033
stream.write_all(&buf).await.map_err(Error::io)?;
@@ -48,6 +51,10 @@ where
4851
.connect(stream)
4952
.await
5053
.map_err(|e| Error::tls(e.into()))?;
54+
Span::current().record(
55+
"db.connect.timing.tls_handshake_ns",
56+
start_time.elapsed().as_nanos() as u64,
57+
);
5158

5259
Ok(MaybeTlsStream::Tls(stream))
5360
}

tokio-postgres/src/prepare.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::client::InnerClient;
22
use crate::codec::FrontendMessage;
33
use crate::connection::RequestMessages;
44
use crate::error::SqlState;
5-
use crate::trace::{make_span, SpanOperation};
5+
use crate::trace::{make_span_for_client, SpanOperation};
66
use crate::types::{Field, Kind, Oid, Type};
77
use crate::{query, slice_iter};
88
use crate::{Column, Error, Statement};
@@ -116,7 +116,7 @@ fn prepare_rec<'a>(
116116
query: &'a str,
117117
types: &'a [Type],
118118
) -> Pin<Box<dyn Future<Output = Result<Statement, Error>> + 'a + Send>> {
119-
let span = make_span(client, SpanOperation::Prepare);
119+
let span = make_span_for_client(client, SpanOperation::Prepare);
120120
Box::pin(prepare(client, query, types).instrument(span))
121121
}
122122

tokio-postgres/src/query.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::client::{InnerClient, Responses};
22
use crate::codec::FrontendMessage;
33
use crate::connection::RequestMessages;
4-
use crate::trace::{make_span, SpanOperation};
4+
use crate::trace::{make_span_for_client, record_error, SpanOperation, record_ok};
55
use crate::types::{BorrowToSql, IsNull};
66
use crate::{Error, Portal, Row, Statement};
77
use bytes::{Bytes, BytesMut};
@@ -63,12 +63,11 @@ where
6363
async fn start_traced(client: &InnerClient, span: &Span, buf: Bytes) -> Result<Responses, Error> {
6464
match start(client, buf).instrument(span.clone()).await {
6565
Ok(response) => {
66-
span.record("otel.status_code", "OK");
66+
record_ok(span);
6767
Ok(response)
6868
}
6969
Err(e) => {
70-
span.record("otel.status_code", "ERROR");
71-
span.record("exception.message", e.to_string());
70+
record_error(span, &e);
7271
Err(e)
7372
}
7473
}
@@ -84,8 +83,7 @@ where
8483
I: IntoIterator<Item = P>,
8584
I::IntoIter: ExactSizeIterator,
8685
{
87-
let span = make_span(client, SpanOperation::Query);
88-
span.record("db.operation", "query");
86+
let span = make_span_for_client(client, SpanOperation::Query);
8987

9088
let buf = encode_with_logs(client, &span, &statement, params)?;
9189

@@ -108,7 +106,7 @@ pub async fn query_portal(
108106
portal: &Portal,
109107
max_rows: i32,
110108
) -> Result<RowStream, Error> {
111-
let span = make_span(client, SpanOperation::Portal);
109+
let span = make_span_for_client(client, SpanOperation::Portal);
112110
span.record("db.statement", portal.statement().query());
113111

114112
let buf = client.with_buf(|buf| {
@@ -151,7 +149,7 @@ where
151149
I: IntoIterator<Item = P>,
152150
I::IntoIter: ExactSizeIterator,
153151
{
154-
let span = make_span(client, SpanOperation::Execute);
152+
let span = make_span_for_client(client, SpanOperation::Execute);
155153

156154
let buf = encode_with_logs(client, &span, &statement, params)?;
157155

tokio-postgres/src/trace.rs

Lines changed: 59 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
use tracing::{field::Empty, Level, Span};
22

3-
use crate::client::{Addr, InnerClient};
3+
use crate::{
4+
client::{Addr, InnerClient, SocketConfig},
5+
Error,
6+
};
47

58
pub(crate) enum SpanOperation {
9+
Connect,
610
Query,
711
Portal,
812
Execute,
@@ -12,6 +16,7 @@ pub(crate) enum SpanOperation {
1216
impl SpanOperation {
1317
const fn name(&self) -> &'static str {
1418
match self {
19+
SpanOperation::Connect => "connect",
1520
SpanOperation::Query => "query",
1621
SpanOperation::Portal => "portal",
1722
SpanOperation::Execute => "execute",
@@ -21,6 +26,7 @@ impl SpanOperation {
2126

2227
const fn level(&self) -> Level {
2328
match self {
29+
SpanOperation::Connect => Level::DEBUG,
2430
SpanOperation::Query => Level::DEBUG,
2531
SpanOperation::Portal => Level::DEBUG,
2632
SpanOperation::Execute => Level::DEBUG,
@@ -76,50 +82,77 @@ macro_rules! span_dynamic_lvl {
7682
};
7783
}
7884

79-
pub(crate) fn make_span(client: &InnerClient, operation: SpanOperation) -> Span {
80-
let span = span_dynamic_lvl!(operation.level(), "query",
85+
pub(crate) fn make_span_for_client(client: &InnerClient, operation: SpanOperation) -> Span {
86+
let span = make_span(operation, client.db_user(), client.db_name());
87+
record_socket_config(&span, client.socket_config());
88+
span
89+
}
90+
91+
pub(crate) fn make_span(operation: SpanOperation, db_user: &str, db_name: &str) -> Span {
92+
span_dynamic_lvl!(
93+
operation.level(),
94+
"query",
8195
db.system = "postgresql",
8296
server.address = Empty,
8397
server.socket.address = Empty,
8498
server.port = Empty,
8599
"network.type" = Empty,
86100
network.transport = Empty,
87-
db.name = %client.db_name(),
88-
db.user = %client.db_user(),
89-
otel.name = %format!("PSQL {} {}", operation.name(), client.db_name()),
101+
db.name = db_name,
102+
db.user = db_user,
103+
otel.name = format!("PSQL {} {}", operation.name(), db_name),
90104
otel.kind = "Client",
91-
92105
// to set when output
93106
otel.status_code = Empty,
94107
exception.message = Empty,
95108
db.operation = operation.name(),
96-
97109
// for queries
98110
db.statement = Empty,
99111
db.statement.params = Empty,
100112
db.sql.rows_affected = Empty,
101-
);
113+
// for connections
114+
db.connect.attempt = Empty,
115+
db.connect.timing.dns_lookup_ns = Empty,
116+
db.connect.timing.tcp_handshake_ns = Empty,
117+
db.connect.timing.tls_handshake_ns = Empty,
118+
db.connect.timing.auth_ns = Empty,
119+
)
120+
}
121+
122+
pub(crate) fn record_socket_config(span: &Span, socket_config: Option<&SocketConfig>) {
123+
if let Some(s) = socket_config {
124+
record_connect_info(span, &s.addr, s.hostname.as_deref(), s.port);
125+
}
126+
}
127+
128+
pub(crate) fn record_connect_info(span: &Span, addr: &Addr, hostname: Option<&str>, port: u16) {
102129
// only executes if span passed the filter
103130
span.in_scope(|| {
104-
if let Some(socket_config) = client.socket_config() {
105-
if let Some(hostname) = socket_config.hostname.as_deref() {
106-
span.record("server.address", hostname);
131+
if let Some(hostname) = hostname {
132+
span.record("server.address", hostname);
133+
}
134+
match addr {
135+
Addr::Tcp(addr) => {
136+
span.record("server.socket.address", addr.to_string());
137+
let network_type = if addr.is_ipv4() { "ipv4" } else { "ipv6" };
138+
span.record("network.type", network_type);
139+
span.record("network.transport", "tcp");
107140
}
108-
match &socket_config.addr {
109-
Addr::Tcp(addr) => {
110-
span.record("server.socket.address", addr.to_string());
111-
let network_type = if addr.is_ipv4() { "ipv4" } else { "ipv6" };
112-
span.record("network.type", network_type);
113-
span.record("network.transport", "tcp");
114-
}
115-
#[cfg(unix)]
116-
Addr::Unix(path) => {
117-
span.record("server.socket.address", path.to_string_lossy().into_owned());
118-
span.record("network.type", "unix");
119-
}
141+
#[cfg(unix)]
142+
Addr::Unix(path) => {
143+
span.record("server.socket.address", path.to_string_lossy().into_owned());
144+
span.record("network.type", "unix");
120145
}
121-
span.record("server.port", socket_config.port);
122146
}
147+
span.record("server.port", port);
123148
});
124-
span
149+
}
150+
151+
pub(crate) fn record_error(span: &Span, e: &Error) {
152+
span.record("otel.status_code", "ERROR")
153+
.record("exception.message", tracing::field::display(e));
154+
}
155+
156+
pub(crate) fn record_ok(span: &Span) {
157+
span.record("otel.status_code", "OK");
125158
}

0 commit comments

Comments
 (0)