Skip to content

Commit ee3efe0

Browse files
authored
feat(metrics): add in memory metrics exporter (#1017)
* feat(metrics): add in memory metrics exporter * don't run examples * fix: move in memory exporter behind testing feature flag * fix: format
1 parent 3597479 commit ee3efe0

File tree

5 files changed

+358
-0
lines changed

5 files changed

+358
-0
lines changed

opentelemetry-sdk/src/metrics/data/mod.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,18 @@ pub struct DataPoint<T> {
100100
pub exemplars: Vec<Exemplar<T>>,
101101
}
102102

103+
impl<T: Copy> Clone for DataPoint<T> {
104+
fn clone(&self) -> Self {
105+
Self {
106+
attributes: self.attributes.clone(),
107+
start_time: self.start_time,
108+
time: self.time,
109+
value: self.value,
110+
exemplars: self.exemplars.clone(),
111+
}
112+
}
113+
}
114+
103115
/// Represents the histogram of all measurements of values from an instrument.
104116
#[derive(Debug)]
105117
pub struct Histogram<T> {
@@ -146,6 +158,23 @@ pub struct HistogramDataPoint<T> {
146158
pub exemplars: Vec<Exemplar<T>>,
147159
}
148160

161+
impl<T: Copy> Clone for HistogramDataPoint<T> {
162+
fn clone(&self) -> Self {
163+
Self {
164+
attributes: self.attributes.clone(),
165+
start_time: self.start_time,
166+
time: self.time,
167+
count: self.count,
168+
bounds: self.bounds.clone(),
169+
bucket_counts: self.bucket_counts.clone(),
170+
min: self.min,
171+
max: self.max,
172+
sum: self.sum,
173+
exemplars: self.exemplars.clone(),
174+
}
175+
}
176+
}
177+
149178
/// A measurement sampled from a time series providing a typical example.
150179
#[derive(Debug)]
151180
pub struct Exemplar<T> {
@@ -165,3 +194,15 @@ pub struct Exemplar<T> {
165194
/// If no span was active or the span was not sampled this will be empty.
166195
pub trace_id: [u8; 16],
167196
}
197+
198+
impl<T: Copy> Clone for Exemplar<T> {
199+
fn clone(&self) -> Self {
200+
Self {
201+
filtered_attributes: self.filtered_attributes.clone(),
202+
time: self.time,
203+
value: self.value,
204+
span_id: self.span_id,
205+
trace_id: self.trace_id,
206+
}
207+
}
208+
}

opentelemetry-sdk/src/metrics/exporter.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
//! Interfaces for exporting metrics
22
use async_trait::async_trait;
3+
34
use opentelemetry_api::metrics::Result;
45

56
use crate::metrics::{
Lines changed: 310 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,310 @@
1+
use crate::metrics::data::{Histogram, Metric, ResourceMetrics, ScopeMetrics, Temporality};
2+
use crate::metrics::exporter::PushMetricsExporter;
3+
use crate::metrics::reader::{
4+
AggregationSelector, DefaultAggregationSelector, DefaultTemporalitySelector,
5+
TemporalitySelector,
6+
};
7+
use crate::metrics::{data, Aggregation, InstrumentKind};
8+
use async_trait::async_trait;
9+
use opentelemetry_api::metrics::MetricsError;
10+
use opentelemetry_api::metrics::Result;
11+
use std::any::Any;
12+
use std::collections::VecDeque;
13+
use std::fmt;
14+
use std::sync::{Arc, Mutex};
15+
16+
/// An in-memory metrics exporter that stores metrics data in memory.
17+
///
18+
/// This exporter is useful for testing and debugging purposes. It stores
19+
/// metric data in a `VecDeque<ResourceMetrics>`. Metrics can be retrieved
20+
/// using the `get_finished_metrics` method.
21+
///
22+
/// # Panics
23+
///
24+
/// This exporter may panic
25+
/// - if there's an issue with locking the `metrics` Mutex, such as if the Mutex is poisoned.
26+
/// - the data point recorded is not one of [i64, u64, f64]. This shouldn't happen if used with OpenTelemetry API.
27+
///
28+
/// # Example
29+
///
30+
/// ```no_run
31+
///# use opentelemetry_sdk::{metrics, runtime};
32+
///# use opentelemetry_api::{Context, KeyValue};
33+
///# use opentelemetry_api::metrics::MeterProvider;
34+
///# use opentelemetry_sdk::testing::metrics::InMemoryMetricsExporter;
35+
///# use opentelemetry_sdk::metrics::PeriodicReader;
36+
///
37+
///# #[tokio::main]
38+
///# async fn main() {
39+
/// // Create an InMemoryMetricsExporter
40+
/// let exporter = InMemoryMetricsExporter::default();
41+
///
42+
/// // Create a MeterProvider and register the exporter
43+
/// let meter_provider = metrics::MeterProvider::builder()
44+
/// .with_reader(PeriodicReader::builder(exporter.clone(), runtime::Tokio).build())
45+
/// .build();
46+
///
47+
/// // Create and record metrics using the MeterProvider
48+
/// let meter = meter_provider.meter("example");
49+
/// let cx = Context::new();
50+
/// let counter = meter.u64_counter("my_counter").init();
51+
/// counter.add(&cx, 1, &[KeyValue::new("key", "value")]);
52+
///
53+
/// meter_provider.force_flush(&cx).unwrap();
54+
///
55+
/// // Retrieve the finished metrics from the exporter
56+
/// let finished_metrics = exporter.get_finished_metrics().unwrap();
57+
///
58+
/// // Print the finished metrics
59+
/// for resource_metrics in finished_metrics {
60+
/// println!("{:?}", resource_metrics);
61+
/// }
62+
///# }
63+
/// ```
64+
pub struct InMemoryMetricsExporter {
65+
metrics: Arc<Mutex<VecDeque<ResourceMetrics>>>,
66+
aggregation_selector: Arc<dyn AggregationSelector + Send + Sync>,
67+
temporality_selector: Arc<dyn TemporalitySelector + Send + Sync>,
68+
}
69+
70+
impl Clone for InMemoryMetricsExporter {
71+
fn clone(&self) -> Self {
72+
InMemoryMetricsExporter {
73+
metrics: self.metrics.clone(),
74+
aggregation_selector: self.aggregation_selector.clone(),
75+
temporality_selector: self.temporality_selector.clone(),
76+
}
77+
}
78+
}
79+
80+
impl fmt::Debug for InMemoryMetricsExporter {
81+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
82+
f.debug_struct("InMemoryMetricsExporter").finish()
83+
}
84+
}
85+
86+
impl Default for InMemoryMetricsExporter {
87+
fn default() -> Self {
88+
InMemoryMetricsExporterBuilder::new().build()
89+
}
90+
}
91+
92+
/// Builder for [`InMemoryMetricsExporter`].
93+
/// # Example
94+
///
95+
/// ```
96+
/// # use opentelemetry_sdk::testing::metrics::{InMemoryMetricsExporter, InMemoryMetricsExporterBuilder};
97+
///
98+
/// let exporter = InMemoryMetricsExporterBuilder::new().build();
99+
/// ```
100+
pub struct InMemoryMetricsExporterBuilder {
101+
aggregation_selector: Option<Arc<dyn AggregationSelector + Send + Sync>>,
102+
temporality_selector: Option<Arc<dyn TemporalitySelector + Send + Sync>>,
103+
}
104+
105+
impl fmt::Debug for InMemoryMetricsExporterBuilder {
106+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107+
f.debug_struct("InMemoryMetricsExporterBuilder").finish()
108+
}
109+
}
110+
111+
impl Default for InMemoryMetricsExporterBuilder {
112+
fn default() -> Self {
113+
Self::new()
114+
}
115+
}
116+
117+
impl InMemoryMetricsExporterBuilder {
118+
/// Creates a new instance of the `InMemoryMetricsExporterBuilder`.
119+
pub fn new() -> Self {
120+
Self {
121+
aggregation_selector: None,
122+
temporality_selector: None,
123+
}
124+
}
125+
126+
/// Sets the aggregation selector for the exporter.
127+
pub fn with_aggregation_selector<T>(mut self, aggregation_selector: T) -> Self
128+
where
129+
T: AggregationSelector + Send + Sync + 'static,
130+
{
131+
self.aggregation_selector = Some(Arc::new(aggregation_selector));
132+
self
133+
}
134+
135+
/// Sets the temporality selector for the exporter.
136+
pub fn with_temporality_selector<T>(mut self, temporality_selector: T) -> Self
137+
where
138+
T: TemporalitySelector + Send + Sync + 'static,
139+
{
140+
self.temporality_selector = Some(Arc::new(temporality_selector));
141+
self
142+
}
143+
144+
/// Creates a new instance of the `InMemoryMetricsExporter`.
145+
///
146+
pub fn build(self) -> InMemoryMetricsExporter {
147+
InMemoryMetricsExporter {
148+
metrics: Arc::new(Mutex::new(VecDeque::new())),
149+
aggregation_selector: self
150+
.aggregation_selector
151+
.unwrap_or_else(|| Arc::new(DefaultAggregationSelector::default())),
152+
temporality_selector: self
153+
.temporality_selector
154+
.unwrap_or_else(|| Arc::new(DefaultTemporalitySelector::default())),
155+
}
156+
}
157+
}
158+
159+
impl InMemoryMetricsExporter {
160+
/// Returns the finished metrics as a vector of `ResourceMetrics`.
161+
///
162+
/// # Errors
163+
///
164+
/// Returns a `MetricsError` if the internal lock cannot be acquired.
165+
///
166+
/// # Example
167+
///
168+
/// ```
169+
/// # use opentelemetry_sdk::testing::metrics::InMemoryMetricsExporter;
170+
///
171+
/// let exporter = InMemoryMetricsExporter::default();
172+
/// let finished_metrics = exporter.get_finished_metrics().unwrap();
173+
/// ```
174+
pub fn get_finished_metrics(&self) -> Result<Vec<ResourceMetrics>> {
175+
self.metrics
176+
.lock()
177+
.map(|metrics_guard| metrics_guard.iter().map(Self::clone_metrics).collect())
178+
.map_err(MetricsError::from)
179+
}
180+
181+
/// Clears the internal storage of finished metrics.
182+
///
183+
/// # Example
184+
///
185+
/// ```
186+
/// # use opentelemetry_sdk::testing::metrics::InMemoryMetricsExporter;
187+
///
188+
/// let exporter = InMemoryMetricsExporter::default();
189+
/// exporter.reset();
190+
/// ```
191+
pub fn reset(&self) {
192+
let _ = self
193+
.metrics
194+
.lock()
195+
.map(|mut metrics_guard| metrics_guard.clear());
196+
}
197+
198+
fn clone_metrics(metric: &ResourceMetrics) -> ResourceMetrics {
199+
ResourceMetrics {
200+
resource: metric.resource.clone(),
201+
scope_metrics: metric
202+
.scope_metrics
203+
.iter()
204+
.map(|scope_metric| ScopeMetrics {
205+
scope: scope_metric.scope.clone(),
206+
metrics: scope_metric
207+
.metrics
208+
.iter()
209+
.map(|metric| Metric {
210+
name: metric.name.clone(),
211+
description: metric.description.clone(),
212+
unit: metric.unit.clone(),
213+
// we don't expect any unknown data type here
214+
data: Self::clone_data(&metric.data).unwrap(),
215+
})
216+
.collect(),
217+
})
218+
.collect(),
219+
}
220+
}
221+
222+
fn clone_data(data: &dyn Any) -> Option<Box<dyn data::Aggregation>> {
223+
if let Some(hist) = data.downcast_ref::<Histogram<i64>>() {
224+
Some(Box::new(Histogram {
225+
data_points: hist.data_points.clone(),
226+
temporality: hist.temporality,
227+
}))
228+
} else if let Some(hist) = data.downcast_ref::<Histogram<f64>>() {
229+
Some(Box::new(Histogram {
230+
data_points: hist.data_points.clone(),
231+
temporality: hist.temporality,
232+
}))
233+
} else if let Some(hist) = data.downcast_ref::<Histogram<u64>>() {
234+
Some(Box::new(Histogram {
235+
data_points: hist.data_points.clone(),
236+
temporality: hist.temporality,
237+
}))
238+
} else if let Some(sum) = data.downcast_ref::<data::Sum<i64>>() {
239+
Some(Box::new(data::Sum {
240+
data_points: sum.data_points.clone(),
241+
temporality: sum.temporality,
242+
is_monotonic: sum.is_monotonic,
243+
}))
244+
} else if let Some(sum) = data.downcast_ref::<data::Sum<f64>>() {
245+
Some(Box::new(data::Sum {
246+
data_points: sum.data_points.clone(),
247+
temporality: sum.temporality,
248+
is_monotonic: sum.is_monotonic,
249+
}))
250+
} else if let Some(sum) = data.downcast_ref::<data::Sum<u64>>() {
251+
Some(Box::new(data::Sum {
252+
data_points: sum.data_points.clone(),
253+
temporality: sum.temporality,
254+
is_monotonic: sum.is_monotonic,
255+
}))
256+
} else if let Some(gauge) = data.downcast_ref::<data::Gauge<i64>>() {
257+
Some(Box::new(data::Gauge {
258+
data_points: gauge.data_points.clone(),
259+
}))
260+
} else if let Some(gauge) = data.downcast_ref::<data::Gauge<f64>>() {
261+
Some(Box::new(data::Gauge {
262+
data_points: gauge.data_points.clone(),
263+
}))
264+
} else if let Some(gauge) = data.downcast_ref::<data::Gauge<u64>>() {
265+
Some(Box::new(data::Gauge {
266+
data_points: gauge.data_points.clone(),
267+
}))
268+
} else {
269+
// unknown data type
270+
None
271+
}
272+
}
273+
}
274+
275+
impl AggregationSelector for InMemoryMetricsExporter {
276+
fn aggregation(&self, kind: InstrumentKind) -> Aggregation {
277+
self.aggregation_selector.aggregation(kind)
278+
}
279+
}
280+
281+
impl TemporalitySelector for InMemoryMetricsExporter {
282+
fn temporality(&self, kind: InstrumentKind) -> Temporality {
283+
self.temporality_selector.temporality(kind)
284+
}
285+
}
286+
287+
#[async_trait]
288+
impl PushMetricsExporter for InMemoryMetricsExporter {
289+
async fn export(&self, metrics: &mut ResourceMetrics) -> Result<()> {
290+
self.metrics
291+
.lock()
292+
.map(|mut metrics_guard| {
293+
metrics_guard.push_back(InMemoryMetricsExporter::clone_metrics(metrics))
294+
})
295+
.map_err(MetricsError::from)
296+
}
297+
298+
async fn force_flush(&self) -> Result<()> {
299+
Ok(()) // In this implementation, flush does nothing
300+
}
301+
302+
async fn shutdown(&self) -> Result<()> {
303+
self.metrics
304+
.lock()
305+
.map(|mut metrics_guard| metrics_guard.clear())
306+
.map_err(MetricsError::from)?;
307+
308+
Ok(())
309+
}
310+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
pub use in_memory_exporter::{InMemoryMetricsExporter, InMemoryMetricsExporterBuilder};
2+
3+
mod in_memory_exporter;
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,5 @@
11
#[cfg(all(feature = "testing", feature = "trace"))]
22
pub mod trace;
3+
4+
#[cfg(all(feature = "testing", feature = "metrics"))]
5+
pub mod metrics;

0 commit comments

Comments
 (0)