Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: refactor stats method in wal #1098

Merged
merged 4 commits into from
Jul 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 32 additions & 19 deletions server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use warp::{
http::StatusCode,
reject,
reply::{self, Reply},
Filter,
Filter, Rejection,
};

use crate::{
Expand Down Expand Up @@ -204,7 +204,7 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
.or(self.profile_heap())
.or(self.server_config())
.or(self.shards())
.or(self.stats())
.or(self.wal_stats())
.with(warp::log("http_requests"))
.with(warp::log::custom(|info| {
let path = info.path();
Expand Down Expand Up @@ -516,24 +516,32 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
}

// GET /debug/stats
fn stats(&self) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
let opened_wals = self.opened_wals.clone();
warp::path!("debug" / "stats")
fn wal_stats(
&self,
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
warp::path!("debug" / "wal_stats")
.and(warp::get())
.map(move || {
[
"Data wal stats:",
&opened_wals
.data_wal
.get_statistics()
.unwrap_or_else(|| "Unknown".to_string()),
"Manifest wal stats:",
&opened_wals
.manifest_wal
.get_statistics()
.unwrap_or_else(|| "Unknown".to_string()),
]
.join("\n")
.and(self.with_opened_wals())
.and_then(|wals: OpenedWals| async move {
let wal_stats = wals
.data_wal
.get_statistics()
.await
.unwrap_or_else(|| "Unknown".to_string());

let manifest_wal_stats = wals
.manifest_wal
.get_statistics()
.await
.unwrap_or_else(|| "Unknown".to_string());

let stats = format!(
"[Data wal stats]:\n{wal_stats}
\n\n------------------------------------------------------\n\n
[Manifest wal stats]:\n{manifest_wal_stats}"
);

std::result::Result::<_, Rejection>::Ok(stats.into_response())
})
}

Expand Down Expand Up @@ -653,6 +661,11 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
let log_runtime = self.log_runtime.clone();
warp::any().map(move || log_runtime.clone())
}

fn with_opened_wals(&self) -> impl Filter<Extract = (OpenedWals,), Error = Infallible> + Clone {
let wals = self.opened_wals.clone();
warp::any().map(move || wals.clone())
}
}

/// Service builder
Expand Down
4 changes: 1 addition & 3 deletions wal/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,7 @@ pub trait WalManager: Send + Sync + fmt::Debug + 'static {
async fn scan(&self, ctx: &ScanContext, req: &ScanRequest) -> Result<BatchLogIteratorAdapter>;

/// Get statistics
fn get_statistics(&self) -> Option<String> {
None
}
async fn get_statistics(&self) -> Option<String>;
}

#[derive(Debug)]
Expand Down
12 changes: 12 additions & 0 deletions wal/src/message_queue_impl/namespace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,18 @@ impl<M: MessageQueue> Namespace<M> {
) -> Result<()> {
self.inner.mark_delete_to(location, sequence_num).await
}

pub async fn get_statistics(&self) -> String {
let regions = self.inner.regions.read().await;
let mut region_stats = Vec::with_capacity(regions.len());
for (region_id, region) in regions.iter() {
let snapshot = region.make_meta_snapshot().await;
let region_stat = format!("region_id:{region_id}, snapshot:{snapshot:?}",);
region_stats.push(region_stat);
}

region_stats.join("\n")
}
}

// TODO: more information should be included.
Expand Down
3 changes: 1 addition & 2 deletions wal/src/message_queue_impl/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -682,8 +682,7 @@ impl<M: MessageQueue> Region<M> {
}

/// Return snapshot, just used for test.
#[allow(dead_code)]
async fn make_meta_snapshot(&self) -> RegionMetaSnapshot {
pub async fn make_meta_snapshot(&self) -> RegionMetaSnapshot {
let inner = self.inner.write().await;
inner.make_meta_snapshot().await
}
Expand Down
7 changes: 7 additions & 0 deletions wal/src/message_queue_impl/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ impl<M: MessageQueue> WalManager for MessageQueueImpl<M> {
async fn write(&self, ctx: &WriteContext, batch: &LogWriteBatch) -> Result<SequenceNumber> {
self.0.write(ctx, batch).await.box_err().context(Write)
}

async fn get_statistics(&self) -> Option<String> {
let wal_stats = self.0.get_statistics().await;
let stats = format!("#MessageQueueWal stats:\n{wal_stats}\n");

Some(stats)
}
}

#[async_trait]
Expand Down
27 changes: 25 additions & 2 deletions wal/src/rocks_impl/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ struct TableUnit {
delete_lock: Mutex<()>,
}

impl std::fmt::Debug for TableUnit {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("TableUnit")
.field("id", &self.id)
.field("next_sequence_num", &self.next_sequence_num)
.finish()
}
}

impl TableUnit {
/// Allocate a continuous range of [SequenceNumber] and returns
/// the start [SequenceNumber] of the range [start, start+`number`).
Expand Down Expand Up @@ -895,12 +904,26 @@ impl WalManager for RocksImpl {
))
}

fn get_statistics(&self) -> Option<String> {
if let Some(stats) = &self.stats {
async fn get_statistics(&self) -> Option<String> {
// RocksDB stats.
let rocksdb_stats = if let Some(stats) = &self.stats {
stats.to_string()
} else {
None
};
let rocksdb_stats = rocksdb_stats.unwrap_or_default();

// Wal stats.
let table_units = self.table_units.read().unwrap();
let mut wal_stats = Vec::with_capacity(table_units.len());
for table_unit in table_units.values() {
wal_stats.push(format!("{:?}", table_unit.as_ref()));
}
let wal_stats = wal_stats.join("\n");

let stats = format!("#RocksDB stats:\n{rocksdb_stats}\n#RocksDBWal stats:\n{wal_stats}\n");

Some(stats)
}
}

Expand Down
17 changes: 17 additions & 0 deletions wal/src/table_kv_impl/namespace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,19 @@ impl<T> NamespaceInner<T> {
let mut table_units = self.table_units.write().unwrap();
table_units.clear();
}

fn get_statistics(&self) -> String {
let table_units = self.table_units.read().unwrap();
let mut wal_stats = Vec::with_capacity(table_units.len());
for table_unit in table_units.values() {
wal_stats.push(format!("{:?}", table_unit.as_ref()));
}
let stats = wal_stats.join("\n");

let stats = format!("#TableKvWal stats:\n{stats}\n");

stats
}
}

// Blocking operations.
Expand Down Expand Up @@ -1104,6 +1117,10 @@ impl<T: TableKv> Namespace<T> {
config,
)
}

pub fn get_statistics(&self) -> String {
self.inner.get_statistics()
}
}

// Async operations.
Expand Down
11 changes: 11 additions & 0 deletions wal/src/table_kv_impl/table_unit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,17 @@ pub struct TableUnit {
writer: Mutex<TableUnitWriter>,
}

impl std::fmt::Debug for TableUnit {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TableUnit")
.field("region_id", &self.state.region_id)
.field("table_id", &self.state.table_id)
.field("start_sequence", &self.state.start_sequence)
.field("last_sequence", &self.state.last_sequence)
.finish()
}
}

// Async or non-blocking operations.
impl TableUnit {
/// Open table unit of given `region_id` and `table_id`, the caller should
Expand Down
6 changes: 6 additions & 0 deletions wal/src/table_kv_impl/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,4 +182,10 @@ impl<T: TableKv> WalManager for WalNamespaceImpl<T> {
ctx.batch_size,
))
}

async fn get_statistics(&self) -> Option<String> {
let stats = self.namespace.get_statistics();

Some(stats)
}
}