chore: add metrics to inspect write path (#1264)
## Rationale
The write path metrics around the pending queue are not sufficient; this PR
improves them.

## Detailed Changes
+ Add time-cost metrics for the pending queue writer/waiter.
+ Add a counter for cancellations of the related futures (see the sketch below).
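
Both changes follow the same Prometheus pattern: a labeled histogram whose RAII timer records on drop, plus a labeled counter. A minimal self-contained sketch of that pattern (metric names mirror the ones in the diff; the `main` body is purely illustrative):

```rust
use lazy_static::lazy_static;
use prometheus::{
    register_histogram_vec, register_int_counter_vec, HistogramVec, IntCounterVec,
};

lazy_static! {
    // Mirrors TABLE_WRITE_DURATION_HISTOGRAM: one series per write-path stage.
    static ref WRITE_DURATION: HistogramVec = register_histogram_vec!(
        "table_write_duration",
        "Table write duration by stage",
        &["type"]
    )
    .unwrap();

    // Mirrors FUTURE_CANCEL_COUNTER: one series per cancellation token.
    static ref CANCEL_COUNTER: IntCounterVec = register_int_counter_vec!(
        "future_cancel_counter",
        "Counter of future cancellations",
        &["token"]
    )
    .unwrap();
}

fn main() {
    {
        // The guard observes the elapsed time when it is dropped.
        let _timer = WRITE_DURATION
            .with_label_values(&["queue_waiter"])
            .start_timer();
        std::thread::sleep(std::time::Duration::from_millis(10));
    }

    // Incremented whenever a wrapped future is dropped before completion.
    CANCEL_COUNTER
        .with_label_values(&["pending_queue_writer"])
        .inc();
}
```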

## Test Plan
Test manually.
Rachelint authored Nov 9, 2023
1 parent 642934a commit d1f3349
Showing 5 changed files with 72 additions and 6 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock


16 changes: 16 additions & 0 deletions analytic_engine/src/table/metrics.rs
@@ -185,6 +185,8 @@ pub struct Metrics {
table_write_instance_flush_wait_duration: Histogram,
table_write_flush_wait_duration: Histogram,
table_write_execute_duration: Histogram,
table_write_queue_waiter_duration: Histogram,
table_write_queue_writer_duration: Histogram,
table_write_total_duration: Histogram,
}

@@ -219,6 +221,10 @@ impl Default for Metrics {
.with_label_values(&["wait_flush"]),
table_write_execute_duration: TABLE_WRITE_DURATION_HISTOGRAM
.with_label_values(&["execute"]),
table_write_queue_waiter_duration: TABLE_WRITE_DURATION_HISTOGRAM
.with_label_values(&["queue_waiter"]),
table_write_queue_writer_duration: TABLE_WRITE_DURATION_HISTOGRAM
.with_label_values(&["queue_writer"]),
table_write_total_duration: TABLE_WRITE_DURATION_HISTOGRAM
.with_label_values(&["total"]),
}
@@ -340,6 +346,16 @@ impl Metrics {
self.table_write_encode_duration.start_timer()
}

#[inline]
pub fn start_table_write_queue_waiter_timer(&self) -> HistogramTimer {
self.table_write_queue_waiter_duration.start_timer()
}

#[inline]
pub fn start_table_write_queue_writer_timer(&self) -> HistogramTimer {
self.table_write_queue_writer_duration.start_timer()
}

#[inline]
pub fn start_table_write_memtable_timer(&self) -> HistogramTimer {
self.table_write_memtable_duration.start_timer()
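
Each accessor returns a `HistogramTimer` guard, so the measured section is simply the guard's lexical scope and no explicit stop call is needed. A small sketch of the two ways a timer can finish (a standalone example assuming only the prometheus crate, outside this codebase):

```rust
use prometheus::register_histogram;

fn main() {
    let hist = register_histogram!("demo_duration", "Demo histogram").unwrap();

    // Implicit: the duration is observed when the guard is dropped,
    // which is how the `_timer` bindings in this PR work.
    {
        let _timer = hist.start_timer();
        std::thread::sleep(std::time::Duration::from_millis(5));
    }

    // Explicit: `observe_duration` consumes the timer immediately.
    let timer = hist.start_timer();
    timer.observe_duration();
}
```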
23 changes: 22 additions & 1 deletion analytic_engine/src/table/mod.rs
@@ -287,6 +287,11 @@ impl TableImpl {

match queue_res {
QueueResult::First => {
let _timer = self
.table_data
.metrics
.start_table_write_queue_writer_timer();

// This is the first request in the queue, and we should
// take responsibility for merging and writing the
// requests in the queue.
@@ -299,6 +304,7 @@

match CancellationSafeFuture::new(
Self::write_requests(write_requests),
"pending_queue_writer",
self.instance.write_runtime().clone(),
)
.await
@@ -310,7 +316,22 @@
QueueResult::Waiter(rx) => {
// The request has been pushed into the queue successfully; just wait for
// the write result.
match rx.await {
let _timer = self
.table_data
.metrics
.start_table_write_queue_waiter_timer();

// We have observed `rx` being closed in production, although in theory it
// should be impossible (especially after wrapping the actual write in
// `CancellationSafeFuture`). So we also wrap `rx` in `CancellationSafeFuture`,
// not only to retry but to observe it better.
match CancellationSafeFuture::new(
rx,
"pending_queue_waiter",
self.instance.write_runtime().clone(),
)
.await
{
Ok(res) => {
res.box_err().context(Write { table: self.name() })?;
Ok(num_rows)
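
Condensed, the match above gives each request one of two roles, and each new timer is scoped to exactly one role. A toy model of the split (hypothetical types; only the timer placement mirrors the real code):

```rust
use tokio::sync::oneshot;

// Hypothetical stand-in for the engine's queue result type.
enum QueueResult {
    First,
    Waiter(oneshot::Receiver<()>),
}

async fn handle(queue_res: QueueResult) {
    match queue_res {
        QueueResult::First => {
            // The first request merges and writes the whole queue, so its
            // latency is recorded under the `queue_writer` label:
            // let _timer = metrics.start_table_write_queue_writer_timer();
        }
        QueueResult::Waiter(rx) => {
            // Followers only wait for the writer's result, so their
            // latency is recorded under the `queue_waiter` label:
            // let _timer = metrics.start_table_write_queue_waiter_timer();
            let _ = rx.await;
        }
    }
}
```

Note that in the real code each guard is created before the `CancellationSafeFuture` is awaited, so the recorded duration covers the wrapped future as well.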
2 changes: 2 additions & 0 deletions components/future_ext/Cargo.toml
@@ -26,5 +26,7 @@ workspace = true

[dependencies]
futures = { workspace = true }
lazy_static = { workspace = true }
prometheus = { workspace = true }
runtime = { workspace = true }
tokio = { workspace = true, features = ["time"] }
35 changes: 30 additions & 5 deletions components/future_ext/src/cancel.rs
@@ -25,17 +25,32 @@ use std::{
};

use futures::future::BoxFuture;
use lazy_static::lazy_static;
use prometheus::{register_int_counter_vec, IntCounterVec};
use runtime::RuntimeRef;

lazy_static! {
static ref FUTURE_CANCEL_COUNTER: IntCounterVec = register_int_counter_vec!(
"future_cancel_counter",
"Counter of future cancel",
&["token"]
)
.unwrap();
}

/// Wrapper around a future that cannot be cancelled.
///
/// When the future is dropped/cancelled, we'll spawn a tokio task to _rescue_
/// it.
pub struct CancellationSafeFuture<F>
pub struct CancellationSafeFuture<F, T>
where
F: Future + Send + 'static,
F::Output: Send,
T: AsRef<str> + 'static + Send + Unpin,
{
/// Token for metrics
token: T,

/// Mark if the inner future finished. If not, we must spawn a helper task
/// on drop.
done: bool,
Expand All @@ -54,43 +69,51 @@ where
runtime: RuntimeRef,
}

impl<F> Drop for CancellationSafeFuture<F>
impl<F, T> Drop for CancellationSafeFuture<F, T>
where
F: Future + Send + 'static,
F::Output: Send,
T: AsRef<str> + 'static + Send + Unpin,
{
fn drop(&mut self) {
if !self.done {
FUTURE_CANCEL_COUNTER
.with_label_values(&[self.token.as_ref()])
.inc();

let inner = self.inner.take().unwrap();
let handle = self.runtime.spawn(inner);
drop(handle);
}
}
}

impl<F> CancellationSafeFuture<F>
impl<F, T> CancellationSafeFuture<F, T>
where
F: Future + Send,
F::Output: Send,
T: AsRef<str> + 'static + Send + Unpin,
{
/// Create a new future that is protected from cancellation.
///
/// If [`CancellationSafeFuture`] is cancelled (i.e. dropped) and there is
/// still some external receiver of the state left, then we will drive
/// the payload (`fut`) to completion. Otherwise `fut` will be cancelled.
pub fn new(fut: F, runtime: RuntimeRef) -> Self {
pub fn new(fut: F, token: T, runtime: RuntimeRef) -> Self {
Self {
token,
done: false,
inner: Some(Box::pin(fut)),
runtime,
}
}
}

impl<F> Future for CancellationSafeFuture<F>
impl<F, T> Future for CancellationSafeFuture<F, T>
where
F: Future + Send,
F::Output: Send,
T: AsRef<str> + 'static + Send + Unpin,
{
type Output = F::Output;

@@ -144,6 +167,7 @@ mod tests {
async move {
done_captured.store(true, Ordering::SeqCst);
},
"test",
runtime_clone,
);

@@ -166,6 +190,7 @@
async move {
done_captured.wait().await;
},
"test",
runtime_clone,
);

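
Taken together: a caller wraps any must-complete future and names it with a token; if the wrapper is dropped early, `Drop` increments `future_cancel_counter` under that token and rescues the inner future onto the runtime. A hypothetical usage sketch (the import paths and surrounding function are assumptions; the `new` signature matches the diff):

```rust
use future_ext::CancellationSafeFuture; // assumed re-export of cancel::CancellationSafeFuture
use runtime::RuntimeRef;

async fn write_with_rescue(runtime: RuntimeRef) {
    let fut = CancellationSafeFuture::new(
        async {
            // ... the write that must not be lost ...
        },
        // Token: names this call site in the cancellation counter.
        "pending_queue_writer",
        runtime.clone(),
    );

    // If `fut` is dropped before completion, `Drop` bumps the counter
    // and spawns the inner future onto `runtime` to finish the work.
    fut.await;
}
```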
