Update to metrics 0.23.0 so it can work with metrics-exporter-prometheus 0.15.1 (huggingface#2190)

Signed-off-by: Wang, Yi A <yi.a.wang@intel.com>
sywangyi authored and yuanwu2017 committed Sep 25, 2024
1 parent 4a54e41 commit 74edda9
Showing 9 changed files with 119 additions and 129 deletions.
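In substance, this commit is a mechanical migration from the value-taking macros of metrics 0.21 to the handle-returning macros of metrics 0.23. A minimal sketch of the pattern, assuming the metrics 0.23 API; the wrapper function and its parameters are illustrative, while the metric names are taken from the diff below:

// Old style (metrics 0.21.x), as removed throughout this commit:
//   metrics::increment_counter!("tgi_request_failure", "err" => "validation");
//   metrics::increment_gauge!("tgi_queue_size", 1.0);
//   metrics::gauge!("tgi_queue_size", queue_len as f64);
//   metrics::histogram!("tgi_batch_next_size", batch_size as f64);
//
// New style (metrics 0.23.x): each macro returns a handle and the measurement
// is passed to a method on that handle.
fn record_examples(queue_len: usize, batch_size: u32) {
    metrics::counter!("tgi_request_failure", "err" => "validation").increment(1);
    metrics::gauge!("tgi_queue_size").increment(1.0);
    metrics::gauge!("tgi_queue_size").set(queue_len as f64);
    metrics::histogram!("tgi_batch_next_size").record(batch_size as f64);
}

Counters now take .increment(u64), gauges take .increment/.decrement/.set(f64), and histograms take .record(f64); every hunk below follows this shape.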
28 changes: 3 additions & 25 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion router/Cargo.toml
@@ -24,7 +24,7 @@ futures = "0.3.28"
hf-hub = { workspace = true }
itertools = "0.10"
jsonschema = { version = "0.17.1", features = ["draft202012"] }
metrics = "0.21.1"
metrics = "0.23.0"
metrics-exporter-prometheus = { version = "0.15.1", features = [] }
nohash-hasher = "0.2.0"
opentelemetry = { version = "0.20.0", features = ["rt-tokio"] }
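For context, only the metrics dependency itself changes in this hunk; metrics-exporter-prometheus 0.15.1 is already in place, and per the commit message it is the version that requires metrics 0.23. Recorder installation is unaffected by the macro migration. Below is a minimal setup sketch, assuming the usual metrics-exporter-prometheus 0.15 builder API (PrometheusBuilder, install_recorder, render) and an illustrative counter name, not the router's actual wiring:

use metrics_exporter_prometheus::PrometheusBuilder;

fn main() {
    // Install a global recorder that aggregates everything emitted via the metrics macros.
    let handle = PrometheusBuilder::new()
        .install_recorder()
        .expect("failed to install Prometheus recorder");

    // metrics 0.23 call style: the macro returns a handle, the value goes to a method.
    metrics::counter!("example_request_count").increment(1);

    // Render the Prometheus text exposition format (normally served from a /metrics endpoint).
    println!("{}", handle.render());
}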
8 changes: 4 additions & 4 deletions router/src/infer/mod.rs
@@ -91,14 +91,14 @@ impl Infer {
.limit_concurrent_requests
.try_acquire_owned()
.map_err(|err| {
metrics::increment_counter!("tgi_request_failure", "err" => "overloaded");
metrics::counter!("tgi_request_failure", "err" => "overloaded").increment(1);
tracing::error!("{err}");
err
})?;

// Validate request
let valid_request = self.validation.validate(request).await.map_err(|err| {
metrics::increment_counter!("tgi_request_failure", "err" => "validation");
metrics::counter!("tgi_request_failure", "err" => "validation").increment(1);
tracing::error!("{err}");
err
})?;
@@ -140,7 +140,7 @@ impl Infer {
.ok_or_else(|| InferError::TemplateError(ErrorKind::TemplateNotFound.into()))?
.apply(messages, grammar_with_prompt)
.map_err(|e| {
metrics::increment_counter!("tgi_request_failure", "err" => "template");
metrics::counter!("tgi_request_failure", "err" => "template").increment(1);
tracing::error!("{e}");
e
})
@@ -214,7 +214,7 @@ impl Infer {
})
} else {
let err = InferError::IncompleteGeneration;
metrics::increment_counter!("tgi_request_failure", "err" => "incomplete");
metrics::counter!("tgi_request_failure", "err" => "incomplete").increment(1);
tracing::error!("{err}");
Err(err)
}
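Every failure counter in this file follows the same shape: bump a labeled counter inside the map_err closure, log the error, and propagate it. A stripped-down sketch of that pattern under metrics 0.23; acquire_permit, handle_request, and the String error type are hypothetical stand-ins, and eprintln! stands in for tracing::error!:

// Hypothetical fallible step, standing in for semaphore acquisition or request validation.
fn acquire_permit() -> Result<(), String> {
    Err("overloaded".to_string())
}

fn handle_request() -> Result<(), String> {
    acquire_permit().map_err(|err| {
        // metrics 0.23: build the labeled counter handle, then bump it by one.
        metrics::counter!("tgi_request_failure", "err" => "overloaded").increment(1);
        eprintln!("{err}");
        err
    })
}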
8 changes: 4 additions & 4 deletions router/src/infer/v2/queue.rs
@@ -111,7 +111,7 @@ async fn queue_task(
match cmd {
QueueCommand::Append(entry, span) => {
span.in_scope(|| state.append(*entry));
metrics::increment_gauge!("tgi_queue_size", 1.0);
metrics::gauge!("tgi_queue_size").increment(1.0);
}
QueueCommand::NextBatch {
min_size,
@@ -124,7 +124,7 @@
let next_batch =
state.next_batch(min_size, max_size, prefill_token_budget, token_budget);
response_sender.send(next_batch).unwrap();
metrics::gauge!("tgi_queue_size", state.entries.len() as f64);
metrics::gauge!("tgi_queue_size").set(state.entries.len() as f64);
}),
}
}
@@ -226,7 +226,7 @@ impl State {
// Filter entries where the response receiver was dropped (== entries where the request
// was dropped by the client)
if entry.response_tx.is_closed() {
metrics::increment_counter!("tgi_request_failure", "err" => "dropped");
metrics::counter!("tgi_request_failure", "err" => "dropped").increment(1);
tracing::debug!("Dropping entry");
continue;
}
@@ -336,7 +336,7 @@ impl State {
// Increment batch id
self.next_batch_id += 1;

metrics::histogram!("tgi_batch_next_size", batch.size as f64);
metrics::histogram!("tgi_batch_next_size").record(batch.size as f64);

Some((batch_entries, batch, next_batch_span))
}
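Note the two gauge styles in the queue task above: a relative increment when an entry is appended, and an absolute set back to the real queue depth once a batch has been cut. A minimal sketch of that pairing under metrics 0.23; the Queue type and its fields are hypothetical, not the router's State:

use std::collections::VecDeque;

struct Queue {
    entries: VecDeque<u64>,
}

impl Queue {
    fn append(&mut self, id: u64) {
        self.entries.push_back(id);
        // Relative update: one more entry is waiting.
        metrics::gauge!("tgi_queue_size").increment(1.0);
    }

    fn next_batch(&mut self, max_size: usize) -> Vec<u64> {
        let take = max_size.min(self.entries.len());
        let batch: Vec<u64> = self.entries.drain(..take).collect();
        // Absolute update: resynchronize the gauge with the actual queue depth,
        // and record the batch size distribution.
        metrics::gauge!("tgi_queue_size").set(self.entries.len() as f64);
        metrics::histogram!("tgi_batch_next_size").record(batch.len() as f64);
        batch
    }
}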
61 changes: 36 additions & 25 deletions router/src/infer/v2/scheduler.rs
@@ -148,8 +148,8 @@ pub(crate) async fn batching_task(
let batch_size = batch.size;
let batch_max_tokens = batch.max_tokens;
let mut batches = vec![batch];
metrics::gauge!("tgi_batch_current_size", batch_size as f64);
metrics::gauge!("tgi_batch_current_max_tokens", batch_max_tokens as f64);
metrics::gauge!("tgi_batch_current_size").set(batch_size as f64);
metrics::gauge!("tgi_batch_current_max_tokens").set(batch_max_tokens as f64);

let min_size = if waiting_tokens >= max_waiting_tokens {
// If we didn't onboard any new requests since >= max_waiting_tokens, we try
@@ -170,9 +170,11 @@
{
// Tracking metrics
if min_size.is_some() {
metrics::increment_counter!("tgi_batch_concat", "reason" => "backpressure");
metrics::counter!("tgi_batch_concat", "reason" => "backpressure")
.increment(1);
} else {
metrics::increment_counter!("tgi_batch_concat", "reason" => "wait_exceeded");
metrics::counter!("tgi_batch_concat", "reason" => "wait_exceeded")
.increment(1);
}

entries.iter_mut().for_each(|(_, entry)| {
@@ -219,8 +221,8 @@
.await;
waiting_tokens += 1;
}
metrics::gauge!("tgi_batch_current_size", 0.0);
metrics::gauge!("tgi_batch_current_max_tokens", 0.0);
metrics::gauge!("tgi_batch_current_size").set(0.0);
metrics::gauge!("tgi_batch_current_max_tokens").set(0.0);
}
}
}
@@ -234,7 +236,7 @@ async fn prefill(
) -> Option<CachedBatch> {
let start_time = Instant::now();
let batch_id = batch.id;
metrics::increment_counter!("tgi_batch_inference_count", "method" => "prefill");
metrics::counter!("tgi_batch_inference_count", "method" => "prefill").increment(1);

match client.prefill(batch).await {
Ok((generations, next_batch, timings)) => {
@@ -248,11 +250,15 @@ async fn prefill(
// Filter next batch and remove requests that were stopped
let next_batch = filter_batch(client, next_batch, entries).await;

metrics::histogram!("tgi_batch_forward_duration", timings.forward.as_secs_f64(), "method" => "prefill");
metrics::histogram!("tgi_batch_decode_duration", timings.decode.as_secs_f64(), "method" => "prefill");
metrics::histogram!("tgi_batch_filter_duration", start_filtering_time.elapsed().as_secs_f64(), "method" => "prefill");
metrics::histogram!("tgi_batch_inference_duration", start_time.elapsed().as_secs_f64(), "method" => "prefill");
metrics::increment_counter!("tgi_batch_inference_success", "method" => "prefill");
metrics::histogram!("tgi_batch_forward_duration","method" => "prefill")
.record(timings.forward.as_secs_f64());
metrics::histogram!("tgi_batch_decode_duration", "method" => "prefill")
.record(timings.decode.as_secs_f64());
metrics::histogram!("tgi_batch_filter_duration", "method" => "prefill")
.record(start_filtering_time.elapsed().as_secs_f64());
metrics::histogram!("tgi_batch_inference_duration","method" => "prefill")
.record(start_time.elapsed().as_secs_f64());
metrics::counter!("tgi_batch_inference_success", "method" => "prefill").increment(1);
next_batch
}
// If we have an error, we discard the whole batch
@@ -261,7 +267,7 @@ async fn prefill(
generation_health.store(false, Ordering::SeqCst);
let _ = client.clear_cache(Some(batch_id)).await;
send_errors(err, entries);
metrics::increment_counter!("tgi_batch_inference_failure", "method" => "prefill");
metrics::counter!("tgi_batch_inference_failure", "method" => "prefill").increment(1);
None
}
}
@@ -276,7 +282,7 @@ async fn decode(
) -> Option<CachedBatch> {
let start_time = Instant::now();
let batch_ids: Vec<u64> = batches.iter().map(|b| b.id).collect();
metrics::increment_counter!("tgi_batch_inference_count", "method" => "decode");
metrics::counter!("tgi_batch_inference_count", "method" => "decode").increment(1);

match client.decode(batches).await {
Ok((generations, next_batch, timings)) => {
@@ -291,13 +297,18 @@ async fn decode(
let next_batch = filter_batch(client, next_batch, entries).await;

if let Some(concat_duration) = timings.concat {
metrics::histogram!("tgi_batch_concat_duration", concat_duration.as_secs_f64(), "method" => "decode");
metrics::histogram!("tgi_batch_concat_duration", "method" => "decode")
.record(concat_duration.as_secs_f64());
}
metrics::histogram!("tgi_batch_forward_duration", timings.forward.as_secs_f64(), "method" => "decode");
metrics::histogram!("tgi_batch_decode_duration", timings.decode.as_secs_f64(), "method" => "decode");
metrics::histogram!("tgi_batch_filter_duration", start_filtering_time.elapsed().as_secs_f64(), "method" => "decode");
metrics::histogram!("tgi_batch_inference_duration", start_time.elapsed().as_secs_f64(), "method" => "decode");
metrics::increment_counter!("tgi_batch_inference_success", "method" => "decode");
metrics::histogram!("tgi_batch_forward_duration", "method" => "decode")
.record(timings.forward.as_secs_f64());
metrics::histogram!("tgi_batch_decode_duration", "method" => "decode")
.record(timings.decode.as_secs_f64());
metrics::histogram!("tgi_batch_filter_duration", "method" => "decode")
.record(start_filtering_time.elapsed().as_secs_f64());
metrics::histogram!("tgi_batch_inference_duration", "method" => "decode")
.record(start_time.elapsed().as_secs_f64());
metrics::counter!("tgi_batch_inference_success", "method" => "decode").increment(1);
next_batch
}
// If we have an error, we discard the whole batch
@@ -307,7 +318,7 @@ async fn decode(
let _ = client.clear_cache(Some(id)).await;
}
send_errors(err, entries);
metrics::increment_counter!("tgi_batch_inference_failure", "method" => "decode");
metrics::counter!("tgi_batch_inference_failure", "method" => "decode").increment(1);
None
}
}
@@ -365,7 +376,7 @@ fn filter_send_generations(generations: Vec<Generation>, entries: &mut IntMap<u64, Entry>) {
// request and we need to stop generating hence why we unwrap_or(true)
let stopped = send_responses(generation, entry).map_err(|err| {
tracing::error!("Entry response channel error.");
metrics::increment_counter!("tgi_request_failure", "err" => "dropped");
metrics::counter!("tgi_request_failure", "err" => "dropped").increment(1);
err
}).unwrap_or(true);
if stopped {
@@ -381,7 +392,7 @@ fn send_responses(
) -> Result<bool, Box<SendError<Result<InferStreamResponse, InferError>>>> {
// Return directly if the channel is disconnected
if entry.response_tx.is_closed() {
metrics::increment_counter!("tgi_request_failure", "err" => "dropped");
metrics::counter!("tgi_request_failure", "err" => "dropped").increment(1);
return Ok(true);
}

@@ -407,7 +418,7 @@
// Create last Token
let tokens_ = generation.tokens.expect("Non empty tokens in generation");
let n = tokens_.ids.len();
metrics::histogram!("tgi_request_skipped_tokens", (n - 1) as f64);
metrics::histogram!("tgi_request_skipped_tokens").record((n - 1) as f64);
let mut iterator = tokens_
.ids
.into_iter()
@@ -472,7 +483,7 @@ fn send_errors(error: ClientError, entries: &mut IntMap<u64, Entry>) {
// Create and enter a span to link this function back to the entry
let _send_error_span = info_span!(parent: entry.temp_span.as_ref().expect("batch_span is None. This is a bug."), "send_error").entered();
let err = InferError::GenerationError(error.to_string());
metrics::increment_counter!("tgi_request_failure", "err" => "generation");
metrics::counter!("tgi_request_failure", "err" => "generation").increment(1);
tracing::error!("{err}");

// unwrap_or is valid here as we don't care if the receiver is gone.
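One detail worth noting in the scheduler hunks: under metrics 0.21 the measured value was passed inside the histogram! macro with the labels after it, whereas under 0.23 only the name and labels stay in the macro and the measurement moves into .record(), which is why these call sites now wrap across two lines. A sketch of the timing pattern used by prefill() and decode(); time_batch_step is a hypothetical helper, not part of the router:

use std::time::Instant;

fn time_batch_step<F: FnOnce()>(method: &'static str, step: F) {
    let start = Instant::now();
    step();
    // Labels stay inside the macro; the observed value goes to .record().
    metrics::histogram!("tgi_batch_inference_duration", "method" => method)
        .record(start.elapsed().as_secs_f64());
    metrics::counter!("tgi_batch_inference_success", "method" => method).increment(1);
}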
8 changes: 4 additions & 4 deletions router/src/infer/v3/queue.rs
@@ -126,7 +126,7 @@ async fn queue_task(
match cmd {
QueueCommand::Append(entry, span) => {
span.in_scope(|| state.append(*entry));
metrics::increment_gauge!("tgi_queue_size", 1.0);
metrics::gauge!("tgi_queue_size").increment(1.0);
}
QueueCommand::NextBatch {
min_size,
@@ -141,7 +141,7 @@ async fn queue_task(
.instrument(span)
.await;
response_sender.send(next_batch).unwrap();
metrics::gauge!("tgi_queue_size", state.entries.len() as f64);
metrics::gauge!("tgi_queue_size").set(state.entries.len() as f64);
}
}
}
@@ -248,7 +248,7 @@ impl State {
// Filter entries where the response receiver was dropped (== entries where the request
// was dropped by the client)
if entry.response_tx.is_closed() {
metrics::increment_counter!("tgi_request_failure", "err" => "dropped");
metrics::counter!("tgi_request_failure", "err" => "dropped").increment(1);
tracing::debug!("Dropping entry");
continue;
}
@@ -399,7 +399,7 @@ impl State {
// Increment batch id
self.next_batch_id += 1;

metrics::histogram!("tgi_batch_next_size", batch.size as f64);
metrics::histogram!("tgi_batch_next_size").record(batch.size as f64);

Some((batch_entries, batch, next_batch_span))
}
(Diffs for the remaining changed files are not rendered here.)
