Skip to content

Commit 202f0b2

Browse files
authored
[ENH]: add request timing to metering (#4877)
This PR uses the `chroma-metering` library introduced in #4868 to add execution timing to requests received on the frontend.

⚠️ #4868 should be merged first.

- [x] Tests pass locally with `pytest` for Python, `yarn test` for JS, `cargo test` for Rust

## Documentation Changes

N/A
1 parent fb55bfd commit 202f0b2

File tree

6 files changed

+261
-45
lines changed

6 files changed

+261
-45
lines changed

rust/frontend/src/impls/service_based_frontend.rs

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use chroma_config::{registry, Configurable};
77
use chroma_error::{ChromaError, ErrorCodes};
88
use chroma_log::{LocalCompactionManager, LocalCompactionManagerConfig, Log};
99
use chroma_metering::{
10-
CollectionForkContext, CollectionReadContext, CollectionWriteContext, Enterable,
10+
CollectionForkContext, CollectionReadContext, CollectionWriteContext, Enterable, FinishRequest,
1111
FtsQueryLength, LatestCollectionLogicalSizeBytes, LogSizeBytes, MetadataPredicateCount,
1212
MeterEvent, PulledLogSizeBytes, QueryEmbeddingCount, ReturnBytes, WriteAction,
1313
};
@@ -40,10 +40,13 @@ use chroma_types::{
4040
};
4141
use opentelemetry::global;
4242
use opentelemetry::metrics::Counter;
43-
use std::sync::atomic::{AtomicUsize, Ordering};
4443
use std::sync::Arc;
4544
use std::time::{SystemTime, UNIX_EPOCH};
4645
use std::{collections::HashSet, time::Duration};
46+
use std::{
47+
sync::atomic::{AtomicUsize, Ordering},
48+
time::Instant,
49+
};
4750

4851
use super::utils::to_records;
4952

@@ -637,9 +640,9 @@ impl ServiceBasedFrontend {
637640
.set_collection_with_segments(collection_and_segments)
638641
.await;
639642

640-
// Attach the collection's latest logical size to the metering context
643+
// Attach metadata to the metering context
641644
chroma_metering::with_current(|context| {
642-
context.latest_collection_logical_size_bytes(latest_collection_logical_size_bytes)
645+
context.latest_collection_logical_size_bytes(latest_collection_logical_size_bytes);
643646
});
644647

645648
// TODO: Submit event after the response is sent
@@ -752,8 +755,11 @@ impl ServiceBasedFrontend {
752755
.add_retries_counter
753756
.add(retries.load(Ordering::Relaxed) as u64, &[]);
754757

755-
// Attach the log size in bytes to the metering context
756-
chroma_metering::with_current(|context| context.log_size_bytes(log_size_bytes));
758+
// Attach metadata to the metering context
759+
chroma_metering::with_current(|context| {
760+
context.log_size_bytes(log_size_bytes);
761+
context.finish_request(Instant::now());
762+
});
757763

758764
// TODO: Submit event after the response is sent
759765
match res {
@@ -834,8 +840,11 @@ impl ServiceBasedFrontend {
834840
.update_retries_counter
835841
.add(retries.load(Ordering::Relaxed) as u64, &[]);
836842

837-
// Attach the log size in bytes to the metering context
838-
chroma_metering::with_current(|context| context.log_size_bytes(log_size_bytes));
843+
// Attach metadata to the metering context
844+
chroma_metering::with_current(|context| {
845+
context.log_size_bytes(log_size_bytes);
846+
context.finish_request(Instant::now());
847+
});
839848

840849
// TODO: Submit event after the response is sent
841850
match res {
@@ -918,8 +927,11 @@ impl ServiceBasedFrontend {
918927
.upsert_retries_counter
919928
.add(retries.load(Ordering::Relaxed) as u64, &[]);
920929

921-
// Attach the log size in bytes to the metering context
922-
chroma_metering::with_current(|context| context.log_size_bytes(log_size_bytes));
930+
// Attach metadata to the metering context
931+
chroma_metering::with_current(|context| {
932+
context.log_size_bytes(log_size_bytes);
933+
context.finish_request(Instant::now());
934+
});
923935

924936
// TODO: Submit event after the response is sent
925937
match res {
@@ -1041,9 +1053,10 @@ impl ServiceBasedFrontend {
10411053
None
10421054
};
10431055

1044-
// NOTE(c-gamble): See note above for additional context, but this is a non-standard pattern
1045-
// and we are only implementing metering for delete in this manner because delete currently
1046-
// produces two metering events.
1056+
if let Some(event) = read_event {
1057+
event.submit().await;
1058+
}
1059+
10471060
let collection_write_context_container =
10481061
chroma_metering::create::<CollectionWriteContext>(CollectionWriteContext::new(
10491062
tenant_id.clone(),
@@ -1071,12 +1084,10 @@ impl ServiceBasedFrontend {
10711084
}
10721085
})?;
10731086

1074-
if let Some(event) = read_event {
1075-
event.submit().await;
1076-
}
1077-
1078-
// Attach the log size bytes to the write context
1079-
chroma_metering::with_current(|context| context.log_size_bytes(log_size_bytes));
1087+
// Attach metadata to the write context
1088+
chroma_metering::with_current(|context| {
1089+
context.log_size_bytes(log_size_bytes);
1090+
});
10801091

10811092
// TODO: Submit event after the response is sent
10821093
match chroma_metering::close::<CollectionWriteContext>() {
@@ -1296,6 +1307,7 @@ impl ServiceBasedFrontend {
12961307
context.pulled_log_size_bytes(get_result.pulled_log_bytes);
12971308
context.latest_collection_logical_size_bytes(latest_collection_logical_size_bytes);
12981309
context.return_bytes(return_bytes);
1310+
context.finish_request(Instant::now());
12991311
});
13001312

13011313
// TODO: Submit event after the response is sent
@@ -1424,6 +1436,7 @@ impl ServiceBasedFrontend {
14241436
context.pulled_log_size_bytes(query_result.pulled_log_bytes);
14251437
context.latest_collection_logical_size_bytes(latest_collection_logical_size_bytes);
14261438
context.return_bytes(return_bytes);
1439+
context.finish_request(Instant::now());
14271440
});
14281441

14291442
// TODO: Submit event after the response is sent

rust/frontend/src/server.rs

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ use axum::{
66
Json, Router, ServiceExt,
77
};
88
use chroma_metering::{
9-
CollectionForkContext, CollectionReadContext, CollectionWriteContext, MeteredFutureExt,
10-
ReadAction, WriteAction,
9+
CollectionForkContext, CollectionReadContext, CollectionWriteContext, Enterable,
10+
MeteredFutureExt, ReadAction, StartRequest, WriteAction,
1111
};
1212
use chroma_system::System;
1313
use chroma_types::{
@@ -29,11 +29,11 @@ use opentelemetry::global;
2929
use opentelemetry::metrics::{Counter, Meter};
3030
use opentelemetry::KeyValue;
3131
use serde::{Deserialize, Serialize};
32-
use std::str::FromStr;
3332
use std::sync::{
3433
atomic::{AtomicBool, Ordering},
3534
Arc,
3635
};
36+
use std::{str::FromStr, time::Instant};
3737
use tokio::{select, signal};
3838
use tower_http::cors::CorsLayer;
3939
use utoipa::openapi::security::{ApiKey, ApiKeyValue, SecurityScheme};
@@ -1340,6 +1340,12 @@ async fn collection_add(
13401340
WriteAction::Add,
13411341
));
13421342

1343+
metering_context_container.enter();
1344+
1345+
chroma_metering::with_current(|context| {
1346+
context.start_request(Instant::now());
1347+
});
1348+
13431349
tracing::info!(name: "collection_add", tenant_name = %tenant, database_name = %database, collection_id = %collection_id, num_ids = %payload.ids.len());
13441350
let request = chroma_types::AddCollectionRecordsRequest::try_new(
13451351
tenant,
@@ -1443,6 +1449,12 @@ async fn collection_update(
14431449
WriteAction::Update,
14441450
));
14451451

1452+
metering_context_container.enter();
1453+
1454+
chroma_metering::with_current(|context| {
1455+
context.start_request(Instant::now());
1456+
});
1457+
14461458
tracing::info!(name: "collection_update", tenant_name = %tenant, database_name = %database, collection_id = %collection_id, num_ids = %payload.ids.len());
14471459
let request = chroma_types::UpdateCollectionRecordsRequest::try_new(
14481460
tenant,
@@ -1553,6 +1565,12 @@ async fn collection_upsert(
15531565
WriteAction::Upsert,
15541566
));
15551567

1568+
metering_context_container.enter();
1569+
1570+
chroma_metering::with_current(|context| {
1571+
context.start_request(Instant::now());
1572+
});
1573+
15561574
tracing::info!(name: "collection_upsert", tenant_name = %tenant, database_name = %database, collection_id = %collection_id, num_ids = %payload.ids.len());
15571575
let request = chroma_types::UpsertCollectionRecordsRequest::try_new(
15581576
tenant,
@@ -1834,6 +1852,12 @@ async fn collection_get(
18341852
ReadAction::Get,
18351853
));
18361854

1855+
metering_context_container.enter();
1856+
1857+
chroma_metering::with_current(|context| {
1858+
context.start_request(Instant::now());
1859+
});
1860+
18371861
tracing::info!(
18381862
name: "collection_get",
18391863
num_ids = payload.ids.as_ref().map_or(0, |ids| ids.len()),
@@ -1888,6 +1912,7 @@ pub struct QueryRequestPayload {
18881912
("offset" = Option<u32>, Query, description = "Offset for pagination")
18891913
)
18901914
)]
1915+
18911916
async fn collection_query(
18921917
headers: HeaderMap,
18931918
Path((tenant, database, collection_id)): Path<(String, String, String)>,
@@ -1950,6 +1975,12 @@ async fn collection_query(
19501975
ReadAction::Query,
19511976
));
19521977

1978+
metering_context_container.enter();
1979+
1980+
chroma_metering::with_current(|context| {
1981+
context.start_request(Instant::now());
1982+
});
1983+
19531984
tracing::info!(
19541985
name: "collection_query",
19551986
num_ids = payload.ids.as_ref().map_or(0, |ids| ids.len()),

rust/metering-macros/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ pub fn initialize_metering(raw_token_stream: proc_macro::TokenStream) -> proc_ma
9090
ReceiverAlreadyInitializedError,
9191
#[error("Failed to downcast context to provided type")]
9292
DowncastError,
93+
#[error("RwLock poisoned when attempting to read or write")]
94+
RwLockPoisonedError,
9395
}
9496

9597
/// A type alias for a shared, boxed, metering context

0 commit comments

Comments
 (0)