Skip to content

Commit 8161297

Browse files
committed
[ENH]: add metrics for span & metric exporters
1 parent 73abfdc commit 8161297

File tree

6 files changed

+185
-9
lines changed

6 files changed

+185
-9
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/tracing/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ tracing-opentelemetry = { workspace = true }
1818
tracing-subscriber = { workspace = true }
1919
tracing = { workspace = true }
2020
uuid = { workspace = true }
21+
futures = { workspace = true }
22+
async-trait = { workspace = true }
23+
tokio = { workspace = true }
2124

2225
chroma-system = { workspace = true }
2326

rust/tracing/src/init_tracer.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
// load/src/opentelemetry_config.rs file
33
// Keep them in-sync manually.
44

5+
use crate::wrapped_metric_exporter::WrappedMetricExporter;
6+
use crate::wrapped_span_exporter::WrappedSpanExporter;
57
use opentelemetry::trace::TracerProvider;
68
use opentelemetry::{global, InstrumentationScope};
79
use opentelemetry_otlp::WithExportConfig;
@@ -107,22 +109,25 @@ pub fn init_otel_layer(
107109
]);
108110

109111
// Prepare tracer.
110-
let tracing_span_exporter = opentelemetry_otlp::SpanExporter::builder()
112+
let tracing_span_exporter: WrappedSpanExporter<_> = opentelemetry_otlp::SpanExporter::builder()
111113
.with_tonic()
112114
.with_endpoint(otel_endpoint)
113115
.build()
114-
.expect("could not build span exporter for tracing");
116+
.expect("could not build span exporter for tracing")
117+
.into();
115118
let trace_config = opentelemetry_sdk::trace::Config::default().with_resource(resource.clone());
116119
let tracer_provider = opentelemetry_sdk::trace::TracerProvider::builder()
117120
.with_batch_exporter(tracing_span_exporter, opentelemetry_sdk::runtime::Tokio)
118121
.with_config(trace_config)
119122
.build();
120123
let tracer = tracer_provider.tracer(service_name.clone());
121-
let fastrace_span_exporter = opentelemetry_otlp::SpanExporter::builder()
122-
.with_tonic()
123-
.with_endpoint(otel_endpoint)
124-
.build()
125-
.expect("could not build span exporter for fastrace");
124+
let fastrace_span_exporter: WrappedSpanExporter<_> =
125+
opentelemetry_otlp::SpanExporter::builder()
126+
.with_tonic()
127+
.with_endpoint(otel_endpoint)
128+
.build()
129+
.expect("could not build span exporter for fastrace")
130+
.into();
126131
fastrace::set_reporter(
127132
fastrace_opentelemetry::OpenTelemetryReporter::new(
128133
fastrace_span_exporter,
@@ -134,13 +139,14 @@ pub fn init_otel_layer(
134139
);
135140

136141
// Prepare meter.
137-
let metric_exporter = opentelemetry_otlp::MetricExporter::builder()
142+
let metric_exporter: WrappedMetricExporter<_> = opentelemetry_otlp::MetricExporter::builder()
138143
.with_tonic()
139144
.with_endpoint(
140145
std::env::var("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT").unwrap_or(otel_endpoint.clone()),
141146
)
142147
.build()
143-
.expect("could not build metric exporter");
148+
.expect("could not build metric exporter")
149+
.into();
144150

145151
let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(
146152
metric_exporter,

rust/tracing/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
pub mod grpc_tower;
33
pub mod init_tracer;
44
pub mod util;
5+
mod wrapped_metric_exporter;
6+
mod wrapped_span_exporter;
57

68
#[cfg(feature = "grpc")]
79
pub use grpc_tower::*;
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
use opentelemetry::{global, metrics::Counter, KeyValue};
2+
use opentelemetry_sdk::metrics::{
3+
data::ResourceMetrics, exporter::PushMetricExporter, MetricResult,
4+
};
5+
use std::sync::LazyLock;
6+
7+
/// A wrapper around an OpenTelemetry MetricExporter that counts the number of export calls & tracks errors.
8+
#[derive(Debug)]
9+
pub struct WrappedMetricExporter<E> {
10+
inner: E,
11+
counter: LazyLock<Counter<u64>>,
12+
}
13+
14+
impl<E> WrappedMetricExporter<E> {
15+
pub fn new(inner: E) -> Self {
16+
Self {
17+
inner,
18+
// We use a LazyLock so that the counter is only initialized after the global exporter is set up.
19+
counter: LazyLock::new(|| {
20+
global::meter("chroma.tracing")
21+
.u64_counter("metric_exporter_calls")
22+
.with_description("Counts the number of metric exporter calls")
23+
.build()
24+
}),
25+
}
26+
}
27+
}
28+
29+
impl From<opentelemetry_otlp::MetricExporter>
30+
for WrappedMetricExporter<opentelemetry_otlp::MetricExporter>
31+
{
32+
fn from(exporter: opentelemetry_otlp::MetricExporter) -> Self {
33+
WrappedMetricExporter::new(exporter)
34+
}
35+
}
36+
37+
#[async_trait::async_trait]
38+
impl<E> PushMetricExporter for WrappedMetricExporter<E>
39+
where
40+
E: PushMetricExporter + Send + Sync + 'static,
41+
{
42+
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> {
43+
let counter = self.counter.clone();
44+
println!("Exporting metrics: {:?}", metrics);
45+
let result = self.inner.export(metrics).await;
46+
match &result {
47+
Ok(_) => counter.add(1, &[KeyValue::new("status", "success")]),
48+
Err(err) => {
49+
let error_name = match err {
50+
opentelemetry_sdk::metrics::MetricError::Other(_) => "other",
51+
opentelemetry_sdk::metrics::MetricError::Config(_) => "config",
52+
opentelemetry_sdk::metrics::MetricError::ExportErr(_) => "export_failed",
53+
opentelemetry_sdk::metrics::MetricError::InvalidInstrumentConfiguration(_) => {
54+
"invalid_instrument_configuration"
55+
}
56+
_ => "unknown",
57+
};
58+
counter.add(
59+
1,
60+
&[
61+
KeyValue::new("status", "error"),
62+
KeyValue::new("error", error_name.to_string()),
63+
],
64+
);
65+
}
66+
}
67+
result
68+
}
69+
70+
async fn force_flush(&self) -> MetricResult<()> {
71+
self.inner.force_flush().await
72+
}
73+
74+
fn shutdown(&self) -> MetricResult<()> {
75+
self.inner.shutdown()
76+
}
77+
78+
fn temporality(&self) -> opentelemetry_sdk::metrics::Temporality {
79+
self.inner.temporality()
80+
}
81+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
use futures::future::BoxFuture;
2+
use opentelemetry::{global, metrics::Counter, KeyValue};
3+
use opentelemetry_sdk::export::trace::SpanExporter;
4+
use std::sync::{Arc, LazyLock};
5+
use tokio::sync::Mutex;
6+
7+
/// A wrapper around an OpenTelemetry SpanExporter that counts the number of export calls & tracks errors.
8+
#[derive(Debug)]
9+
pub struct WrappedSpanExporter<E> {
10+
// The documentation for the SpanExporter trait specifies that "This function will never be called concurrently for the same exporter instance" so using a Mutex should not limit performance.
11+
inner: Arc<Mutex<E>>,
12+
// We use a LazyLock so that the counter is only initialized after the global exporter is set up.
13+
counter: LazyLock<Counter<u64>>,
14+
}
15+
16+
impl From<opentelemetry_otlp::SpanExporter>
17+
for WrappedSpanExporter<opentelemetry_otlp::SpanExporter>
18+
{
19+
fn from(exporter: opentelemetry_otlp::SpanExporter) -> Self {
20+
WrappedSpanExporter::new(exporter)
21+
}
22+
}
23+
24+
impl<E> WrappedSpanExporter<E> {
25+
pub fn new(inner: E) -> Self {
26+
Self {
27+
inner: Arc::new(Mutex::new(inner)),
28+
counter: LazyLock::new(|| {
29+
global::meter("chroma.tracing")
30+
.u64_counter("span_exporter_calls")
31+
.with_description("Counts the number of span exporter calls")
32+
.build()
33+
}),
34+
}
35+
}
36+
}
37+
38+
impl<E> SpanExporter for WrappedSpanExporter<E>
39+
where
40+
E: SpanExporter + Send + Sync + 'static,
41+
{
42+
fn export(
43+
&mut self,
44+
batch: Vec<opentelemetry_sdk::export::trace::SpanData>,
45+
) -> BoxFuture<'static, opentelemetry_sdk::export::trace::ExportResult> {
46+
let exporter = self.inner.clone();
47+
let counter = self.counter.clone();
48+
Box::pin(async move {
49+
let mut exporter = exporter.lock().await;
50+
let result = exporter.export(batch).await;
51+
match &result {
52+
Ok(_) => counter.add(1, &[KeyValue::new("status", "success")]),
53+
Err(err) => {
54+
println!("caught error in span exporter: {:?}", err);
55+
let error_name = match err {
56+
opentelemetry::trace::TraceError::ExportFailed(_) => "export_failed",
57+
opentelemetry::trace::TraceError::ExportTimedOut(_) => "timeout",
58+
opentelemetry::trace::TraceError::TracerProviderAlreadyShutdown => {
59+
"shutdown"
60+
}
61+
opentelemetry::trace::TraceError::Other(_) => "other",
62+
_ => "unknown",
63+
};
64+
counter.add(
65+
1,
66+
&[
67+
KeyValue::new("status", "error"),
68+
KeyValue::new("error", error_name.to_string()),
69+
],
70+
);
71+
}
72+
}
73+
74+
result
75+
})
76+
}
77+
78+
fn shutdown(&mut self) {
79+
self.inner.blocking_lock().shutdown()
80+
}
81+
}

0 commit comments

Comments
 (0)