From 60ac03790c7436eaf9dc5af48d2f7b6356a0496d Mon Sep 17 00:00:00 2001 From: TSnake41 Date: Mon, 31 Jul 2023 14:55:05 +0200 Subject: [PATCH] Redesign xenstored plugin to use watch to track changes. --- .../xcp-metrics-plugin-xenstored/Cargo.toml | 10 +- .../xcp-metrics-plugin-xenstored/src/main.rs | 164 +++++++++--------- .../src/plugin.rs | 133 ++++++++++++++ .../src/watch_cache.rs | 108 ++++++++++++ 4 files changed, 331 insertions(+), 84 deletions(-) create mode 100644 plugins/xcp-metrics-plugin-xenstored/src/plugin.rs create mode 100644 plugins/xcp-metrics-plugin-xenstored/src/watch_cache.rs diff --git a/plugins/xcp-metrics-plugin-xenstored/Cargo.toml b/plugins/xcp-metrics-plugin-xenstored/Cargo.toml index 7890260..d3d3a93 100644 --- a/plugins/xcp-metrics-plugin-xenstored/Cargo.toml +++ b/plugins/xcp-metrics-plugin-xenstored/Cargo.toml @@ -11,11 +11,13 @@ categories.workspace = true xcp-metrics-common = { path = "../../xcp-metrics-common" } xcp-metrics-plugin-common = { path = "../xcp-metrics-plugin-common" } -tokio = { version = "1", features = ["full"] } - uuid = "1.4" anyhow = "1.0" +dashmap = "5.5" + +tokio = { version = "1", features = ["full"] } +futures = "0.3" [dependencies.xenstore-rs] -git = "https://github.com/xcp-ng/xenstore-rs" -branch = "xs_write" \ No newline at end of file +git = "https://github.com/tsnake41/xenstore-rs" +branch = "xs_watch" diff --git a/plugins/xcp-metrics-plugin-xenstored/src/main.rs b/plugins/xcp-metrics-plugin-xenstored/src/main.rs index b2d755e..4a315c8 100644 --- a/plugins/xcp-metrics-plugin-xenstored/src/main.rs +++ b/plugins/xcp-metrics-plugin-xenstored/src/main.rs @@ -1,31 +1,36 @@ -use std::{collections::HashMap, rc::Rc, time::Duration}; +mod plugin; +mod watch_cache; +use std::{collections::HashMap, time::Duration}; + +use plugin::PluginState; use tokio::time; use xcp_metrics_common::metrics::{Label, MetricType, MetricValue, NumberValue}; use xcp_metrics_plugin_common::protocol_v3::{ utils::{SimpleMetric, SimpleMetricFamily, SimpleMetricSet}, MetricsPlugin, }; -use xenstore_rs::{XBTransaction, Xs, XsOpenFlags}; +use xenstore_rs::{Xs, XsOpenFlags}; -pub fn get_vm_infos(xs: &Xs, vm_uuid: &str, attributes: &[&str]) -> MetricValue { +pub fn get_vm_infos(plugin: &PluginState, vm_uuid: &str, attributes: &[&str]) -> MetricValue { MetricValue::Info( attributes .iter() .filter_map(|&attr| { - xs.read( - XBTransaction::Null, - format!("/vm/{vm_uuid}/{attr}").as_str(), - ) - .ok() - .map(|value| Label(attr.into(), value.into())) + plugin + .read(format!("/vm/{vm_uuid}/{attr}").as_str()) + .map(|value| Label(attr.into(), value.into())) }) .collect(), ) } -fn make_memory_target_metric(xs: &Xs, domid: &str, memory_target: i64) -> SimpleMetric { - let vm_uuid = get_domain_uuid(xs, domid); +fn make_memory_target_metric( + plugin: &PluginState, + domid: &str, + memory_target: i64, +) -> SimpleMetric { + let vm_uuid = get_domain_uuid(plugin, domid); let mut labels = vec![Label("domain".into(), domid.into())]; @@ -39,96 +44,95 @@ fn make_memory_target_metric(xs: &Xs, domid: &str, memory_target: i64) -> Simple } } -fn get_domain_uuid(xs: &Xs, domid: &str) -> Option { - xs.read( - XBTransaction::Null, - format!("/local/domain/{domid}/vm").as_str(), - ) - .and_then(|vm_path| xs.read(XBTransaction::Null, format!("{vm_path}/uuid").as_str())) - .ok() +fn get_domain_uuid(plugin: &PluginState, domid: &str) -> Option { + plugin + .read(format!("/local/domain/{domid}/vm").as_str()) + .and_then(|vm_path| plugin.read(format!("{vm_path}/uuid").as_str())) } -fn get_memory_target_value(xs: &Xs, domid: &str) -> Option { - xs.read( - XBTransaction::Null, - format!("/local/domain/{domid}/memory/target").as_str(), - ) - .ok() - .and_then(|value| { - value - .parse() - .map_err(|err| { - eprintln!("Memory target parse error {err:?}"); - err - }) - .ok() - }) +fn get_memory_target_value(plugin: &PluginState, domid: &str) -> Option { + plugin + .read(format!("/local/domain/{domid}/memory/target").as_str()) + .and_then(|value| { + value + .parse() + .map_err(|err| { + eprintln!("Memory target parse error {err:?}"); + err + }) + .ok() + }) } -fn generate_metrics(xs: &Xs) -> anyhow::Result { - let mut families: HashMap = HashMap::new(); - - match xs.directory(XBTransaction::Null, "/vm") { - Ok(vms) => { - families.insert( - "vm_info".into(), - SimpleMetricFamily { - metric_type: MetricType::Info, - unit: "".into(), - help: "Virtual machine informations".into(), - metrics: vms - .iter() - // Get vm metrics. - .map(|uuid| SimpleMetric { - labels: vec![Label("owner".into(), format!("vm {uuid}").into())].into(), - value: get_vm_infos(&xs, &uuid, &["name"]), - }) - .collect(), - }, - ); - } - Err(err) => println!("Unable to get vm list {err}"), +fn generate_metrics(plugin: &mut PluginState, xs: &Xs) -> anyhow::Result { + if let Err(e) = plugin.update_domains(xs) { + eprintln!("Unable to get domains: {e}"); } - match xs.directory(XBTransaction::Null, "/local/domain") { - Ok(domains) => { - families.insert( - "memory_target".into(), - SimpleMetricFamily { - metric_type: MetricType::Gauge, - unit: "bytes".into(), - help: "Target of VM balloon driver".into(), - metrics: domains - .iter() - // Get target memory metric (if exists). - .filter_map(|domid| get_memory_target_value(xs, &domid).map(|m| (domid, m))) - // Make it a metric. - .map(|(domid, memory_target)| { - make_memory_target_metric(xs, &domid, memory_target) - }) - .collect(), - }, - ); - } - Err(err) => println!("Unable to get domains list {err}"), + if let Err(e) = plugin.update_vms(xs) { + eprintln!("Unable to get vms: {e}"); } + let mut families: HashMap = HashMap::new(); + + families.insert( + "vm_info".into(), + SimpleMetricFamily { + metric_type: MetricType::Info, + unit: "".into(), + help: "Virtual machine informations".into(), + metrics: plugin + .vms + .iter() + // Get vm metrics. + .map(|uuid| SimpleMetric { + labels: vec![Label("owner".into(), format!("vm {uuid}").into())].into(), + value: get_vm_infos(plugin, &uuid, &["name"]), + }) + .collect(), + }, + ); + + families.insert( + "memory_target".into(), + SimpleMetricFamily { + metric_type: MetricType::Gauge, + unit: "bytes".into(), + help: "Target of VM balloon driver".into(), + metrics: plugin + .domains + .iter() + // Get target memory metric (if exists). + .filter_map(|domid| get_memory_target_value(plugin, &domid).map(|m| (domid, m))) + // Make it a metric. + .map(|(domid, memory_target)| { + make_memory_target_metric(plugin, &domid, memory_target) + }) + .collect(), + }, + ); + Ok(SimpleMetricSet { families }) } #[tokio::main] async fn main() -> anyhow::Result<()> { - let xs = Rc::new(Xs::new(XsOpenFlags::ReadOnly).map_err(|e| anyhow::anyhow!("{e}"))?); + let xs = Xs::new(XsOpenFlags::ReadOnly).map_err(|e| anyhow::anyhow!("{e}"))?; + + let mut plugin_state = PluginState::default(); let plugin = MetricsPlugin::new( "xcp-metrics-plugin-xenstored", - generate_metrics(&xs)?.into(), + generate_metrics(&mut plugin_state, &xs)?.into(), ) .await?; loop { // Fetch and push new metrics. - plugin.update(generate_metrics(&xs)?.into()).await.unwrap(); + plugin + .update(generate_metrics(&mut plugin_state, &xs)?.into()) + .await + .unwrap(); time::sleep(Duration::from_secs(1)).await; } diff --git a/plugins/xcp-metrics-plugin-xenstored/src/plugin.rs b/plugins/xcp-metrics-plugin-xenstored/src/plugin.rs new file mode 100644 index 0000000..bab46db --- /dev/null +++ b/plugins/xcp-metrics-plugin-xenstored/src/plugin.rs @@ -0,0 +1,133 @@ +use std::collections::HashSet; + +use xenstore_rs::{XBTransaction, Xs}; + +use crate::watch_cache::WatchCache; + +#[derive(Default)] +pub struct PluginState { + watch_cache: WatchCache, + + /// Domain ID -> Paths + pub domains: HashSet, + + /// VM ID -> Paths + pub vms: HashSet, +} + +static TRACKED_DOMAIN_ATTRIBUTES: &[&str] = &["memory/target", "vm"]; +static TRACKED_VM_ATTRIBUTES: &[&str] = &["name", "uuid"]; + +impl PluginState { + fn track_domain(&mut self, domain: &str) { + TRACKED_DOMAIN_ATTRIBUTES.into_iter().for_each(|attribute| { + if let Err(e) = self + .watch_cache + .watch(format!("/local/domain/{domain}/{attribute}").as_str()) + { + println!("{e}"); + } + }); + + self.domains.insert(domain.to_string()); + } + + fn untrack_domain(&mut self, domain: &str) { + TRACKED_DOMAIN_ATTRIBUTES.into_iter().for_each(|attribute| { + if let Err(e) = self + .watch_cache + .unwatch(format!("/local/domain/{domain}/{attribute}").as_str()) + { + println!("{e}"); + } + }); + + self.domains.remove(domain); + } + + fn track_vm(&mut self, vm: &str) { + TRACKED_VM_ATTRIBUTES.into_iter().for_each(|attribute| { + if let Err(e) = self + .watch_cache + .watch(format!("/vm/{vm}/{attribute}").as_str()) + { + println!("{e}"); + } + }); + + self.vms.insert(vm.to_string()); + } + + fn untrack_vm(&mut self, vm: &str) { + TRACKED_VM_ATTRIBUTES.into_iter().for_each(|attribute| { + if let Err(e) = self + .watch_cache + .unwatch(format!("/vm/{vm}/{attribute}").as_str()) + { + println!("{e}"); + } + }); + + self.vms.remove(vm); + } + + /// Check for removed and new domains, and update watcher. + pub fn update_domains(&mut self, xs: &Xs) -> anyhow::Result<()> { + let real_domains: HashSet = xs + .directory(XBTransaction::Null, "/local/domain")? + .into_iter() + .collect(); + + real_domains.iter().for_each(|domain| { + if !self.domains.contains(domain) { + println!("Now tracking domain {domain}"); + self.track_domain(domain); + } + }); + + // Check for removed domains. + self.domains + .difference(&real_domains) + .cloned() + .collect::>() + .into_iter() + .for_each(|domain| { + println!("Untracking domain {domain}"); + self.untrack_domain(&domain); + }); + + Ok(()) + } + + /// Check for removed and new vms, and update watcher. + pub fn update_vms(&mut self, xs: &Xs) -> anyhow::Result<()> { + let real_vms: HashSet = xs + .directory(XBTransaction::Null, "/vm")? + .into_iter() + .collect(); + + real_vms.iter().for_each(|vm| { + if !self.vms.contains(vm) { + println!("Now tracking vm {vm}"); + self.track_vm(vm); + } + }); + + // Check removed domains. + self.vms + .difference(&real_vms) + .cloned() + .collect::>() + .into_iter() + .for_each(|vm| { + println!("Untracking vm {vm}"); + self.untrack_vm(&vm); + }); + + Ok(()) + } + + pub fn read(&self, path: &str) -> Option { + self.watch_cache.read(path) + } +} diff --git a/plugins/xcp-metrics-plugin-xenstored/src/watch_cache.rs b/plugins/xcp-metrics-plugin-xenstored/src/watch_cache.rs new file mode 100644 index 0000000..3c9ec8c --- /dev/null +++ b/plugins/xcp-metrics-plugin-xenstored/src/watch_cache.rs @@ -0,0 +1,108 @@ +use dashmap::DashMap; +use futures::StreamExt; +use std::sync::Arc; +use tokio::{ + select, + sync::mpsc::{self, error::SendError}, + task::{self, JoinHandle}, +}; +use xenstore_rs::{XBTransaction, Xs, XsOpenFlags}; + +/// A Xs watch cache that passively update values. +pub struct WatchCache { + pub cache: Arc>, + pub watch_task: JoinHandle<()>, + watch_channel: mpsc::UnboundedSender, + unwatch_channel: mpsc::UnboundedSender, +} + +async fn watch_task( + cache: Arc>, + mut watch_channel: mpsc::UnboundedReceiver, + mut unwatch_channel: mpsc::UnboundedReceiver, +) { + let watch_task = task::spawn((|| { + let cache = cache.clone(); + + async move { + let xs = Xs::new(XsOpenFlags::ReadOnly).unwrap(); + + let mut stream = xs.get_stream().unwrap(); + + while let Some(entry) = stream.next().await { + match xs.read(XBTransaction::Null, &entry.path) { + Ok(value) => { + println!("Readed {} = {value}", entry.path); + cache.insert(entry.path, value); + } + Err(e) => { + println!("Removed {} ({e})", entry.path); + cache.remove(&entry.path); + } + } + } + } + })()); + + let watch_channel_task = task::spawn(async move { + let xs = Xs::new(XsOpenFlags::ReadOnly).unwrap(); + + while let Some(to_watch) = watch_channel.recv().await { + xs.watch(&to_watch, "xcp-metrics-xenstored").ok(); + } + }); + + let unwatch_channel_task = task::spawn(async move { + let xs = Xs::new(XsOpenFlags::ReadOnly).unwrap(); + + while let Some(to_unwatch) = unwatch_channel.recv().await { + xs.unwatch(&to_unwatch, "xcp-metrics-xenstored").ok(); + cache.remove(&to_unwatch); + } + }); + + select! { + _ = watch_task => {}, + _ = watch_channel_task => {}, + _ = unwatch_channel_task => {}, + }; +} + +impl WatchCache { + pub fn new() -> Self { + let cache = Arc::new(DashMap::new()); + let (watch_sender, watch_receiver) = mpsc::unbounded_channel(); + let (unwatch_sender, unwatch_receiver) = mpsc::unbounded_channel(); + + let watch_cache = cache.clone(); + let watch_task = + task::spawn( + async move { watch_task(watch_cache, watch_receiver, unwatch_receiver).await }, + ); + + Self { + cache, + watch_task, + watch_channel: watch_sender, + unwatch_channel: unwatch_sender, + } + } + + pub fn watch(&self, path: &str) -> Result<(), SendError> { + self.watch_channel.send(path.to_string()) + } + + pub fn unwatch(&self, path: &str) -> Result<(), SendError> { + self.unwatch_channel.send(path.to_string()) + } + + pub fn read(&self, path: &str) -> Option { + self.cache.get(path).map(|value| value.clone()) + } +} + +impl Default for WatchCache { + fn default() -> Self { + Self::new() + } +}