chore: add metrics oss (#478)
* feat: add StoreWithMetrics

* chore: add metrics for object store

Co-authored-by: xikai.wxk <xikai.wxk@antgroup.com>
chunshao90 and ShiKaiWi authored Dec 12, 2022
1 parent 04c160a commit 3c75bf7
Showing 7 changed files with 214 additions and 15 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -83,6 +83,7 @@ parquet = { version = "23.0.0" }
paste = "1.0"
profile = { path = "components/profile" }
prometheus = "0.12"
prometheus-static-metric = "0.5"
proto = { path = "proto" }
prost = "0.11"
query_engine = { path = "query_engine" }
9 changes: 6 additions & 3 deletions analytic_engine/src/setup.rs
@@ -10,7 +10,7 @@ use futures::Future;
use message_queue::kafka::kafka_impl::KafkaImpl;
use object_store::{
aliyun::AliyunOSS, disk_cache::DiskCacheStore, mem_cache::MemCacheStore,
prefix::StoreWithPrefix, LocalFileSystem, ObjectStoreRef,
metrics::StoreWithMetrics, prefix::StoreWithPrefix, LocalFileSystem, ObjectStoreRef,
};
use snafu::{Backtrace, ResultExt, Snafu};
use table_engine::engine::{EngineRuntimes, TableEngineRef};
@@ -394,8 +394,11 @@ fn open_storage(
aliyun_opts.endpoint,
aliyun_opts.bucket,
));
Arc::new(StoreWithPrefix::new(aliyun_opts.prefix, oss).context(OpenObjectStore)?)
as _
let oss_with_metrics = Arc::new(StoreWithMetrics::new(oss));
Arc::new(
StoreWithPrefix::new(aliyun_opts.prefix, oss_with_metrics)
.context(OpenObjectStore)?,
) as _
}
};

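The hunk above rewires the Aliyun branch of open_storage: the raw OSS client is wrapped in StoreWithMetrics first, and StoreWithPrefix is applied on top, so every request that actually reaches OSS is timed while path prefixing happens one layer above. Below is a minimal sketch of that layering, not part of the commit, assuming (as the hunk suggests) that StoreWithPrefix::new(prefix, store) takes a String prefix plus an ObjectStoreRef and returns a Result; wrap_aliyun_store is a hypothetical helper and error handling is simplified.

use std::sync::Arc;

use object_store::{metrics::StoreWithMetrics, prefix::StoreWithPrefix, ObjectStoreRef};

// Hypothetical helper mirroring the composition used in open_storage:
// OSS client -> StoreWithMetrics -> StoreWithPrefix.
fn wrap_aliyun_store(oss: ObjectStoreRef, prefix: String) -> ObjectStoreRef {
    // The metrics layer sits directly above the remote store, so the recorded
    // latency and throughput reflect the actual OSS requests.
    let with_metrics: ObjectStoreRef = Arc::new(StoreWithMetrics::new(oss));
    // The prefix layer is outermost: it rewrites paths before delegating, so the
    // metrics layer already sees the prefixed locations.
    Arc::new(StoreWithPrefix::new(prefix, with_metrics).expect("valid prefix"))
}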
31 changes: 20 additions & 11 deletions components/object_store/Cargo.toml
@@ -1,28 +1,37 @@
[package]
name = "object_store"
version.workspace = true
authors.workspace = true
edition.workspace = true

[package.version]
workspace = true

[package.authors]
workspace = true

[package.edition]
workspace = true

[dependencies]
async-trait = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
common_util = { workspace = true }
crc = "3.0.0"
futures = { workspace = true }
upstream = { package = "object_store", version = "0.5.1" }
oss-rust-sdk = { git = "https://github.com/jiacai2050/oss-rust-sdk.git" , rev="a6c54471d0b187667395d6cac931bd4e01a9f4e4" }
lazy_static = { workspace = true }
log = { workspace = true }
lru = { workspace = true }
lru-weighted-cache = { git = "https://github.com/jiacai2050/lru-weighted-cache.git", rev = "1cf61aaf88469387e610dc7154fa318843491428" }
oss-rust-sdk = { git = "https://github.com/jiacai2050/oss-rust-sdk.git", rev = "a6c54471d0b187667395d6cac931bd4e01a9f4e4" }
prost = { workspace = true }
proto = { workspace = true }
prometheus = { workspace = true }
prometheus-static-metric = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
serde_json = { workspace = true }
snafu = { workspace = true }
lru = { workspace = true }
chrono = { workspace = true }
tokio = { workspace = true }
lru-weighted-cache = { git = "https://github.com/jiacai2050/lru-weighted-cache.git" , rev="1cf61aaf88469387e610dc7154fa318843491428"}
log = { workspace = true }
proto = { workspace = true }
prost = { workspace = true }
upstream = { package = "object_store", version = "0.5.1" }

[dev-dependencies]
tempfile = { workspace = true }
1 change: 1 addition & 0 deletions components/object_store/src/lib.rs
@@ -14,6 +14,7 @@ pub use upstream::{
pub mod aliyun;
pub mod disk_cache;
pub mod mem_cache;
pub mod metrics;
pub mod prefix;

pub type ObjectStoreRef = Arc<dyn ObjectStore>;
182 changes: 182 additions & 0 deletions components/object_store/src/metrics.rs
@@ -0,0 +1,182 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

use std::{fmt::Display, ops::Range};

use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use lazy_static::lazy_static;
use prometheus::{exponential_buckets, register_histogram_vec, HistogramVec};
use prometheus_static_metric::make_static_metric;
use tokio::io::AsyncWrite;
use upstream::{path::Path, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result};

use crate::ObjectStoreRef;

make_static_metric! {
pub struct ObjectStoreDurationHistogram: Histogram {
"op" => {
put,
put_multipart,
abort_multipart,
get,
get_range,
get_ranges,
head,
delete,
list,
list_with_delimiter,
copy,
rename,
copy_if_not_exists,
rename_if_not_exists
},
}

pub struct ObjectStoreThroughputHistogram: Histogram {
"op" => {
put,
get_range,
get_ranges,
},
}
}

lazy_static! {
static ref OBJECT_STORE_DURATION_HISTOGRAM_VEC: HistogramVec = register_histogram_vec!(
"object_store_latency",
"latency of object store's operation",
&["op"],
exponential_buckets(0.0005, 2.0, 20).unwrap()
)
.unwrap();
static ref OBJECT_STORE_THROUGHPUT_HISTOGRAM_VEC: HistogramVec = register_histogram_vec!(
"object_store_throughput",
"throughput of object store's operation",
&["op"],
// With a start of 64, a factor of 4 and 12 buckets, the largest bucket bound is 64 * 4^11 bytes (256MiB)
exponential_buckets(64.0, 4.0, 12).unwrap()
)
.unwrap();
}

lazy_static! {
pub static ref OBJECT_STORE_DURATION_HISTOGRAM: ObjectStoreDurationHistogram =
ObjectStoreDurationHistogram::from(&OBJECT_STORE_DURATION_HISTOGRAM_VEC);
pub static ref OBJECT_STORE_THROUGHPUT_HISTOGRAM: ObjectStoreThroughputHistogram =
ObjectStoreThroughputHistogram::from(&OBJECT_STORE_THROUGHPUT_HISTOGRAM_VEC);
}

/// An object store wrapper for collecting statistics about the underlying store.
#[derive(Debug)]
pub struct StoreWithMetrics {
store: ObjectStoreRef,
}

impl StoreWithMetrics {
pub fn new(store: ObjectStoreRef) -> Self {
Self { store }
}
}

impl Display for StoreWithMetrics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Store with metrics, underlying store:{}", self.store)
}
}

#[async_trait]
impl ObjectStore for StoreWithMetrics {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
let _timer = OBJECT_STORE_DURATION_HISTOGRAM.put.start_timer();
OBJECT_STORE_THROUGHPUT_HISTOGRAM
.put
.observe(bytes.len() as f64);
self.store.put(location, bytes).await
}

async fn put_multipart(
&self,
location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
let _timer = OBJECT_STORE_DURATION_HISTOGRAM.put_multipart.start_timer();
self.store.put_multipart(location).await
}

async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> {
let _timer = OBJECT_STORE_DURATION_HISTOGRAM
.abort_multipart
.start_timer();
self.store.abort_multipart(location, multipart_id).await
}

async fn get(&self, location: &Path) -> Result<GetResult> {
let _timer = OBJECT_STORE_DURATION_HISTOGRAM.get.start_timer();
self.store.get(location).await
}

async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
let _timer = OBJECT_STORE_DURATION_HISTOGRAM.get_range.start_timer();
let result = self.store.get_range(location, range).await?;
OBJECT_STORE_THROUGHPUT_HISTOGRAM
.get_range
.observe(result.len() as f64);
Ok(result)
}

async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
let _timer = OBJECT_STORE_DURATION_HISTOGRAM.get_ranges.start_timer();
let result = self.store.get_ranges(location, ranges).await?;
let len: usize = result.iter().map(|v| v.len()).sum();
OBJECT_STORE_THROUGHPUT_HISTOGRAM
.get_ranges
.observe(len as f64);
Ok(result)
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let _timer = OBJECT_STORE_DURATION_HISTOGRAM.head.start_timer();
self.store.head(location).await
}

async fn delete(&self, location: &Path) -> Result<()> {
let _timer = OBJECT_STORE_DURATION_HISTOGRAM.delete.start_timer();
self.store.delete(location).await
}

async fn list(&self, prefix: Option<&Path>) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
let _timer = OBJECT_STORE_DURATION_HISTOGRAM.list.start_timer();
self.store.list(prefix).await
}

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
let _timer = OBJECT_STORE_DURATION_HISTOGRAM
.list_with_delimiter
.start_timer();
self.store.list_with_delimiter(prefix).await
}

async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
let _timer = OBJECT_STORE_DURATION_HISTOGRAM.copy.start_timer();
self.store.copy(from, to).await
}

async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
let _timer = OBJECT_STORE_DURATION_HISTOGRAM.rename.start_timer();
self.store.rename(from, to).await
}

async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
let _timer = OBJECT_STORE_DURATION_HISTOGRAM
.copy_if_not_exists
.start_timer();
self.store.copy_if_not_exists(from, to).await
}

async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
let _timer = OBJECT_STORE_DURATION_HISTOGRAM
.rename_if_not_exists
.start_timer();
self.store.rename_if_not_exists(from, to).await
}
}
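StoreWithMetrics is a pure pass-through: each method starts a timer on its op-labelled latency histogram, delegates to the wrapped store, and put/get_range/get_ranges additionally record the number of bytes moved on the throughput histogram. The sketch below, not part of the commit, shows one way to exercise the wrapper and inspect the registered metrics; it assumes the upstream object_store 0.5 in-memory store (the dependency aliased as upstream in this crate's Cargo.toml), tokio with the macros/rt features enabled, and the default prometheus registry.

use std::sync::Arc;

use bytes::Bytes;
use object_store::metrics::StoreWithMetrics;
use prometheus::{Encoder, TextEncoder};
use upstream::{memory::InMemory, path::Path, ObjectStore};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Wrap an in-memory store: every call below is timed under its `op` label,
    // and put/get_range additionally observe byte counts.
    let store = StoreWithMetrics::new(Arc::new(InMemory::new()));

    let location = Path::from("sst/1.parquet");
    store.put(&location, Bytes::from_static(b"hello")).await?;
    let _meta = store.head(&location).await?;
    let _bytes = store.get_range(&location, 0..5).await?;

    // Dump the default registry; object_store_latency and object_store_throughput
    // should now contain non-zero sample counts.
    let mut buffer = Vec::new();
    TextEncoder::new().encode(&prometheus::gather(), &mut buffer)?;
    println!("{}", String::from_utf8(buffer)?);
    Ok(())
}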
2 changes: 1 addition & 1 deletion server/Cargo.toml
@@ -31,7 +31,7 @@ profile = { workspace = true }
prost = { workspace = true }
query_engine = { workspace = true }
prometheus = { workspace = true }
prometheus-static-metric = "0.5"
prometheus-static-metric = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
serde_json = { workspace = true }
