diff --git a/Cargo.lock b/Cargo.lock index 2d7fae78340..f41eed12084 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2781,12 +2781,16 @@ version = "0.1.0" dependencies = [ "futures", "futures-util", + "num", "observability_deps", "parking_lot", + "proptest", "reqwest 0.11.27", "serde", "serde_json", + "sysinfo 0.30.13", "test-log", + "thiserror", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index cbceb83f548..83afaf932f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,6 +84,7 @@ pretty_assertions = "1.4.0" prost = "0.12.6" prost-build = "0.12.6" prost-types = "0.12.6" +proptest = { version = "1", default-features = false, features = ["std"] } rand = "0.8.5" reqwest = { version = "0.11.24", default-features = false, features = ["rustls-tls", "stream", "json"] } secrecy = "0.8.0" @@ -110,6 +111,7 @@ unicode-segmentation = "1.11.0" url = "2.5.0" urlencoding = "1.1" uuid = { version = "1", features = ["v4"] } +num = { version = "0.4.3" } # Core.git crates we depend on arrow_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "1eaa4ed5ea147bc24db98d9686e457c124dfd5b7"} diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index c0ca0260d22..abfb9a6a19a 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -2,7 +2,7 @@ use clap_blocks::{ memory_size::MemorySize, - object_store::{make_object_store, ObjectStoreConfig}, + object_store::{make_object_store, ObjectStoreConfig, ObjectStoreType}, socket_addr::SocketAddr, tokio::TokioDatafusionConfig, }; @@ -14,6 +14,7 @@ use influxdb3_server::{ auth::AllOrNothingAuthorizer, builder::ServerBuilder, query_executor::QueryExecutorImpl, serve, CommonServerState, }; +use influxdb3_telemetry::store::TelemetryStore; use influxdb3_wal::{Gen1Duration, WalConfig}; use influxdb3_write::{ last_cache::LastCacheProvider, parquet_cache::create_cached_obj_store_and_oracle, @@ -329,8 +330,11 @@ pub async fn command(config: Config) -> Result<()> { let last_cache = LastCacheProvider::new_from_catalog(&catalog.clone_inner()) .map_err(Error::InitializeLastCache)?; - info!(instance_id = ?catalog.instance_id(), "Catalog initialized with"); + + let _telemetry_store = + setup_telemetry_store(&config.object_store_config, catalog.instance_id(), num_cpus).await; + let write_buffer: Arc = Arc::new( WriteBufferImpl::new( Arc::clone(&persister), @@ -378,6 +382,31 @@ pub async fn command(config: Config) -> Result<()> { Ok(()) } +async fn setup_telemetry_store( + object_store_config: &ObjectStoreConfig, + instance_id: Arc, + num_cpus: usize, +) -> Arc { + let os = std::env::consts::OS; + let influxdb_pkg_version = env!("CARGO_PKG_VERSION"); + let influxdb_pkg_name = env!("CARGO_PKG_NAME"); + // Following should show influxdb3-0.1.0 + let influx_version = format!("{}-{}", influxdb_pkg_name, influxdb_pkg_version); + let obj_store_type = object_store_config + .object_store + .unwrap_or(ObjectStoreType::Memory); + let storage_type = obj_store_type.as_str(); + + TelemetryStore::new( + instance_id, + Arc::from(os), + Arc::from(influx_version), + Arc::from(storage_type), + num_cpus, + ) + .await +} + fn parse_datafusion_config( s: &str, ) -> Result, Box> { diff --git a/influxdb3_telemetry/Cargo.toml b/influxdb3_telemetry/Cargo.toml index 3970de849c1..f5e14dc3f0c 100644 --- a/influxdb3_telemetry/Cargo.toml +++ b/influxdb3_telemetry/Cargo.toml @@ -14,7 +14,11 @@ futures.workspace = true futures-util.workspace = true reqwest.workspace = true parking_lot.workspace = true +sysinfo.workspace = true +num.workspace = true +thiserror.workspace = true [dev-dependencies] test-log.workspace = true +proptest.workspace = true diff --git a/influxdb3_telemetry/src/cpu_mem_sampler.rs b/influxdb3_telemetry/src/cpu_mem_sampler.rs new file mode 100644 index 00000000000..ae7a2932504 --- /dev/null +++ b/influxdb3_telemetry/src/cpu_mem_sampler.rs @@ -0,0 +1,63 @@ +use std::{sync::Arc, time::Duration}; + +use observability_deps::tracing::debug; +use sysinfo::{ProcessRefreshKind, System}; + +use crate::Result; +use crate::{store::TelemetryStore, TelemetryError}; + +struct CpuAndMemorySampler { + system: System, +} + +impl CpuAndMemorySampler { + pub fn new(system: System) -> Self { + Self { system } + } + + pub fn get_cpu_and_mem_used(&mut self) -> Result<(f32, u64)> { + let pid = sysinfo::get_current_pid().map_err(TelemetryError::CannotGetPid)?; + self.system.refresh_pids_specifics( + &[pid], + ProcessRefreshKind::new() + .with_cpu() + .with_memory() + .with_disk_usage(), + ); + + let process = self + .system + .process(pid) + .unwrap_or_else(|| panic!("cannot get process with pid: {}", pid)); + + let memory_used = process.memory(); + let cpu_used = process.cpu_usage(); + + debug!( + mem_used = ?memory_used, + cpu_used = ?cpu_used, + "trying to sample data for cpu/memory"); + + Ok((cpu_used, memory_used)) + } +} + +pub(crate) async fn sample_cpu_and_memory( + store: Arc, + duration_secs: Duration, +) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + let mut sampler = CpuAndMemorySampler::new(System::new()); + + // sample every minute + let mut interval = tokio::time::interval(duration_secs); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + interval.tick().await; + if let Ok((cpu_used, memory_used)) = sampler.get_cpu_and_mem_used() { + store.add_cpu_and_memory(cpu_used, memory_used); + } + } + }) +} diff --git a/influxdb3_telemetry/src/lib.rs b/influxdb3_telemetry/src/lib.rs index 19dd0594f3d..2e4bb3bfa34 100644 --- a/influxdb3_telemetry/src/lib.rs +++ b/influxdb3_telemetry/src/lib.rs @@ -1,142 +1,20 @@ -use observability_deps::tracing::error; -use serde::Serialize; -use std::{sync::Arc, time::Duration}; +mod cpu_mem_sampler; +mod sender; +mod stats; +pub mod store; -/// This store is responsible for holding all the stats which -/// will be sent in the background to the server. -pub struct TelemetryStore { - inner: parking_lot::Mutex, -} - -impl TelemetryStore { - pub async fn new( - instance_id: String, - os: String, - influx_version: String, - storage_type: String, - cores: u32, - ) -> Arc { - let inner = TelemetryStoreInner::new(instance_id, os, influx_version, storage_type, cores); - let store = Arc::new(TelemetryStore { - inner: parking_lot::Mutex::new(inner), - }); - send_telemetry_in_background(store.clone()).await; - store - } - - pub fn add_cpu_utilization(&self, value: u32) { - let mut inner_store = self.inner.lock(); - inner_store.cpu_utilization_percent = Some(value); - } - - pub fn snapshot(&self) -> ExternalTelemetry { - let inner_store = self.inner.lock(); - inner_store.snapshot() - } -} - -struct TelemetryStoreInner { - instance_id: String, - os: String, - influx_version: String, - storage_type: String, - cores: u32, - // just for explanation - cpu_utilization_percent: Option, -} +use thiserror::Error; -impl TelemetryStoreInner { - pub fn new( - instance_id: String, - os: String, - influx_version: String, - storage_type: String, - cores: u32, - ) -> Self { - TelemetryStoreInner { - os, - instance_id, - influx_version, - storage_type, - cores, - cpu_utilization_percent: None, - } - } +#[derive(Debug, Error)] +pub enum TelemetryError { + #[error("cannot serialize to JSON: {0}")] + CannotSerializeJson(#[from] serde_json::Error), - pub fn snapshot(&self) -> ExternalTelemetry { - ExternalTelemetry { - os: self.os.clone(), - version: self.influx_version.clone(), - instance_id: self.instance_id.clone(), - storage_type: self.storage_type.clone(), - cores: self.cores, - cpu_utilization_percent: self.cpu_utilization_percent, - } - } -} + #[error("failed to get pid: {0}")] + CannotGetPid(&'static str), -#[derive(Serialize)] -pub struct ExternalTelemetry { - pub os: String, - pub version: String, - pub storage_type: String, - pub instance_id: String, - pub cores: u32, - pub cpu_utilization_percent: Option, + #[error("cannot send telemetry: {0}")] + CannotSendToTelemetryServer(#[from] reqwest::Error), } -async fn send_telemetry_in_background(store: Arc) -> tokio::task::JoinHandle<()> { - tokio::spawn(async move { - let client = reqwest::Client::new(); - // TODO: Pass in the duration rather than hardcode it to 1hr - let mut interval = tokio::time::interval(Duration::from_secs(60 * 60)); - interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - - loop { - interval.tick().await; - let telemetry = store.snapshot(); - let maybe_json = serde_json::to_vec(&telemetry); - match maybe_json { - Ok(json) => { - // TODO: wire it up to actual telemetry sender - let _res = client - .post("https://telemetry.influxdata.endpoint.com") - .body(json) - .send() - .await; - } - Err(e) => { - error!(error = ?e, "Cannot send telemetry"); - } - } - } - }) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test_log::test(tokio::test)] - async fn test_telemetry_handle_creation() { - // create store - let store: Arc = TelemetryStore::new( - "some-instance-id".to_owned(), - "Linux".to_owned(), - "OSS-v3.0".to_owned(), - "Memory".to_owned(), - 10, - ) - .await; - // check snapshot - let snapshot = store.snapshot(); - assert_eq!("some-instance-id", snapshot.instance_id); - - // add cpu utilization - store.add_cpu_utilization(89); - - // check snapshot again - let snapshot = store.snapshot(); - assert_eq!(Some(89), snapshot.cpu_utilization_percent); - } -} +pub type Result = std::result::Result; diff --git a/influxdb3_telemetry/src/sender.rs b/influxdb3_telemetry/src/sender.rs new file mode 100644 index 00000000000..11695e76b6b --- /dev/null +++ b/influxdb3_telemetry/src/sender.rs @@ -0,0 +1,55 @@ +use std::{sync::Arc, time::Duration}; + +use observability_deps::tracing::{debug, error}; + +use crate::store::{TelemetryPayload, TelemetryStore}; +use crate::{Result, TelemetryError}; + +pub(crate) struct TelemetrySender { + client: reqwest::Client, + req_path: String, +} + +impl TelemetrySender { + pub fn new(client: reqwest::Client, req_path: String) -> Self { + Self { client, req_path } + } + + pub async fn try_sending(&self, telemetry: &TelemetryPayload) -> Result<()> { + debug!(telemetry = ?telemetry, "trying to send data to telemetry server"); + let json = serde_json::to_vec(&telemetry).map_err(TelemetryError::CannotSerializeJson)?; + self.client + .post(self.req_path.as_str()) + .body(json) + .send() + .await + .map_err(TelemetryError::CannotSendToTelemetryServer)?; + debug!("Successfully sent telemetry data to server"); + Ok(()) + } +} + +pub(crate) async fn send_telemetry_in_background( + store: Arc, + duration_secs: Duration, +) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + let telem_sender = TelemetrySender::new( + reqwest::Client::new(), + "https://telemetry.influxdata.foo.com".to_owned(), + ); + let mut interval = tokio::time::interval(duration_secs); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + interval.tick().await; + let telemetry = store.snapshot(); + if let Err(e) = telem_sender.try_sending(&telemetry).await { + error!(error = ?e, "Cannot send telemetry"); + } + // if we tried sending and failed, we currently still reset the + // metrics, it is ok to miss few samples + store.reset_metrics(); + } + }) +} diff --git a/influxdb3_telemetry/src/stats.rs b/influxdb3_telemetry/src/stats.rs new file mode 100644 index 00000000000..80925fe5471 --- /dev/null +++ b/influxdb3_telemetry/src/stats.rs @@ -0,0 +1,147 @@ +use num::{Num, NumCast}; + +pub(crate) fn stats( + current_min: T, + current_max: T, + current_avg: T, + current_num_samples: u64, + new_value: T, +) -> Option<(T, T, T)> { + let min = min(current_min, new_value); + let max = max(current_max, new_value); + let avg = avg(current_num_samples, current_avg, new_value)?; + Some((min, max, avg)) +} + +/// Average function that returns average based on the type +/// provided. u64 for example will return avg as u64. This probably +/// is fine as we don't really need it to be a precise average. +/// For example, memory consumed measured in MB can be rounded as u64 +pub(crate) fn avg( + current_num_samples: u64, + current_avg: T, + new_value: T, +) -> Option { + // NB: num::cast(current_num_samples).unwrap() should have been enough, + // given we always reset metrics. However, if we decide to not reset + // metrics without retrying then it is better to bubble up the `Option` + // to indicate this cast did not work + let current_total = current_avg * num::cast(current_num_samples)?; + let new_total = current_total + new_value; + let new_num_samples = num::cast(current_num_samples.wrapping_add(1))?; + if new_num_samples == num::cast(0).unwrap() { + return None; + } + Some(new_total.div(new_num_samples)) +} + +fn min(current_min: T, new_value: T) -> T { + if new_value < current_min { + return new_value; + }; + current_min +} + +fn max(current_max: T, new_value: T) -> T { + if new_value > current_max { + return new_value; + }; + current_max +} + +#[cfg(test)] +mod tests { + use observability_deps::tracing::info; + use proptest::prelude::*; + + use super::*; + + #[test] + fn min_float_test() { + assert_eq!(1.0, min(1.0, 2.0)); + } + + #[test] + fn min_num_test() { + assert_eq!(1, min(1, 2)); + } + + #[test] + fn max_num_test() { + assert_eq!(2, max(1, 2)); + } + + #[test] + fn max_float_test() { + assert_eq!(2.0, max(1.0, 2.0)); + } + + #[test] + fn avg_num_test() { + assert_eq!(Some(2), avg(3, 2, 4)); + } + + #[test_log::test(test)] + fn avg_float_test() { + let avg_floats = avg(3, 2.0, 4.0); + info!(avg = ?avg_floats, "average float"); + assert_eq!(Some(2.5), avg_floats); + } + + #[test_log::test(test)] + fn avg_float_test_max() { + let avg_floats = avg(u64::MAX, 2.0, 4.0); + info!(avg = ?avg_floats, "average float"); + assert_eq!(None, avg_floats); + } + + #[test_log::test(test)] + fn avg_num_test_max() { + let avg_nums = avg(u64::MAX, 2, 4); + assert_eq!(None, avg_nums); + } + + #[test_log::test(test)] + fn stats_test() { + let stats = stats(2.0, 135.5, 25.5, 37, 25.0); + assert!(stats.is_some()); + let (min, max, avg) = stats.unwrap(); + info!(min = ?min, max = ?max, avg = ?avg, "stats >>"); + assert_eq!((2.0, 135.5, 25.486842105263158), (min, max, avg)); + } + + proptest! { + #[test_log::test(test)] + fn prop_test_stats_no_panic_u64( + min in 0u64..10000, + max in 0u64..10000, + curr_avg in 0u64..10000, + num_samples in 0u64..10000, + new_value in 0u64..100000, + ) { + stats(min, max, curr_avg, num_samples, new_value); + } + + #[test] + fn prop_test_stats_no_panic_f32( + min in 0.0f32..10000.0, + max in 0.0f32..10000.0, + curr_avg in 0.0f32..10000.0, + num_samples in 0u64..10000, + new_value in 0.0f32..100000.0, + ) { + stats(min, max, curr_avg, num_samples, new_value); + } + + #[test] + fn prop_test_stats_no_panic_f64( + min in 0.0f64..10000.0, + max in 0.0f64..10000.0, + curr_avg in 0.0f64..10000.0, + num_samples in 0u64..10000, + new_value in 0.0f64..100000.0, + ) { + stats(min, max, curr_avg, num_samples, new_value); + } + } +} diff --git a/influxdb3_telemetry/src/store.rs b/influxdb3_telemetry/src/store.rs new file mode 100644 index 00000000000..19e08abad00 --- /dev/null +++ b/influxdb3_telemetry/src/store.rs @@ -0,0 +1,280 @@ +use std::{sync::Arc, time::Duration}; + +use observability_deps::tracing::{debug, warn}; +use serde::Serialize; + +use crate::{ + cpu_mem_sampler::sample_cpu_and_memory, sender::send_telemetry_in_background, stats::stats, +}; + +/// This store is responsible for holding all the stats which +/// will be sent in the background to the server. +pub struct TelemetryStore { + inner: parking_lot::Mutex, +} + +const ONE_MIN_SECS: u64 = 60; +const ONE_HOUR_SECS: u64 = 60 * 60; + +impl TelemetryStore { + pub async fn new( + instance_id: Arc, + os: Arc, + influx_version: Arc, + storage_type: Arc, + cores: usize, + ) -> Arc { + debug!( + instance_id = ?instance_id, + os = ?os, + influx_version = ?influx_version, + storage_type = ?storage_type, + cores = ?cores, + "Initializing telemetry store" + ); + let inner = TelemetryStoreInner::new(instance_id, os, influx_version, storage_type, cores); + let store = Arc::new(TelemetryStore { + inner: parking_lot::Mutex::new(inner), + }); + sample_cpu_and_memory(store.clone(), Duration::from_secs(ONE_MIN_SECS)).await; + send_telemetry_in_background(store.clone(), Duration::from_secs(ONE_HOUR_SECS)).await; + store + } + + pub fn add_cpu_and_memory(&self, cpu: f32, memory: u64) { + let mut inner_store = self.inner.lock(); + inner_store + .add_cpu_and_memory(cpu, memory) + .unwrap_or_else(|| { + // num::cast probably has had overflow. Best to + // reset all metrics to start again + warn!("cpu/memory could not be added, resetting metrics"); + inner_store.reset_metrics(); + }); + } + + pub fn reset_metrics(&self) { + let mut inner_store = self.inner.lock(); + inner_store.reset_metrics(); + } + + pub(crate) fn snapshot(&self) -> TelemetryPayload { + let inner_store = self.inner.lock(); + inner_store.snapshot() + } +} + +struct TelemetryStoreInner { + instance_id: Arc, + os: Arc, + influx_version: Arc, + storage_type: Arc, + cores: usize, + + cpu_utilization_percent_min: f64, + cpu_utilization_percent_max: f64, + cpu_utilization_percent_avg: f64, + + memory_used_mb_min: u64, + memory_used_mb_max: u64, + memory_used_mb_avg: u64, + + num_samples_cpu_mem: u64, +} + +impl TelemetryStoreInner { + pub fn new( + instance_id: Arc, + os: Arc, + influx_version: Arc, + storage_type: Arc, + cores: usize, + ) -> Self { + TelemetryStoreInner { + os, + instance_id, + influx_version, + storage_type, + cores, + + // cpu + cpu_utilization_percent_min: 0.0, + cpu_utilization_percent_max: 0.0, + cpu_utilization_percent_avg: 0.0, + + // mem + memory_used_mb_min: 0, + memory_used_mb_max: 0, + memory_used_mb_avg: 0, + + num_samples_cpu_mem: 0, + } + } + + pub(crate) fn snapshot(&self) -> TelemetryPayload { + TelemetryPayload { + os: self.os.clone(), + version: self.influx_version.clone(), + instance_id: self.instance_id.clone(), + storage_type: self.storage_type.clone(), + cores: self.cores, + product_type: "OSS", + + cpu_utilization_percent_min: self.cpu_utilization_percent_min, + cpu_utilization_percent_max: self.cpu_utilization_percent_max, + cpu_utilization_percent_avg: self.cpu_utilization_percent_avg, + + memory_used_mb_min: self.memory_used_mb_min, + memory_used_mb_max: self.memory_used_mb_max, + memory_used_mb_avg: self.memory_used_mb_avg, + } + } + + fn reset_metrics(&mut self) { + self.cpu_utilization_percent_min = 0.0; + self.cpu_utilization_percent_max = 0.0; + self.cpu_utilization_percent_avg = 0.0; + + self.memory_used_mb_min = 0; + self.memory_used_mb_max = 0; + self.memory_used_mb_avg = 0; + + self.num_samples_cpu_mem = 0; + } + + fn add_cpu_and_memory(&mut self, cpu: f32, memory: u64) -> Option<()> { + self.add_cpu_utilization(cpu)?; + self.add_memory(memory)?; + self.num_samples_cpu_mem += 1; + Some(()) + } + + fn add_memory(&mut self, value: u64) -> Option<()> { + // convert to MB + let mem_used_mb = value / (1024 * 1024); + let (min, max, avg) = if self.num_samples_cpu_mem == 0 { + (mem_used_mb, mem_used_mb, mem_used_mb) + } else { + stats( + self.memory_used_mb_min, + self.memory_used_mb_max, + self.memory_used_mb_avg, + self.num_samples_cpu_mem, + mem_used_mb, + )? + }; + self.memory_used_mb_min = min; + self.memory_used_mb_max = max; + self.memory_used_mb_avg = avg; + Some(()) + } + + fn add_cpu_utilization(&mut self, value: f32) -> Option<()> { + let cpu_used: f64 = value.into(); + let (min, max, avg) = if self.num_samples_cpu_mem == 0 { + (cpu_used, cpu_used, cpu_used) + } else { + stats( + self.cpu_utilization_percent_min, + self.cpu_utilization_percent_max, + self.cpu_utilization_percent_avg, + self.num_samples_cpu_mem, + cpu_used, + )? + }; + self.cpu_utilization_percent_min = min; + self.cpu_utilization_percent_max = max; + self.cpu_utilization_percent_avg = to_2_decimal_places(avg); + Some(()) + } +} + +fn to_2_decimal_places(avg: f64) -> f64 { + (avg * 100.0).round() / 100.0 +} + +#[derive(Serialize, Debug)] +pub(crate) struct TelemetryPayload { + pub os: Arc, + pub version: Arc, + pub storage_type: Arc, + pub instance_id: Arc, + pub cores: usize, + pub product_type: &'static str, + // cpu + pub cpu_utilization_percent_min: f64, + pub cpu_utilization_percent_max: f64, + pub cpu_utilization_percent_avg: f64, + // mem + pub memory_used_mb_min: u64, + pub memory_used_mb_max: u64, + pub memory_used_mb_avg: u64, +} + +#[cfg(test)] +mod tests { + use observability_deps::tracing::info; + + use crate::store::to_2_decimal_places; + + use super::*; + + #[test_log::test(tokio::test)] + async fn test_telemetry_handle_creation() { + // create store + let store: Arc = TelemetryStore::new( + Arc::from("some-instance-id"), + Arc::from("Linux"), + Arc::from("OSS-v3.0"), + Arc::from("Memory"), + 10, + ) + .await; + // check snapshot + let snapshot = store.snapshot(); + assert_eq!("some-instance-id", &*snapshot.instance_id); + + // add cpu/mem and snapshot 1 + let mem_used_bytes = 123456789; + let expected_mem_in_mb = 117; + store.add_cpu_and_memory(89.0, mem_used_bytes); + let snapshot = store.snapshot(); + info!(snapshot = ?snapshot, "dummy snapshot 1"); + assert_eq!(89.0, snapshot.cpu_utilization_percent_min); + assert_eq!(89.0, snapshot.cpu_utilization_percent_max); + assert_eq!(89.0, snapshot.cpu_utilization_percent_avg); + assert_eq!(expected_mem_in_mb, snapshot.memory_used_mb_min); + assert_eq!(expected_mem_in_mb, snapshot.memory_used_mb_max); + assert_eq!(expected_mem_in_mb, snapshot.memory_used_mb_avg); + + // add cpu/mem snapshot 2 + store.add_cpu_and_memory(100.0, 134567890); + let snapshot = store.snapshot(); + info!(snapshot = ?snapshot, "dummy snapshot 2"); + assert_eq!(89.0, snapshot.cpu_utilization_percent_min); + assert_eq!(100.0, snapshot.cpu_utilization_percent_max); + assert_eq!(94.5, snapshot.cpu_utilization_percent_avg); + assert_eq!(expected_mem_in_mb, snapshot.memory_used_mb_min); + assert_eq!(128, snapshot.memory_used_mb_max); + assert_eq!(122, snapshot.memory_used_mb_avg); + + // reset + store.reset_metrics(); + // check snapshot 3 + let snapshot = store.snapshot(); + info!(snapshot = ?snapshot, "dummy snapshot 3"); + assert_eq!(0.0, snapshot.cpu_utilization_percent_min); + assert_eq!(0.0, snapshot.cpu_utilization_percent_max); + assert_eq!(0.0, snapshot.cpu_utilization_percent_avg); + assert_eq!(0, snapshot.memory_used_mb_min); + assert_eq!(0, snapshot.memory_used_mb_max); + assert_eq!(0, snapshot.memory_used_mb_avg); + } + + #[test] + fn test_to_2_decimal_places() { + let x = 25.486842105263158; + let rounded = to_2_decimal_places(x); + assert_eq!(25.49, rounded); + } +}