Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit c580096

Browse files
committed
hrana: add diagnostics for connections
This commit adds a `/v2/diagnostics` endpoint which prints various information about current hrana-over-http connections. This is a draft, because the diagnostics are currently in a very raw, debug-oriented format, and I'm figuring out whether we can make them more human-readable. Still, they're enough to determine whether something is holding a lock via an abandoned hrana-over-http stream. Example:
```
$ curl -s http://localhost:8080/v2/diagnostics | jq
[
  "Some(TxnSlot { conn: Mutex { data: <locked> }, timeout_at: Instant { tv_sec: 9751, tv_nsec: 546908348 }, is_stolen: false })"
]
$ curl -s http://localhost:8080/v2/diagnostics | jq
[
  "expired"
]
```
1 parent 92bbc09 commit c580096

File tree

11 files changed

+105
-12
lines changed

11 files changed

+105
-12
lines changed

sqld-libsql-bindings/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ pub fn get_orig_wal_methods() -> anyhow::Result<*mut libsql_wal_methods> {
2424
Ok(orig)
2525
}
2626

27+
#[derive(Debug)]
2728
pub struct Connection<W: WalHook> {
2829
conn: rusqlite::Connection,
2930
// Safety: _ctx MUST be dropped after the connection, because the connection has a pointer

sqld-libsql-bindings/src/wal_hook.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ macro_rules! init_static_wal_method {
4646
///
4747
/// # Safety
4848
/// The implementer is responsible for calling the orig method with valid arguments.
49-
pub unsafe trait WalHook {
50-
type Context;
49+
pub unsafe trait WalHook: std::fmt::Debug {
50+
type Context: std::fmt::Debug;
5151

5252
fn name() -> &'static CStr;
5353
/// Intercept `xFrame` call. `orig` is the function pointer to the underlying wal method.
@@ -129,6 +129,7 @@ pub unsafe trait WalHook {
129129
init_static_wal_method!(TRANSPARENT_METHODS, TransparentMethods);
130130

131131
/// Wal implementation that just proxies calls to the wrapped WAL methods implementation
132+
#[derive(Debug)]
132133
pub enum TransparentMethods {}
133134

134135
unsafe impl WalHook for TransparentMethods {

sqld/src/connection/libsql.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ where
144144
}
145145
}
146146

147-
#[derive(Clone)]
147+
#[derive(Clone, Debug)]
148148
pub struct LibSqlConnection<W: WalHook> {
149149
inner: Arc<Mutex<Connection<W>>>,
150150
}
@@ -206,6 +206,7 @@ where
206206
}
207207
}
208208

209+
#[derive(Debug)]
209210
struct Connection<W: WalHook = TransparentMethods> {
210211
conn: sqld_libsql_bindings::Connection<W>,
211212
stats: Arc<Stats>,
@@ -219,6 +220,7 @@ struct Connection<W: WalHook = TransparentMethods> {
219220
}
220221

221222
/// A slot for holding the state of a transaction lock permit
223+
#[derive(Debug)]
222224
struct TxnSlot<T: WalHook> {
223225
/// Pointer to the connection holding the lock. Used to rollback the transaction when the lock
224226
/// is stolen.
@@ -230,6 +232,7 @@ struct TxnSlot<T: WalHook> {
230232
}
231233

232234
/// The transaction state shared among all connections to the same database
235+
#[derive(Debug)]
233236
pub struct TxnState<T: WalHook> {
234237
/// Slot for the connection currently holding the transaction lock
235238
slot: RwLock<Option<Arc<TxnSlot<T>>>>,
@@ -711,6 +714,13 @@ where
711714
.unwrap()?;
712715
Ok(())
713716
}
717+
718+
fn diagnostics(&self) -> String {
719+
match self.inner.try_lock() {
720+
Some(conn) => format!("{:?}", conn.slot),
721+
None => "[BUG] connection busy".to_string(),
722+
}
723+
}
714724
}
715725

716726
#[cfg(test)]

sqld/src/connection/mod.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ pub mod write_proxy;
2424
const TXN_TIMEOUT: Duration = Duration::from_secs(5);
2525

2626
#[async_trait::async_trait]
27-
pub trait Connection: Send + Sync + 'static {
27+
pub trait Connection: std::fmt::Debug + Send + Sync + 'static {
2828
/// Executes a query program
2929
async fn execute_program<B: QueryResultBuilder>(
3030
&self,
@@ -118,6 +118,8 @@ pub trait Connection: Send + Sync + 'static {
118118

119119
/// Calls for database checkpoint (if supported).
120120
async fn checkpoint(&self) -> Result<()>;
121+
122+
fn diagnostics(&self) -> String;
121123
}
122124

123125
fn make_batch_program(batch: Vec<Query>) -> Vec<Step> {
@@ -290,7 +292,7 @@ pub struct TrackedConnection<DB> {
290292
atime: AtomicU64,
291293
}
292294

293-
impl<DB> TrackedConnection<DB> {
295+
impl<DB: Connection> TrackedConnection<DB> {
294296
pub fn idle_time(&self) -> Duration {
295297
let now = now_millis();
296298
let atime = self.atime.load(Ordering::Relaxed);
@@ -335,12 +337,18 @@ impl<DB: Connection> Connection for TrackedConnection<DB> {
335337
self.atime.store(now_millis(), Ordering::Relaxed);
336338
self.inner.checkpoint().await
337339
}
340+
341+
/// Returns the diagnostics string of the wrapped connection unchanged.
///
/// This method does not store a new value into `atime`.
/// NOTE(review): presumably this means a diagnostics query does not count as activity
/// for idle-time tracking — confirm this is intended.
#[inline]
342+
fn diagnostics(&self) -> String {
343+
self.inner.diagnostics()
344+
}
338345
}
339346

340347
#[cfg(test)]
341348
mod test {
342349
use super::*;
343350

351+
#[derive(Debug)]
344352
struct DummyDb;
345353

346354
#[async_trait::async_trait]
@@ -371,6 +379,10 @@ mod test {
371379
async fn checkpoint(&self) -> Result<()> {
372380
unreachable!()
373381
}
382+
383+
/// Test stub: always returns the fixed placeholder string `"dummy"`.
fn diagnostics(&self) -> String {
384+
"dummy".into()
385+
}
374386
}
375387

376388
#[tokio::test]

sqld/src/connection/write_proxy.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ impl MakeConnection for MakeWriteProxyConn {
104104
}
105105
}
106106

107+
#[derive(Debug)]
107108
pub struct WriteProxyConnection {
108109
/// Lazily initialized read connection
109110
read_conn: LibSqlConnection<TransparentMethods>,
@@ -316,6 +317,10 @@ impl Connection for WriteProxyConnection {
316317
self.wait_replication_sync(None).await?;
317318
self.read_conn.checkpoint().await
318319
}
320+
321+
/// Returns the `Debug` rendering of this connection's `state` field.
fn diagnostics(&self) -> String {
322+
format!("{:?}", self.state)
323+
}
319324
}
320325

321326
impl Drop for WriteProxyConnection {

sqld/src/hrana/http/mod.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ pub struct Server<C> {
2525
pub enum Endpoint {
2626
Pipeline,
2727
Cursor,
28+
Diagnostics,
2829
}
2930

3031
impl<C: Connection> Server<C> {
@@ -69,6 +70,10 @@ impl<C: Connection> Server<C> {
6970
})
7071
.or_else(|err| err.downcast::<ProtocolError>().map(protocol_error_response))
7172
}
73+
74+
/// Returns a reference to the mutex that guards this server's stream state, so other
/// code in the crate can lock and inspect it.
pub(crate) fn stream_state(&self) -> &Mutex<stream::ServerStreamState<C>> {
75+
&self.stream_state
76+
}
7277
}
7378

7479
pub(crate) async fn handle_index() -> hyper::Response<hyper::Body> {
@@ -94,6 +99,7 @@ async fn handle_request<C: Connection>(
9499
Endpoint::Cursor => {
95100
handle_cursor(server, connection_maker, auth, req, version, encoding).await
96101
}
102+
Endpoint::Diagnostics => handle_diagnostics(server, auth).await,
97103
}
98104
}
99105

@@ -164,6 +170,34 @@ async fn handle_cursor<C: Connection>(
164170
.unwrap())
165171
}
166172

173+
async fn handle_diagnostics<C: Connection>(
174+
server: &Server<C>,
175+
_auth: Authenticated,
176+
) -> Result<hyper::Response<hyper::Body>> {
177+
let stream_state = server.stream_state().lock();
178+
let handles = stream_state.handles();
179+
let mut diagnostics: Vec<String> = Vec::with_capacity(handles.len());
180+
for (_, handle) in handles {
181+
let handle_info: String = match handle {
182+
stream::Handle::Available(stream) => match &stream.db {
183+
Some(db) => db.diagnostics(),
184+
None => "[BUG] available-but-closed".into(),
185+
},
186+
stream::Handle::Acquired => "acquired".into(),
187+
stream::Handle::Expired => "expired".into(),
188+
};
189+
diagnostics.push(handle_info);
190+
}
191+
drop(stream_state);
192+
193+
tracing::warn!("diagnostics test: {diagnostics:?}");
194+
Ok(hyper::Response::builder()
195+
.status(hyper::StatusCode::OK)
196+
.header(hyper::http::header::CONTENT_TYPE, "application/json")
197+
.body(serde_json::to_string(&diagnostics)?.into())
198+
.unwrap())
199+
}
200+
167201
struct CursorStream<D> {
168202
resp_body: Option<proto::CursorRespBody>,
169203
join_set: tokio::task::JoinSet<()>,

sqld/src/hrana/http/stream.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ pub struct ServerStreamState<D> {
3434
}
3535

3636
/// Handle to a stream, owned by the [`ServerStreamState`].
37-
enum Handle<D> {
37+
#[derive(Debug)]
38+
pub(crate) enum Handle<D> {
3839
/// A stream that is open and ready to be used by requests. [`Stream::db`] should always be
3940
/// `Some`.
4041
Available(Box<Stream<D>>),
@@ -51,10 +52,11 @@ enum Handle<D> {
5152
///
5253
/// The stream is either owned by [`Handle::Available`] (when it's not in use) or by [`Guard`]
5354
/// (when it's being used by a request).
54-
struct Stream<D> {
55+
#[derive(Debug)]
56+
pub(crate) struct Stream<D> {
5557
/// The database connection that corresponds to this stream. This is `None` after the `"close"`
5658
/// request was executed.
57-
db: Option<Arc<D>>,
59+
pub(crate) db: Option<Arc<D>>,
5860
/// The cache of SQL texts stored on the server with `"store_sql"` requests.
5961
sqls: HashMap<i32, String>,
6062
/// Stream id of this stream. The id is generated randomly (it should be unguessable).
@@ -98,6 +100,10 @@ impl<D> ServerStreamState<D> {
98100
expire_round_base: Instant::now(),
99101
}
100102
}
103+
104+
/// Returns a shared reference to the map of stream handles owned by this state.
/// NOTE(review): the `u64` keys are presumably stream ids — confirm against the code
/// that inserts into `handles`.
pub(crate) fn handles(&self) -> &HashMap<u64, Handle<D>> {
105+
&self.handles
106+
}
101107
}
102108

103109
/// Acquire a guard to a new or existing stream. If baton is `Some`, we try to look up the stream,

sqld/src/http/user/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,14 @@ where
340340
hrana::Encoding::Json,
341341
)),
342342
)
343+
.route(
344+
"/v2/diagnostics",
345+
get(handle_hrana!(
346+
hrana::http::Endpoint::Diagnostics,
347+
hrana::Version::Hrana2,
348+
hrana::Encoding::Json,
349+
)),
350+
)
343351
.route("/v3", get(crate::hrana::http::handle_index))
344352
.route(
345353
"/v3/pipeline",

sqld/src/replication/primary/logger.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ use crate::libsql_bindings::ffi::SQLITE_IOERR_WRITE;
2020
use crate::libsql_bindings::ffi::{
2121
sqlite3,
2222
types::{XWalCheckpointFn, XWalFrameFn, XWalSavePointUndoFn, XWalUndoFn},
23-
PageHdrIter, PgHdr, Wal, SQLITE_CHECKPOINT_TRUNCATE, SQLITE_IOERR, SQLITE_OK,
23+
PageHdrIter, PgHdr, Wal, SQLITE_CHECKPOINT_PASSIVE, SQLITE_CHECKPOINT_TRUNCATE, SQLITE_IOERR,
24+
SQLITE_OK,
2425
};
2526
use crate::libsql_bindings::wal_hook::WalHook;
2627
use crate::replication::frame::{Frame, FrameHeader};
@@ -41,9 +42,10 @@ impl Version {
4142
}
4243
}
4344

45+
#[derive(Debug)]
4446
pub enum ReplicationLoggerHook {}
4547

46-
#[derive(Clone)]
48+
#[derive(Clone, Debug)]
4749
pub struct ReplicationLoggerHookCtx {
4850
buffer: Vec<WalPage>,
4951
logger: Arc<ReplicationLogger>,
@@ -199,6 +201,9 @@ unsafe impl WalHook for ReplicationLoggerHook {
199201
"Ignoring a checkpoint request weaker than TRUNCATE: {}",
200202
emode
201203
);
204+
if emode == SQLITE_CHECKPOINT_PASSIVE {
205+
return SQLITE_OK;
206+
}
202207
// Return an error to signal to sqlite that the WAL was not checkpointed, and it is
203208
// therefore not safe to delete it.
204209
return SQLITE_BUSY;
@@ -276,7 +281,7 @@ unsafe impl WalHook for ReplicationLoggerHook {
276281
}
277282
}
278283

279-
#[derive(Clone)]
284+
#[derive(Clone, Debug)]
280285
pub struct WalPage {
281286
pub page_no: u32,
282287
/// 0 for non-commit frames
@@ -731,6 +736,7 @@ impl LogFileHeader {
731736
}
732737
}
733738

739+
#[derive(Debug)]
734740
pub struct Generation {
735741
pub id: Uuid,
736742
pub start_index: u64,
@@ -745,6 +751,7 @@ impl Generation {
745751
}
746752
}
747753

754+
#[derive(Debug)]
748755
pub struct ReplicationLogger {
749756
pub generation: Generation,
750757
pub log_file: RwLock<LogFile>,

sqld/src/replication/replica/hook.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ init_static_wal_method!(INJECTOR_METHODS, InjectorHook);
7070
/// The Caller must first call `set_frames`, passing the frames to be injected, then trigger a call
7171
/// to xFrames from the libsql connection (see dummy write in `injector`), and can then collect the
7272
/// result on the injection with `take_result`
73+
#[derive(Debug)]
7374
pub enum InjectorHook {}
7475

7576
pub struct InjectorHookCtx {
@@ -83,6 +84,14 @@ pub struct InjectorHookCtx {
8384
post_commit: Box<dyn Fn(FrameNo) -> anyhow::Result<()>>,
8485
}
8586

87+
impl std::fmt::Debug for InjectorHookCtx {
88+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89+
f.debug_struct("InjectorHookCtx")
90+
.field("is_txn", &self.is_txn)
91+
.finish()
92+
}
93+
}
94+
8695
impl InjectorHookCtx {
8796
pub fn new(
8897
receiver: tokio::sync::mpsc::Receiver<Frames>,

sqld/src/replication/snapshot.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ impl SnapshotFile {
163163
}
164164
}
165165

166-
#[derive(Clone)]
166+
#[derive(Clone, Debug)]
167167
pub struct LogCompactor {
168168
sender: crossbeam::channel::Sender<(LogFile, PathBuf, u32)>,
169169
}

0 commit comments

Comments
 (0)