Skip to content

Commit

Permalink
chore: add metrics for meta data cache hit rate (#1188)
Browse files Browse the repository at this point in the history
## Rationale
There are no metrics reporting the hit rate of the meta data cache.

## Detailed Changes
- Add counters for meta data cache hits and misses.
- Standardize the log format for SQL queries and remote engine service queries.

## Test Plan
Existing tests.
  • Loading branch information
ShiKaiWi authored Aug 31, 2023
1 parent b25515f commit b0d7c2c
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 36 deletions.
10 changes: 9 additions & 1 deletion analytic_engine/src/sst/meta_data/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::sst::{
metadata_reader::parse_metadata, KvMetaDataNotFound, KvMetaVersionEmpty,
ParquetMetaDataRef, Result,
},
metrics::{META_DATA_CACHE_HIT_COUNTER, META_DATA_CACHE_MISS_COUNTER},
parquet::encoding,
};

Expand Down Expand Up @@ -140,7 +141,14 @@ impl MetaCache {
}

/// Looks up the cached SST metadata for `key`, recording a cache hit or
/// miss in the corresponding prometheus counter.
///
/// NOTE(review): takes the write lock even for a read — presumably because
/// the underlying cache updates LRU recency on `get`; confirm against the
/// cache type.
pub fn get(&self, key: &str) -> Option<MetaData> {
    let v = self.cache.write().unwrap().get(key).cloned();

    // Count hit/miss so the cache hit rate can be derived from the two
    // counters (hit / (hit + miss)).
    if v.is_some() {
        META_DATA_CACHE_HIT_COUNTER.inc();
    } else {
        META_DATA_CACHE_MISS_COUNTER.inc();
    }

    v
}

pub fn put(&self, key: String, value: MetaData) {
Expand Down
12 changes: 11 additions & 1 deletion analytic_engine/src/sst/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use lazy_static::lazy_static;
use prometheus::{exponential_buckets, register_histogram, Histogram};
use prometheus::{exponential_buckets, register_counter, register_histogram, Counter, Histogram};

lazy_static! {
// Histogram:
Expand All @@ -23,4 +23,14 @@ lazy_static! {
"Histogram for sst get range length",
exponential_buckets(100.0, 2.0, 5).unwrap()
).unwrap();

pub static ref META_DATA_CACHE_HIT_COUNTER: Counter = register_counter!(
"META_DATA_CACHE_HIT",
"The counter for meta data cache hit"
).unwrap();

pub static ref META_DATA_CACHE_MISS_COUNTER: Counter = register_counter!(
"META_DATA_CACHE_MISS",
"The counter for meta data cache miss"
).unwrap();
}
45 changes: 24 additions & 21 deletions proxy/src/hotspot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

//! hotspot recorder
use std::{fmt::Write, sync::Arc, time::Duration};
use std::{fmt::Write, sync::Arc};

use ceresdbproto::storage::{
PrometheusQueryRequest, RequestContext, SqlQueryRequest, WriteRequest,
Expand All @@ -22,6 +22,7 @@ use log::{info, warn};
use runtime::Runtime;
use serde::{Deserialize, Serialize};
use spin::Mutex as SpinMutex;
use time_ext::ReadableDuration;
use timed_task::TimedTask;
use tokio::sync::mpsc::{self, Sender};

Expand All @@ -30,7 +31,7 @@ use crate::{hotspot_lru::HotspotLru, util};
type QueryKey = String;
type WriteKey = String;
const TAG: &str = "hotspot autodump";
const RECODER_CHANNEL_CAP: usize = 64 * 1024;
const RECORDER_CHANNEL_CAP: usize = 64 * 1024;

#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default)]
Expand All @@ -39,20 +40,22 @@ pub struct Config {
query_cap: Option<usize>,
/// Max items size for write hotspot
write_cap: Option<usize>,
dump_interval: Duration,
auto_dump_interval: bool,
/// Max items for dump hotspot
auto_dump_len: usize,
/// The hotspot records will be auto dumped if set.
enable_auto_dump: bool,
/// The interval between two auto dumps
auto_dump_interval: ReadableDuration,
/// The number of items for auto dump
auto_dump_num_items: usize,
}

impl Default for Config {
    /// Defaults: both hotspot LRUs capped at 10k entries, auto dump enabled,
    /// dumping the top 10 items once per minute.
    fn default() -> Self {
        Self {
            query_cap: Some(10_000),
            write_cap: Some(10_000),
            auto_dump_interval: ReadableDuration::minutes(1),
            enable_auto_dump: true,
            auto_dump_num_items: 10,
        }
    }
}
Expand Down Expand Up @@ -142,9 +145,9 @@ impl HotspotRecorder {
hotspot_field_write: hotspot_field_write.clone(),
};

let task_handle = if config.auto_dump_interval {
let interval = config.dump_interval;
let dump_len = config.auto_dump_len;
let task_handle = if config.enable_auto_dump {
let interval = config.auto_dump_interval;
let dump_len = config.auto_dump_num_items;
let stat_clone = stat.clone();
let builder = move || {
let stat_in_builder = stat_clone.clone();
Expand Down Expand Up @@ -173,14 +176,14 @@ impl HotspotRecorder {
Some(TimedTask::start_timed_task(
String::from("hotspot_dump"),
&runtime,
interval,
interval.0,
builder,
))
} else {
None
};

let (tx, mut rx) = mpsc::channel(RECODER_CHANNEL_CAP);
let (tx, mut rx) = mpsc::channel(RECORDER_CHANNEL_CAP);
runtime.spawn(async move {
loop {
match rx.recv().await {
Expand Down Expand Up @@ -313,7 +316,7 @@ impl HotspotRecorder {

#[cfg(test)]
mod test {
use std::thread;
use std::{thread, time::Duration};

use ceresdbproto::{
storage,
Expand Down Expand Up @@ -348,9 +351,9 @@ mod test {
let options = Config {
query_cap: read_cap,
write_cap,
auto_dump_interval: false,
dump_interval: Duration::from_millis(5000),
auto_dump_len: 10,
enable_auto_dump: false,
auto_dump_interval: ReadableDuration::millis(5000),
auto_dump_num_items: 10,
};
let recorder = HotspotRecorder::new(options, runtime.clone());
assert!(recorder.stat.pop_read_hots().unwrap().is_empty());
Expand Down Expand Up @@ -384,9 +387,9 @@ mod test {
let options = Config {
query_cap: read_cap,
write_cap,
auto_dump_interval: false,
dump_interval: Duration::from_millis(5000),
auto_dump_len: 10,
enable_auto_dump: false,
auto_dump_interval: ReadableDuration::millis(5000),
auto_dump_num_items: 10,
};

let recorder = HotspotRecorder::new(options, runtime.clone());
Expand Down
21 changes: 13 additions & 8 deletions proxy/src/http/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,25 @@ impl Proxy {
let schema = &ctx.schema;
let ctx = Context::new(ctx.timeout, None);

match self
let query_res = self
.handle_sql(
&ctx,
schema,
&req.query,
self.sub_table_access_perm.enable_http,
)
.await
.map_err(|e| {
error!("Handle sql query failed, ctx:{ctx:?}, req:{req:?}, err:{e}");
e
})? {
SqlResponse::Forwarded(resp) => convert_sql_response_to_output(resp),
SqlResponse::Local(output) => Ok(output),
.await;

match query_res {
Err(e) => {
error!(
"Handle sql query failed, schema:{schema}, ctx:{ctx:?}, sql:{}, err:{e}",
req.query,
);
Err(e)
}
Ok(SqlResponse::Forwarded(resp)) => convert_sql_response_to_output(resp),
Ok(SqlResponse::Local(output)) => Ok(output),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion proxy/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl Proxy {
})?;

let cost = begin_instant.saturating_elapsed();
info!("Handle sql query successfully, catalog:{catalog}, schema:{schema}, cost:{cost:?}, ctx:{ctx:?}");
info!("Handle sql query successfully, catalog:{catalog}, schema:{schema}, cost:{cost:?}, ctx:{ctx:?}, sql:{sql}");

match &output {
Output::AffectedRows(_) => Ok(output),
Expand Down
18 changes: 14 additions & 4 deletions server/src/grpc/remote_engine_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ async fn handle_stream_read(
let begin = Instant::now();
let table = find_table_by_identifier(&ctx, &table_ident)?;
let res = table
.partitioned_read(read_request)
.partitioned_read(read_request.clone())
.await
.box_err()
.with_context(|| ErrWithCause {
Expand All @@ -572,15 +572,25 @@ async fn handle_stream_read(
match res {
Ok(streams) => {
info!(
"Handle stream read success, request_id:{request_id}, table:{table_ident:?}, cost:{:?}",
begin.elapsed(),
);
"Handle stream read success, request_id:{request_id}, table:{table_ident:?}, cost:{:?}, read_options:{:?}, predicate:{:?}",
begin.elapsed(),
read_request.opts,
read_request.predicate,
);

REMOTE_ENGINE_GRPC_HANDLER_COUNTER_VEC
.stream_query_succeeded
.inc();
Ok(streams)
}
Err(e) => {
error!(
"Handle stream read failed, request_id:{request_id}, table:{table_ident:?}, cost:{:?}, read_options:{:?}, predicate:{:?}, err:{e}",
begin.elapsed(),
read_request.opts,
read_request.predicate,
);

REMOTE_ENGINE_GRPC_HANDLER_COUNTER_VEC
.stream_query_failed
.inc();
Expand Down

0 comments on commit b0d7c2c

Please sign in to comment.