Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sqld/src/http/admin/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use axum::extract::{Path, State};
use axum::Json;

use crate::namespace::MakeNamespace;
use crate::replication::FrameNo;
use crate::stats::Stats;

use super::AppState;
Expand All @@ -16,6 +17,7 @@ pub struct StatsResponse {
pub rows_written_count: u64,
pub storage_bytes_used: u64,
pub write_requests_delegated: u64,
pub current_frame_no: FrameNo,
}

impl From<&Stats> for StatsResponse {
Expand All @@ -25,6 +27,7 @@ impl From<&Stats> for StatsResponse {
rows_written_count: stats.rows_written(),
storage_bytes_used: stats.storage_bytes_used(),
write_requests_delegated: stats.write_requests_delegated(),
current_frame_no: stats.get_current_frame_no(),
}
}
}
Expand Down
18 changes: 17 additions & 1 deletion sqld/src/namespace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use hyper::Uri;
use rusqlite::ErrorCode;
use sqld_libsql_bindings::wal_hook::TRANSPARENT_METHODS;
use tokio::io::AsyncBufReadExt;
use tokio::sync::watch;
use tokio::task::{block_in_place, JoinSet};
use tokio_util::io::StreamReader;
use tonic::transport::Channel;
Expand All @@ -29,7 +30,7 @@ use crate::database::{Database, PrimaryDatabase, ReplicaDatabase};
use crate::error::{Error, LoadDumpError};
use crate::replication::primary::logger::{ReplicationLoggerHookCtx, REPLICATION_METHODS};
use crate::replication::replica::Replicator;
use crate::replication::{NamespacedSnapshotCallback, ReplicationLogger};
use crate::replication::{FrameNo, NamespacedSnapshotCallback, ReplicationLogger};
use crate::stats::Stats;
use crate::{
run_periodic_checkpoint, StatsSender, DB_CREATE_TIMEOUT, DEFAULT_AUTO_CHECKPOINT,
Expand Down Expand Up @@ -476,6 +477,7 @@ impl Namespace<ReplicaDatabase> {
&mut join_set,
config.stats_sender.clone(),
name.clone(),
replicator.current_frame_no_notifier.clone(),
)
.await?;

Expand Down Expand Up @@ -619,6 +621,7 @@ impl Namespace<PrimaryDatabase> {
&mut join_set,
config.stats_sender.clone(),
name.clone(),
logger.new_frame_notifier.subscribe(),
)
.await?;

Expand Down Expand Up @@ -679,6 +682,7 @@ async fn make_stats(
join_set: &mut JoinSet<anyhow::Result<()>>,
stats_sender: StatsSender,
name: Bytes,
mut current_frame_no: watch::Receiver<FrameNo>,
) -> anyhow::Result<Arc<Stats>> {
let stats = Stats::new(db_path, join_set).await?;

Expand All @@ -687,6 +691,18 @@ async fn make_stats(
.send((name.clone(), Arc::downgrade(&stats)))
.await;

join_set.spawn({
let stats = stats.clone();
// initialize the current_frame_no value
stats.set_current_frame_no(*current_frame_no.borrow_and_update());
async move {
while current_frame_no.changed().await.is_ok() {
stats.set_current_frame_no(*current_frame_no.borrow_and_update());
}
Ok(())
}
});

join_set.spawn(run_storage_monitor(db_path.into(), Arc::downgrade(&stats)));

Ok(stats)
Expand Down
2 changes: 1 addition & 1 deletion sqld/src/replication/primary/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ pub struct LogFileHeader {

impl LogFileHeader {
pub fn last_frame_no(&self) -> FrameNo {
self.start_frame_no + self.frame_count
self.start_frame_no + self.frame_count - 1
}

fn sqld_version(&self) -> Version {
Expand Down
16 changes: 16 additions & 0 deletions sqld/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,21 @@ use serde::{Deserialize, Serialize};
use tokio::io::AsyncWriteExt;
use tokio::task::JoinSet;

use crate::replication::FrameNo;

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct Stats {
#[serde(default)]
rows_written: AtomicU64,
#[serde(default)]
rows_read: AtomicU64,
#[serde(default)]
storage_bytes_used: AtomicU64,
// number of write requests delegated from a replica to primary
#[serde(default)]
write_requests_delegated: AtomicU64,
#[serde(default)]
current_frame_no: AtomicU64,
}

impl Stats {
Expand Down Expand Up @@ -75,6 +83,14 @@ impl Stats {
pub fn write_requests_delegated(&self) -> u64 {
self.write_requests_delegated.load(Ordering::Relaxed)
}

pub fn set_current_frame_no(&self, fno: FrameNo) {
self.current_frame_no.store(fno, Ordering::Relaxed);
}

pub(crate) fn get_current_frame_no(&self) -> FrameNo {
self.current_frame_no.load(Ordering::Relaxed)
}
}

async fn spawn_stats_persist_thread(stats: Weak<Stats>, path: PathBuf) -> anyhow::Result<()> {
Expand Down