Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit f7b93ef

Browse files
committed
introduce stats route
1 parent 46e4d88 commit f7b93ef

File tree

5 files changed

+128
-14
lines changed

5 files changed

+128
-14
lines changed

sqld/src/database/libsql.rs

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@ use std::str::FromStr;
33
use std::time::{Duration, Instant};
44

55
use crossbeam::channel::RecvTimeoutError;
6-
use rusqlite::OpenFlags;
6+
use rusqlite::{OpenFlags, StatementStatus};
77
use tokio::sync::oneshot;
88
use tracing::warn;
99

1010
use crate::error::Error;
1111
use crate::libsql::wal_hook::WalHook;
1212
use crate::query::{Column, Query, QueryResponse, QueryResult, ResultSet, Row};
1313
use crate::query_analysis::{State, Statement};
14+
use crate::stats::Stats;
1415
use crate::Result;
1516

1617
use super::{Cond, Database, Program, Step, TXN_TIMEOUT_SECS};
@@ -143,11 +144,13 @@ impl LibSqlDb {
143144
path: impl AsRef<Path> + Send + 'static,
144145
wal_hook: impl WalHook + Send + Clone + 'static,
145146
with_bottomless: bool,
147+
stats: Stats,
146148
) -> crate::Result<Self> {
147149
let (sender, receiver) = crossbeam::channel::unbounded::<Message>();
148150

149151
tokio::task::spawn_blocking(move || {
150-
let mut connection = Connection::new(path.as_ref(), wal_hook, with_bottomless).unwrap();
152+
let mut connection =
153+
Connection::new(path.as_ref(), wal_hook, with_bottomless, stats).unwrap();
151154
loop {
152155
let Message { pgm, resp } = match connection.state.deadline() {
153156
Some(deadline) => match receiver.recv_deadline(deadline) {
@@ -189,18 +192,21 @@ struct Connection {
189192
state: ConnectionState,
190193
conn: rusqlite::Connection,
191194
timed_out: bool,
195+
stats: Stats,
192196
}
193197

194198
impl Connection {
195199
fn new(
196200
path: &Path,
197201
wal_hook: impl WalHook + Send + Clone + 'static,
198202
with_bottomless: bool,
203+
stats: Stats,
199204
) -> anyhow::Result<Self> {
200205
Ok(Self {
201206
conn: open_db(path, wal_hook, with_bottomless)?,
202207
state: ConnectionState::initial(),
203208
timed_out: false,
209+
stats,
204210
})
205211
}
206212

@@ -245,8 +251,8 @@ impl Connection {
245251

246252
fn execute_query_inner(&self, query: &Query) -> QueryResult {
247253
let mut rows = vec![];
248-
let mut prepared = self.conn.prepare(&query.stmt.stmt)?;
249-
let columns = prepared
254+
let mut stmt = self.conn.prepare(&query.stmt.stmt)?;
255+
let columns = stmt
250256
.columns()
251257
.iter()
252258
.map(|col| Column {
@@ -262,10 +268,10 @@ impl Connection {
262268

263269
query
264270
.params
265-
.bind(&mut prepared)
271+
.bind(&mut stmt)
266272
.map_err(Error::LibSqlInvalidQueryParams)?;
267273

268-
let mut qresult = prepared.raw_query();
274+
let mut qresult = stmt.raw_query();
269275
while let Some(row) = qresult.next()? {
270276
let mut values = vec![];
271277
for (i, _) in columns.iter().enumerate() {
@@ -288,6 +294,10 @@ impl Connection {
288294
false => None,
289295
};
290296

297+
drop(qresult);
298+
299+
self.update_stats(&stmt);
300+
291301
Ok(QueryResponse::ResultSet(ResultSet {
292302
columns,
293303
rows,
@@ -302,6 +312,18 @@ impl Connection {
302312
.execute("rollback transaction;", ())
303313
.expect("failed to rollback");
304314
}
315+
316+
fn update_stats(&self, stmt: &rusqlite::Statement) {
317+
const STMT_ROWS_READ: i32 = 1024 + 1;
318+
const STMT_ROWS_WRITTEN: i32 = 1024 + 2;
319+
320+
let rows_read: StatementStatus = unsafe { std::mem::transmute(STMT_ROWS_READ) };
321+
let rows_written: StatementStatus = unsafe { std::mem::transmute(STMT_ROWS_WRITTEN) };
322+
323+
self.stats.inc_row_read(stmt.get_status(rows_read) as usize);
324+
self.stats
325+
.inc_row_written(stmt.get_status(rows_written) as usize);
326+
}
305327
}
306328

307329
fn eval_cond(cond: &Cond, results: &[Option<QueryResult>]) -> Result<bool> {

sqld/src/database/write_proxy.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use crate::rpc::proxy::rpc::proxy_client::ProxyClient;
1616
use crate::rpc::proxy::rpc::query_result::RowResult;
1717
use crate::rpc::proxy::rpc::DisconnectMessage;
1818
use crate::rpc::replication_log::rpc::replication_log_client::ReplicationLogClient;
19+
use crate::stats::Stats;
1920
use crate::Result;
2021

2122
use super::Program;
@@ -25,6 +26,7 @@ use super::{libsql::LibSqlDb, service::DbFactory, Database};
2526
pub struct WriteProxyDbFactory {
2627
write_proxy: ProxyClient<Channel>,
2728
db_path: PathBuf,
29+
stats: Stats,
2830
/// abort handle: abort db update loop on drop
2931
_abort_handle: crossbeam::channel::Sender<()>,
3032
}
@@ -37,6 +39,7 @@ impl WriteProxyDbFactory {
3739
key_path: Option<PathBuf>,
3840
ca_cert_path: Option<PathBuf>,
3941
db_path: PathBuf,
42+
stats: Stats,
4043
) -> anyhow::Result<(Self, JoinHandle<anyhow::Result<()>>)> {
4144
let mut endpoint = Channel::from_shared(addr.to_string())?;
4245
if tls {
@@ -84,6 +87,7 @@ impl WriteProxyDbFactory {
8487
let this = Self {
8588
write_proxy,
8689
db_path,
90+
stats,
8791
_abort_handle,
8892
};
8993
Ok((this, handle))
@@ -93,7 +97,11 @@ impl WriteProxyDbFactory {
9397
#[async_trait::async_trait]
9498
impl DbFactory for WriteProxyDbFactory {
9599
async fn create(&self) -> Result<Arc<dyn Database>> {
96-
let db = WriteProxyDatabase::new(self.write_proxy.clone(), self.db_path.clone())?;
100+
let db = WriteProxyDatabase::new(
101+
self.write_proxy.clone(),
102+
self.db_path.clone(),
103+
self.stats.clone(),
104+
)?;
97105
Ok(Arc::new(db))
98106
}
99107
}
@@ -106,8 +114,8 @@ pub struct WriteProxyDatabase {
106114
}
107115

108116
impl WriteProxyDatabase {
109-
fn new(write_proxy: ProxyClient<Channel>, path: PathBuf) -> Result<Self> {
110-
let read_db = LibSqlDb::new(path, (), false)?;
117+
fn new(write_proxy: ProxyClient<Channel>, path: PathBuf, stats: Stats) -> Result<Self> {
118+
let read_db = LibSqlDb::new(path, (), false, stats)?;
111119
Ok(Self {
112120
read_db,
113121
write_proxy,

sqld/src/http/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
mod hrana_over_http;
2+
mod stats;
23
mod types;
34

45
use std::future::poll_fn;
@@ -29,6 +30,7 @@ use crate::hrana;
2930
use crate::http::types::HttpQuery;
3031
use crate::query::{self, Query, QueryResult, ResultSet};
3132
use crate::query_analysis::{predict_final_state, State, Statement};
33+
use crate::stats::Stats;
3234
use crate::utils::services::idle_shutdown::IdleShutdownLayer;
3335

3436
use self::types::QueryObject;
@@ -236,6 +238,7 @@ async fn handle_request(
236238
upgrade_tx: mpsc::Sender<hrana::Upgrade>,
237239
db_factory: Arc<dyn DbFactory>,
238240
enable_console: bool,
241+
stats: Stats,
239242
) -> anyhow::Result<Response<Body>> {
240243
if hyper_tungstenite::is_upgrade_request(&req) {
241244
return Ok(handle_upgrade(&upgrade_tx, req).await);
@@ -257,6 +260,7 @@ async fn handle_request(
257260
(&Method::GET, "/v1") => hrana_over_http::handle_index(req).await,
258261
(&Method::POST, "/v1/execute") => hrana_over_http::handle_execute(db_factory, req).await,
259262
(&Method::POST, "/v1/batch") => hrana_over_http::handle_batch(db_factory, req).await,
263+
(&Method::GET, "/v1/stats") => Ok(stats::handle_stats(&stats)),
260264
_ => Ok(Response::builder().status(404).body(Body::empty()).unwrap()),
261265
}
262266
}
@@ -274,6 +278,7 @@ pub async fn run_http<F>(
274278
upgrade_tx: mpsc::Sender<hrana::Upgrade>,
275279
enable_console: bool,
276280
idle_shutdown_layer: Option<IdleShutdownLayer>,
281+
stats: Stats,
277282
) -> anyhow::Result<()>
278283
where
279284
F: MakeService<(), Vec<Query>> + Send + 'static,
@@ -317,6 +322,7 @@ where
317322
upgrade_tx.clone(),
318323
db_factory.clone(),
319324
enable_console,
325+
stats.clone(),
320326
)
321327
});
322328

sqld/src/lib.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use crate::auth::Auth;
2525
use crate::error::Error;
2626
use crate::postgres::service::PgConnectionFactory;
2727
use crate::server::Server;
28+
use crate::stats::Stats;
2829

2930
pub use sqld_libsql_bindings as libsql;
3031

@@ -39,6 +40,7 @@ mod query_analysis;
3940
mod replication;
4041
pub mod rpc;
4142
mod server;
43+
mod stats;
4244
mod utils;
4345

4446
#[derive(clap::ValueEnum, Clone, Debug, PartialEq)]
@@ -97,6 +99,7 @@ async fn run_service(
9799
config: &Config,
98100
join_set: &mut JoinSet<anyhow::Result<()>>,
99101
idle_shutdown_layer: Option<IdleShutdownLayer>,
102+
stats: Stats,
100103
) -> anyhow::Result<()> {
101104
let auth = get_auth(config)?;
102105

@@ -122,6 +125,7 @@ async fn run_service(
122125
upgrade_tx,
123126
config.enable_http_console,
124127
idle_shutdown_layer.clone(),
128+
stats,
125129
));
126130
}
127131

@@ -183,6 +187,7 @@ async fn start_replica(
183187
join_set: &mut JoinSet<anyhow::Result<()>>,
184188
addr: &str,
185189
idle_shutdown_layer: Option<IdleShutdownLayer>,
190+
stats: Stats,
186191
) -> anyhow::Result<()> {
187192
let (factory, handle) = WriteProxyDbFactory::new(
188193
addr,
@@ -191,6 +196,7 @@ async fn start_replica(
191196
config.writer_rpc_key.clone(),
192197
config.writer_rpc_ca_cert.clone(),
193198
config.db_path.clone(),
199+
stats.clone(),
194200
)
195201
.await
196202
.context("failed to start WriteProxy DB")?;
@@ -201,7 +207,7 @@ async fn start_replica(
201207
join_set.spawn(async move { handle.await.expect("WriteProxy DB task failed") });
202208

203209
let service = DbFactoryService::new(Arc::new(factory));
204-
run_service(service, config, join_set, idle_shutdown_layer).await?;
210+
run_service(service, config, join_set, idle_shutdown_layer, stats).await?;
205211

206212
Ok(())
207213
}
@@ -214,6 +220,7 @@ async fn start_primary(
214220
config: &Config,
215221
join_set: &mut JoinSet<anyhow::Result<()>>,
216222
idle_shutdown_layer: Option<IdleShutdownLayer>,
223+
stats: Stats,
217224
) -> anyhow::Result<()> {
218225
let is_fresh_db = check_fresh_db(&config.db_path);
219226
let logger = Arc::new(ReplicationLogger::open(
@@ -237,10 +244,12 @@ async fn start_primary(
237244
dump_loader.load_dump(path.into()).await?;
238245
}
239246

247+
let stats_clone = stats.clone();
240248
let db_factory = Arc::new(move || {
241249
let db_path = path_clone.clone();
242250
let hook = hook.clone();
243-
async move { LibSqlDb::new(db_path, hook, enable_bottomless) }
251+
let stats_clone = stats_clone.clone();
252+
async move { LibSqlDb::new(db_path, hook, enable_bottomless, stats_clone) }
244253
});
245254
let service = DbFactoryService::new(db_factory.clone());
246255
if let Some(ref addr) = config.rpc_server_addr {
@@ -256,7 +265,7 @@ async fn start_primary(
256265
));
257266
}
258267

259-
run_service(service, config, join_set, idle_shutdown_layer).await?;
268+
run_service(service, config, join_set, idle_shutdown_layer, stats).await?;
260269

261270
Ok(())
262271
}
@@ -309,11 +318,13 @@ pub async fn run_server(config: Config) -> anyhow::Result<()> {
309318
.idle_shutdown_timeout
310319
.map(|d| IdleShutdownLayer::new(d, shutdown_notify.clone()));
311320

321+
let stats = Stats::new(&config.db_path)?;
322+
312323
match config.writer_rpc_addr {
313324
Some(ref addr) => {
314-
start_replica(&config, &mut join_set, addr, idle_shutdown_layer).await?
325+
start_replica(&config, &mut join_set, addr, idle_shutdown_layer, stats).await?
315326
}
316-
None => start_primary(&config, &mut join_set, idle_shutdown_layer).await?,
327+
None => start_primary(&config, &mut join_set, idle_shutdown_layer, stats).await?,
317328
}
318329

319330
let reset = HARD_RESET.clone();

sqld/src/stats.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
use std::fs::{File, OpenOptions};
2+
use std::io::Seek;
3+
use std::path::Path;
4+
use std::sync::atomic::{AtomicUsize, Ordering};
5+
use std::sync::Arc;
6+
use std::time::Duration;
7+
8+
use serde::{Deserialize, Serialize};
9+
10+
#[derive(Clone)]
11+
pub struct Stats {
12+
inner: Arc<StatsInner>,
13+
}
14+
15+
#[derive(Serialize, Deserialize, Default)]
16+
struct StatsInner {
17+
rows_written: AtomicUsize,
18+
rows_read: AtomicUsize,
19+
}
20+
21+
impl Stats {
22+
pub fn new(db_path: &Path) -> anyhow::Result<Self> {
23+
let stats_path = db_path.join("stats.json");
24+
let stats_file = OpenOptions::new()
25+
.read(true)
26+
.write(true)
27+
.create(true)
28+
.open(stats_path)?;
29+
30+
let stats_inner =
31+
serde_json::from_reader(&stats_file).unwrap_or_else(|_| StatsInner::default());
32+
let inner = Arc::new(stats_inner);
33+
34+
spawn_stats_persist_thread(inner.clone(), stats_file);
35+
36+
Ok(Self { inner })
37+
}
38+
39+
/// increments the number of written rows by n
40+
pub fn inc_row_written(&self, n: usize) {
41+
self.inner.rows_written.fetch_add(n, Ordering::Relaxed);
42+
}
43+
44+
/// increments the number of read rows by n
45+
pub fn inc_row_read(&self, n: usize) {
46+
self.inner.rows_read.fetch_add(n, Ordering::Relaxed);
47+
}
48+
49+
/// returns the total number of rows read since this database was created
50+
pub fn row_read(&self) -> usize {
51+
self.inner.rows_read.load(Ordering::Relaxed)
52+
}
53+
54+
/// returns the total number of rows written since this database was created
55+
pub fn rows_written(&self) -> usize {
56+
self.inner.rows_written.load(Ordering::Relaxed)
57+
}
58+
}
59+
60+
fn spawn_stats_persist_thread(stats: Arc<StatsInner>, mut file: File) {
61+
std::thread::spawn(move || loop {
62+
if file.seek(std::io::SeekFrom::Start(0)).is_ok() {
63+
let _ = serde_json::to_writer(&mut file, &stats);
64+
}
65+
std::thread::sleep(Duration::from_secs(5));
66+
});
67+
}

0 commit comments

Comments
 (0)