-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(telemetry): static values, cpu and mem metrics gathering
- basic setup to initialise the static values for the telemetry store added. - cpu and memory used by influxdb3 are sampled at a 1min interval - some minor tidy-ups Closes: #25370, #25371
- Loading branch information
1 parent
9c71b3c
commit d99c15d
Showing
9 changed files
with
600 additions
and
138 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
use std::{sync::Arc, time::Duration}; | ||
|
||
use observability_deps::tracing::debug; | ||
use sysinfo::{ProcessRefreshKind, System}; | ||
|
||
use crate::Result; | ||
use crate::{store::TelemetryStore, TelemetryError}; | ||
|
||
struct CpuAndMemorySampler { | ||
system: System, | ||
} | ||
|
||
impl CpuAndMemorySampler { | ||
pub fn new(system: System) -> Self { | ||
Self { system } | ||
} | ||
|
||
pub fn get_cpu_and_mem_used(&mut self) -> Result<(f32, u64)> { | ||
let pid = sysinfo::get_current_pid().map_err(TelemetryError::CannotGetPid)?; | ||
self.system.refresh_pids_specifics( | ||
&[pid], | ||
ProcessRefreshKind::new() | ||
.with_cpu() | ||
.with_memory() | ||
.with_disk_usage(), | ||
); | ||
|
||
let process = self | ||
.system | ||
.process(pid) | ||
.unwrap_or_else(|| panic!("cannot get process with pid: {}", pid)); | ||
|
||
let memory_used = process.memory(); | ||
let cpu_used = process.cpu_usage(); | ||
|
||
debug!( | ||
mem_used = ?memory_used, | ||
cpu_used = ?cpu_used, | ||
"trying to sample data for cpu/memory"); | ||
|
||
Ok((cpu_used, memory_used)) | ||
} | ||
} | ||
|
||
pub(crate) async fn sample_cpu_and_memory( | ||
store: Arc<TelemetryStore>, | ||
duration_secs: Duration, | ||
) -> tokio::task::JoinHandle<()> { | ||
tokio::spawn(async move { | ||
let mut sampler = CpuAndMemorySampler::new(System::new()); | ||
|
||
// sample every minute | ||
let mut interval = tokio::time::interval(duration_secs); | ||
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); | ||
|
||
loop { | ||
interval.tick().await; | ||
if let Ok((cpu_used, memory_used)) = sampler.get_cpu_and_mem_used() { | ||
store.add_cpu_and_memory(cpu_used, memory_used); | ||
} | ||
} | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,142 +1,20 @@ | ||
use observability_deps::tracing::error; | ||
use serde::Serialize; | ||
use std::{sync::Arc, time::Duration}; | ||
mod cpu_mem_sampler; | ||
mod sender; | ||
mod stats; | ||
pub mod store; | ||
|
||
/// This store is responsible for holding all the stats which | ||
/// will be sent in the background to the server. | ||
pub struct TelemetryStore { | ||
inner: parking_lot::Mutex<TelemetryStoreInner>, | ||
} | ||
|
||
impl TelemetryStore { | ||
pub async fn new( | ||
instance_id: String, | ||
os: String, | ||
influx_version: String, | ||
storage_type: String, | ||
cores: u32, | ||
) -> Arc<Self> { | ||
let inner = TelemetryStoreInner::new(instance_id, os, influx_version, storage_type, cores); | ||
let store = Arc::new(TelemetryStore { | ||
inner: parking_lot::Mutex::new(inner), | ||
}); | ||
send_telemetry_in_background(store.clone()).await; | ||
store | ||
} | ||
|
||
pub fn add_cpu_utilization(&self, value: u32) { | ||
let mut inner_store = self.inner.lock(); | ||
inner_store.cpu_utilization_percent = Some(value); | ||
} | ||
|
||
pub fn snapshot(&self) -> ExternalTelemetry { | ||
let inner_store = self.inner.lock(); | ||
inner_store.snapshot() | ||
} | ||
} | ||
|
||
struct TelemetryStoreInner { | ||
instance_id: String, | ||
os: String, | ||
influx_version: String, | ||
storage_type: String, | ||
cores: u32, | ||
// just for explanation | ||
cpu_utilization_percent: Option<u32>, | ||
} | ||
use thiserror::Error; | ||
|
||
impl TelemetryStoreInner { | ||
pub fn new( | ||
instance_id: String, | ||
os: String, | ||
influx_version: String, | ||
storage_type: String, | ||
cores: u32, | ||
) -> Self { | ||
TelemetryStoreInner { | ||
os, | ||
instance_id, | ||
influx_version, | ||
storage_type, | ||
cores, | ||
cpu_utilization_percent: None, | ||
} | ||
} | ||
#[derive(Debug, Error)] | ||
pub enum TelemetryError { | ||
#[error("cannot serialize to JSON: {0}")] | ||
CannotSerializeJson(#[from] serde_json::Error), | ||
|
||
pub fn snapshot(&self) -> ExternalTelemetry { | ||
ExternalTelemetry { | ||
os: self.os.clone(), | ||
version: self.influx_version.clone(), | ||
instance_id: self.instance_id.clone(), | ||
storage_type: self.storage_type.clone(), | ||
cores: self.cores, | ||
cpu_utilization_percent: self.cpu_utilization_percent, | ||
} | ||
} | ||
} | ||
#[error("failed to get pid: {0}")] | ||
CannotGetPid(&'static str), | ||
|
||
#[derive(Serialize)] | ||
pub struct ExternalTelemetry { | ||
pub os: String, | ||
pub version: String, | ||
pub storage_type: String, | ||
pub instance_id: String, | ||
pub cores: u32, | ||
pub cpu_utilization_percent: Option<u32>, | ||
#[error("cannot send telemetry: {0}")] | ||
CannotSendToTelemetryServer(#[from] reqwest::Error), | ||
} | ||
|
||
async fn send_telemetry_in_background(store: Arc<TelemetryStore>) -> tokio::task::JoinHandle<()> { | ||
tokio::spawn(async move { | ||
let client = reqwest::Client::new(); | ||
// TODO: Pass in the duration rather than hardcode it to 1hr | ||
let mut interval = tokio::time::interval(Duration::from_secs(60 * 60)); | ||
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); | ||
|
||
loop { | ||
interval.tick().await; | ||
let telemetry = store.snapshot(); | ||
let maybe_json = serde_json::to_vec(&telemetry); | ||
match maybe_json { | ||
Ok(json) => { | ||
// TODO: wire it up to actual telemetry sender | ||
let _res = client | ||
.post("https://telemetry.influxdata.endpoint.com") | ||
.body(json) | ||
.send() | ||
.await; | ||
} | ||
Err(e) => { | ||
error!(error = ?e, "Cannot send telemetry"); | ||
} | ||
} | ||
} | ||
}) | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
|
||
#[test_log::test(tokio::test)] | ||
async fn test_telemetry_handle_creation() { | ||
// create store | ||
let store: Arc<TelemetryStore> = TelemetryStore::new( | ||
"some-instance-id".to_owned(), | ||
"Linux".to_owned(), | ||
"OSS-v3.0".to_owned(), | ||
"Memory".to_owned(), | ||
10, | ||
) | ||
.await; | ||
// check snapshot | ||
let snapshot = store.snapshot(); | ||
assert_eq!("some-instance-id", snapshot.instance_id); | ||
|
||
// add cpu utilization | ||
store.add_cpu_utilization(89); | ||
|
||
// check snapshot again | ||
let snapshot = store.snapshot(); | ||
assert_eq!(Some(89), snapshot.cpu_utilization_percent); | ||
} | ||
} | ||
pub type Result<T, E = TelemetryError> = std::result::Result<T, E>; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
use std::{sync::Arc, time::Duration}; | ||
|
||
use observability_deps::tracing::{debug, error}; | ||
|
||
use crate::store::{TelemetryPayload, TelemetryStore}; | ||
use crate::{Result, TelemetryError}; | ||
|
||
pub(crate) struct TelemetrySender { | ||
client: reqwest::Client, | ||
req_path: String, | ||
} | ||
|
||
impl TelemetrySender { | ||
pub fn new(client: reqwest::Client, req_path: String) -> Self { | ||
Self { client, req_path } | ||
} | ||
|
||
pub async fn try_sending(&self, telemetry: &TelemetryPayload) -> Result<()> { | ||
debug!(telemetry = ?telemetry, "trying to send data to telemetry server"); | ||
let json = serde_json::to_vec(&telemetry).map_err(TelemetryError::CannotSerializeJson)?; | ||
self.client | ||
.post(self.req_path.as_str()) | ||
.body(json) | ||
.send() | ||
.await | ||
.map_err(TelemetryError::CannotSendToTelemetryServer)?; | ||
debug!("Successfully sent telemetry data to server"); | ||
Ok(()) | ||
} | ||
} | ||
|
||
pub(crate) async fn send_telemetry_in_background( | ||
store: Arc<TelemetryStore>, | ||
duration_secs: Duration, | ||
) -> tokio::task::JoinHandle<()> { | ||
tokio::spawn(async move { | ||
let telem_sender = TelemetrySender::new( | ||
reqwest::Client::new(), | ||
"https://telemetry.influxdata.foo.com".to_owned(), | ||
); | ||
let mut interval = tokio::time::interval(duration_secs); | ||
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); | ||
|
||
loop { | ||
interval.tick().await; | ||
let telemetry = store.snapshot(); | ||
if let Err(e) = telem_sender.try_sending(&telemetry).await { | ||
error!(error = ?e, "Cannot send telemetry"); | ||
} | ||
// if we tried sending and failed, we currently still reset the | ||
// metrics, it is ok to miss few samples | ||
store.reset_metrics(); | ||
} | ||
}) | ||
} |
Oops, something went wrong.