Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

hrana: add diagnostics for connections #729

Merged
merged 6 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sqld-libsql-bindings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub fn get_orig_wal_methods() -> anyhow::Result<*mut libsql_wal_methods> {
Ok(orig)
}

#[derive(Debug)]
pub struct Connection<W: WalHook> {
conn: rusqlite::Connection,
// Safety: _ctx MUST be dropped after the connection, because the connection has a pointer
Expand Down
1 change: 1 addition & 0 deletions sqld-libsql-bindings/src/wal_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ pub unsafe trait WalHook {
init_static_wal_method!(TRANSPARENT_METHODS, TransparentMethods);

/// Wal implemementation that just proxies calls to the wrapped WAL methods implementation
#[derive(Debug)]
pub enum TransparentMethods {}

unsafe impl WalHook for TransparentMethods {
Expand Down
45 changes: 45 additions & 0 deletions sqld/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,17 @@ pub struct LibSqlConnection<W: WalHook> {
inner: Arc<Mutex<Connection<W>>>,
}

impl<W: WalHook> std::fmt::Debug for LibSqlConnection<W> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.inner.try_lock() {
Some(conn) => {
write!(f, "{conn:?}")
}
None => write!(f, "<locked>"),
}
}
}

pub fn open_conn<W>(
path: &Path,
wal_methods: &'static WalMethodsHook<W>,
Expand Down Expand Up @@ -218,6 +229,14 @@ struct Connection<W: WalHook = TransparentMethods> {
slot: Option<Arc<TxnSlot<W>>>,
}

impl<W: WalHook> std::fmt::Debug for Connection<W> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Connection")
.field("slot", &self.slot)
.finish()
}
}

/// A slot for holding the state of a transaction lock permit
struct TxnSlot<T: WalHook> {
/// Pointer to the connection holding the lock. Used to rollback the transaction when the lock
Expand All @@ -229,7 +248,23 @@ struct TxnSlot<T: WalHook> {
is_stolen: AtomicBool,
}

impl<T: WalHook> std::fmt::Debug for TxnSlot<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let stolen = self.is_stolen.load(Ordering::Relaxed);
let time_left = self
.timeout_at
.duration_since(tokio::time::Instant::now())
.as_millis();
write!(
f,
"(conn: {:?}, timeout_ms: {time_left}, stolen: {stolen})",
self.conn
)
}
}

/// The transaction state shared among all connections to the same database
#[derive(Debug)]
pub struct TxnState<T: WalHook> {
/// Slot for the connection currently holding the transaction lock
slot: RwLock<Option<Arc<TxnSlot<T>>>>,
Expand Down Expand Up @@ -711,6 +746,16 @@ where
.unwrap()?;
Ok(())
}

fn diagnostics(&self) -> String {
match self.inner.try_lock() {
Some(conn) => match conn.slot {
Some(ref slot) => format!("{slot:?}"),
None => "<no-transaction>".to_string(),
},
None => "[BUG] connection busy".to_string(),
}
}
}

#[cfg(test)]
Expand Down
14 changes: 13 additions & 1 deletion sqld/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ pub trait Connection: Send + Sync + 'static {

/// Calls for database checkpoint (if supported).
async fn checkpoint(&self) -> Result<()>;

fn diagnostics(&self) -> String;
}

fn make_batch_program(batch: Vec<Query>) -> Vec<Step> {
Expand Down Expand Up @@ -290,7 +292,7 @@ pub struct TrackedConnection<DB> {
atime: AtomicU64,
}

impl<DB> TrackedConnection<DB> {
impl<DB: Connection> TrackedConnection<DB> {
pub fn idle_time(&self) -> Duration {
let now = now_millis();
let atime = self.atime.load(Ordering::Relaxed);
Expand Down Expand Up @@ -335,12 +337,18 @@ impl<DB: Connection> Connection for TrackedConnection<DB> {
self.atime.store(now_millis(), Ordering::Relaxed);
self.inner.checkpoint().await
}

#[inline]
fn diagnostics(&self) -> String {
self.inner.diagnostics()
}
}

#[cfg(test)]
mod test {
use super::*;

#[derive(Debug)]
struct DummyDb;

#[async_trait::async_trait]
Expand Down Expand Up @@ -371,6 +379,10 @@ mod test {
async fn checkpoint(&self) -> Result<()> {
unreachable!()
}

fn diagnostics(&self) -> String {
"dummy".into()
}
}

#[tokio::test]
Expand Down
5 changes: 5 additions & 0 deletions sqld/src/connection/write_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ impl MakeConnection for MakeWriteProxyConn {
}
}

#[derive(Debug)]
pub struct WriteProxyConnection {
/// Lazily initialized read connection
read_conn: LibSqlConnection<TransparentMethods>,
Expand Down Expand Up @@ -316,6 +317,10 @@ impl Connection for WriteProxyConnection {
self.wait_replication_sync(None).await?;
self.read_conn.checkpoint().await
}

fn diagnostics(&self) -> String {
format!("{:?}", self.state)
}
}

impl Drop for WriteProxyConnection {
Expand Down
6 changes: 5 additions & 1 deletion sqld/src/hrana/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::connection::{Connection, MakeConnection};
mod proto;
mod protobuf;
mod request;
mod stream;
pub(crate) mod stream;

pub struct Server<C> {
self_url: Option<String>,
Expand Down Expand Up @@ -69,6 +69,10 @@ impl<C: Connection> Server<C> {
})
.or_else(|err| err.downcast::<ProtocolError>().map(protocol_error_response))
}

pub(crate) fn stream_state(&self) -> &Mutex<stream::ServerStreamState<C>> {
&self.stream_state
}
}

pub(crate) async fn handle_index() -> hyper::Response<hyper::Body> {
Expand Down
12 changes: 9 additions & 3 deletions sqld/src/hrana/http/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ pub struct ServerStreamState<D> {
}

/// Handle to a stream, owned by the [`ServerStreamState`].
enum Handle<D> {
#[derive(Debug)]
pub(crate) enum Handle<D> {
/// A stream that is open and ready to be used by requests. [`Stream::db`] should always be
/// `Some`.
Available(Box<Stream<D>>),
Expand All @@ -51,10 +52,11 @@ enum Handle<D> {
///
/// The stream is either owned by [`Handle::Available`] (when it's not in use) or by [`Guard`]
/// (when it's being used by a request).
struct Stream<D> {
#[derive(Debug)]
pub(crate) struct Stream<D> {
/// The database connection that corresponds to this stream. This is `None` after the `"close"`
/// request was executed.
db: Option<Arc<D>>,
pub(crate) db: Option<Arc<D>>,
/// The cache of SQL texts stored on the server with `"store_sql"` requests.
sqls: HashMap<i32, String>,
/// Stream id of this stream. The id is generated randomly (it should be unguessable).
Expand Down Expand Up @@ -98,6 +100,10 @@ impl<D> ServerStreamState<D> {
expire_round_base: Instant::now(),
}
}

pub(crate) fn handles(&self) -> &HashMap<u64, Handle<D>> {
&self.handles
}
}

/// Acquire a guard to a new or existing stream. If baton is `Some`, we try to look up the stream,
Expand Down
45 changes: 43 additions & 2 deletions sqld/src/http/admin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,26 @@ use tokio_util::io::ReaderStream;
use url::Url;

use crate::connection::config::DatabaseConfig;
use crate::database::Database;
use crate::error::LoadDumpError;
use crate::hrana;
use crate::namespace::{DumpStream, MakeNamespace, NamespaceName, NamespaceStore, RestoreOption};

pub mod stats;

type UserHttpServer<M> =
Arc<hrana::http::Server<<<M as MakeNamespace>::Database as Database>::Connection>>;

struct AppState<M: MakeNamespace> {
namespaces: NamespaceStore<M>,
user_http_server: UserHttpServer<M>,
}

pub async fn run<M, A>(acceptor: A, namespaces: NamespaceStore<M>) -> anyhow::Result<()>
pub async fn run<M, A>(
acceptor: A,
user_http_server: UserHttpServer<M>,
namespaces: NamespaceStore<M>,
) -> anyhow::Result<()>
where
A: crate::net::Accept,
M: MakeNamespace,
Expand All @@ -43,7 +53,11 @@ where
)
.route("/v1/namespaces/:namespace", delete(handle_delete_namespace))
.route("/v1/namespaces/:namespace/stats", get(stats::handle_stats))
.with_state(Arc::new(AppState { namespaces }));
.route("/v1/diagnostics", get(handle_diagnostics))
.with_state(Arc::new(AppState {
namespaces,
user_http_server,
}));

hyper::server::Server::builder(acceptor)
.serve(router.into_make_service())
Expand All @@ -67,6 +81,33 @@ async fn handle_get_config<M: MakeNamespace>(
Ok(Json(store.get()))
}

async fn handle_diagnostics<M: MakeNamespace>(
State(app_state): State<Arc<AppState<M>>>,
) -> crate::Result<Json<Vec<String>>> {
use crate::connection::Connection;
use hrana::http::stream;

let server = app_state.user_http_server.as_ref();
let stream_state = server.stream_state().lock();
let handles = stream_state.handles();
let mut diagnostics: Vec<String> = Vec::with_capacity(handles.len());
for handle in handles.values() {
let handle_info: String = match handle {
stream::Handle::Available(stream) => match &stream.db {
Some(db) => db.diagnostics(),
None => "[BUG] available-but-closed".into(),
},
stream::Handle::Acquired => "acquired".into(),
stream::Handle::Expired => "expired".into(),
};
diagnostics.push(handle_info);
}
drop(stream_state);

tracing::trace!("diagnostics: {diagnostics:?}");
Ok(Json(diagnostics))
}

#[derive(Debug, Deserialize)]
struct BlockReq {
block_reads: bool,
Expand Down
8 changes: 6 additions & 2 deletions sqld/src/http/user/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,10 @@ where
P: Proxy,
S: ReplicationLog,
{
pub fn configure(self, join_set: &mut JoinSet<anyhow::Result<()>>) {
pub fn configure(
self,
join_set: &mut JoinSet<anyhow::Result<()>>,
) -> Arc<hrana::http::Server<<<M as MakeNamespace>::Database as Database>::Connection>> {
let (hrana_accept_tx, hrana_accept_rx) = mpsc::channel(8);
let (hrana_upgrade_tx, hrana_upgrade_rx) = mpsc::channel(8);
let hrana_http_srv = Arc::new(hrana::http::Server::new(self.self_url.clone()));
Expand Down Expand Up @@ -283,7 +286,7 @@ where
let state = AppState {
auth: self.auth,
upgrade_tx: hrana_upgrade_tx,
hrana_http_srv,
hrana_http_srv: hrana_http_srv.clone(),
enable_console: self.enable_console,
namespaces: self.namespaces,
disable_default_namespace: self.disable_default_namespace,
Expand Down Expand Up @@ -418,6 +421,7 @@ where
Ok(())
});
}
hrana_http_srv
}
}

Expand Down
8 changes: 6 additions & 2 deletions sqld/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,14 @@ where
path: self.path.clone(),
};

user_http.configure(join_set);
let user_http_service = user_http.configure(join_set);

if let Some(AdminApiConfig { acceptor }) = self.admin_api_config {
join_set.spawn(http::admin::run(acceptor, self.namespaces));
join_set.spawn(http::admin::run(
acceptor,
user_http_service,
self.namespaces,
));
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions sqld/src/replication/primary/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ impl Version {
}
}

#[derive(Debug)]
pub enum ReplicationLoggerHook {}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct ReplicationLoggerHookCtx {
buffer: Vec<WalPage>,
logger: Arc<ReplicationLogger>,
Expand Down Expand Up @@ -276,7 +277,7 @@ unsafe impl WalHook for ReplicationLoggerHook {
}
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct WalPage {
pub page_no: u32,
/// 0 for non-commit frames
Expand Down Expand Up @@ -731,6 +732,7 @@ impl LogFileHeader {
}
}

#[derive(Debug)]
pub struct Generation {
pub id: Uuid,
pub start_index: u64,
Expand All @@ -745,6 +747,7 @@ impl Generation {
}
}

#[derive(Debug)]
pub struct ReplicationLogger {
pub generation: Generation,
pub log_file: RwLock<LogFile>,
Expand Down
9 changes: 9 additions & 0 deletions sqld/src/replication/replica/hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ init_static_wal_method!(INJECTOR_METHODS, InjectorHook);
/// The Caller must first call `set_frames`, passing the frames to be injected, then trigger a call
/// to xFrames from the libsql connection (see dummy write in `injector`), and can then collect the
/// result on the injection with `take_result`
#[derive(Debug)]
pub enum InjectorHook {}

pub struct InjectorHookCtx {
Expand All @@ -83,6 +84,14 @@ pub struct InjectorHookCtx {
post_commit: Box<dyn Fn(FrameNo) -> anyhow::Result<()>>,
}

impl std::fmt::Debug for InjectorHookCtx {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InjectorHookCtx")
.field("is_txn", &self.is_txn)
.finish()
}
}

impl InjectorHookCtx {
pub fn new(
receiver: tokio::sync::mpsc::Receiver<Frames>,
Expand Down
Loading