Skip to content

Commit

Permalink
Redesign xenstored plugin to use watch to track changes.
Browse files Browse the repository at this point in the history
  • Loading branch information
TSnake41 committed Jul 31, 2023
1 parent fac65c0 commit 60ac037
Show file tree
Hide file tree
Showing 4 changed files with 331 additions and 84 deletions.
10 changes: 6 additions & 4 deletions plugins/xcp-metrics-plugin-xenstored/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ categories.workspace = true
xcp-metrics-common = { path = "../../xcp-metrics-common" }
xcp-metrics-plugin-common = { path = "../xcp-metrics-plugin-common" }

tokio = { version = "1", features = ["full"] }

uuid = "1.4"
anyhow = "1.0"
dashmap = "5.5"

tokio = { version = "1", features = ["full"] }
futures = "0.3"

[dependencies.xenstore-rs]
git = "https://github.com/xcp-ng/xenstore-rs"
branch = "xs_write"
git = "https://github.com/tsnake41/xenstore-rs"
branch = "xs_watch"
164 changes: 84 additions & 80 deletions plugins/xcp-metrics-plugin-xenstored/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,36 @@
use std::{collections::HashMap, rc::Rc, time::Duration};
mod plugin;
mod watch_cache;

use std::{collections::HashMap, time::Duration};

use plugin::PluginState;
use tokio::time;
use xcp_metrics_common::metrics::{Label, MetricType, MetricValue, NumberValue};
use xcp_metrics_plugin_common::protocol_v3::{
utils::{SimpleMetric, SimpleMetricFamily, SimpleMetricSet},
MetricsPlugin,
};
use xenstore_rs::{XBTransaction, Xs, XsOpenFlags};
use xenstore_rs::{Xs, XsOpenFlags};

pub fn get_vm_infos(xs: &Xs, vm_uuid: &str, attributes: &[&str]) -> MetricValue {
pub fn get_vm_infos(plugin: &PluginState, vm_uuid: &str, attributes: &[&str]) -> MetricValue {
MetricValue::Info(
attributes
.iter()
.filter_map(|&attr| {
xs.read(
XBTransaction::Null,
format!("/vm/{vm_uuid}/{attr}").as_str(),
)
.ok()
.map(|value| Label(attr.into(), value.into()))
plugin
.read(format!("/vm/{vm_uuid}/{attr}").as_str())
.map(|value| Label(attr.into(), value.into()))
})
.collect(),
)
}

fn make_memory_target_metric(xs: &Xs, domid: &str, memory_target: i64) -> SimpleMetric {
let vm_uuid = get_domain_uuid(xs, domid);
fn make_memory_target_metric(
plugin: &PluginState,
domid: &str,
memory_target: i64,
) -> SimpleMetric {
let vm_uuid = get_domain_uuid(plugin, domid);

let mut labels = vec![Label("domain".into(), domid.into())];

Expand All @@ -39,96 +44,95 @@ fn make_memory_target_metric(xs: &Xs, domid: &str, memory_target: i64) -> Simple
}
}

fn get_domain_uuid(xs: &Xs, domid: &str) -> Option<String> {
xs.read(
XBTransaction::Null,
format!("/local/domain/{domid}/vm").as_str(),
)
.and_then(|vm_path| xs.read(XBTransaction::Null, format!("{vm_path}/uuid").as_str()))
.ok()
fn get_domain_uuid(plugin: &PluginState, domid: &str) -> Option<String> {
plugin
.read(format!("/local/domain/{domid}/vm").as_str())
.and_then(|vm_path| plugin.read(format!("{vm_path}/uuid").as_str()))
}

fn get_memory_target_value(xs: &Xs, domid: &str) -> Option<i64> {
xs.read(
XBTransaction::Null,
format!("/local/domain/{domid}/memory/target").as_str(),
)
.ok()
.and_then(|value| {
value
.parse()
.map_err(|err| {
eprintln!("Memory target parse error {err:?}");
err
})
.ok()
})
fn get_memory_target_value(plugin: &PluginState, domid: &str) -> Option<i64> {
plugin
.read(format!("/local/domain/{domid}/memory/target").as_str())
.and_then(|value| {
value
.parse()
.map_err(|err| {
eprintln!("Memory target parse error {err:?}");
err
})
.ok()
})
}

fn generate_metrics(xs: &Xs) -> anyhow::Result<SimpleMetricSet> {
let mut families: HashMap<String, SimpleMetricFamily> = HashMap::new();

match xs.directory(XBTransaction::Null, "/vm") {
Ok(vms) => {
families.insert(
"vm_info".into(),
SimpleMetricFamily {
metric_type: MetricType::Info,
unit: "".into(),
help: "Virtual machine informations".into(),
metrics: vms
.iter()
// Get vm metrics.
.map(|uuid| SimpleMetric {
labels: vec![Label("owner".into(), format!("vm {uuid}").into())].into(),
value: get_vm_infos(&xs, &uuid, &["name"]),
})
.collect(),
},
);
}
Err(err) => println!("Unable to get vm list {err}"),
fn generate_metrics(plugin: &mut PluginState, xs: &Xs) -> anyhow::Result<SimpleMetricSet> {
if let Err(e) = plugin.update_domains(xs) {
eprintln!("Unable to get domains: {e}");
}

match xs.directory(XBTransaction::Null, "/local/domain") {
Ok(domains) => {
families.insert(
"memory_target".into(),
SimpleMetricFamily {
metric_type: MetricType::Gauge,
unit: "bytes".into(),
help: "Target of VM balloon driver".into(),
metrics: domains
.iter()
// Get target memory metric (if exists).
.filter_map(|domid| get_memory_target_value(xs, &domid).map(|m| (domid, m)))
// Make it a metric.
.map(|(domid, memory_target)| {
make_memory_target_metric(xs, &domid, memory_target)
})
.collect(),
},
);
}
Err(err) => println!("Unable to get domains list {err}"),
if let Err(e) = plugin.update_vms(xs) {
eprintln!("Unable to get vms: {e}");
}

let mut families: HashMap<String, SimpleMetricFamily> = HashMap::new();

families.insert(
"vm_info".into(),
SimpleMetricFamily {
metric_type: MetricType::Info,
unit: "".into(),
help: "Virtual machine informations".into(),
metrics: plugin
.vms
.iter()
// Get vm metrics.
.map(|uuid| SimpleMetric {
labels: vec![Label("owner".into(), format!("vm {uuid}").into())].into(),
value: get_vm_infos(plugin, &uuid, &["name"]),
})
.collect(),
},
);

families.insert(
"memory_target".into(),
SimpleMetricFamily {
metric_type: MetricType::Gauge,
unit: "bytes".into(),
help: "Target of VM balloon driver".into(),
metrics: plugin
.domains
.iter()
// Get target memory metric (if exists).
.filter_map(|domid| get_memory_target_value(plugin, &domid).map(|m| (domid, m)))
// Make it a metric.
.map(|(domid, memory_target)| {
make_memory_target_metric(plugin, &domid, memory_target)
})
.collect(),
},
);

Ok(SimpleMetricSet { families })
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let xs = Rc::new(Xs::new(XsOpenFlags::ReadOnly).map_err(|e| anyhow::anyhow!("{e}"))?);
let xs = Xs::new(XsOpenFlags::ReadOnly).map_err(|e| anyhow::anyhow!("{e}"))?;

let mut plugin_state = PluginState::default();

let plugin = MetricsPlugin::new(
"xcp-metrics-plugin-xenstored",
generate_metrics(&xs)?.into(),
generate_metrics(&mut plugin_state, &xs)?.into(),
)
.await?;

loop {
// Fetch and push new metrics.
plugin.update(generate_metrics(&xs)?.into()).await.unwrap();
plugin
.update(generate_metrics(&mut plugin_state, &xs)?.into())
.await
.unwrap();

time::sleep(Duration::from_secs(1)).await;
}
Expand Down
133 changes: 133 additions & 0 deletions plugins/xcp-metrics-plugin-xenstored/src/plugin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
use std::collections::HashSet;

use xenstore_rs::{XBTransaction, Xs};

use crate::watch_cache::WatchCache;

#[derive(Default)]
pub struct PluginState {
watch_cache: WatchCache,

/// Domain ID -> Paths
pub domains: HashSet<String>,

/// VM ID -> Paths
pub vms: HashSet<String>,
}

static TRACKED_DOMAIN_ATTRIBUTES: &[&str] = &["memory/target", "vm"];
static TRACKED_VM_ATTRIBUTES: &[&str] = &["name", "uuid"];

impl PluginState {
fn track_domain(&mut self, domain: &str) {
TRACKED_DOMAIN_ATTRIBUTES.into_iter().for_each(|attribute| {
if let Err(e) = self
.watch_cache
.watch(format!("/local/domain/{domain}/{attribute}").as_str())
{
println!("{e}");
}
});

self.domains.insert(domain.to_string());
}

fn untrack_domain(&mut self, domain: &str) {
TRACKED_DOMAIN_ATTRIBUTES.into_iter().for_each(|attribute| {
if let Err(e) = self
.watch_cache
.unwatch(format!("/local/domain/{domain}/{attribute}").as_str())
{
println!("{e}");
}
});

self.domains.remove(domain);
}

fn track_vm(&mut self, vm: &str) {
TRACKED_VM_ATTRIBUTES.into_iter().for_each(|attribute| {
if let Err(e) = self
.watch_cache
.watch(format!("/vm/{vm}/{attribute}").as_str())
{
println!("{e}");
}
});

self.vms.insert(vm.to_string());
}

fn untrack_vm(&mut self, vm: &str) {
TRACKED_VM_ATTRIBUTES.into_iter().for_each(|attribute| {
if let Err(e) = self
.watch_cache
.unwatch(format!("/vm/{vm}/{attribute}").as_str())
{
println!("{e}");
}
});

self.vms.remove(vm);
}

/// Check for removed and new domains, and update watcher.
pub fn update_domains(&mut self, xs: &Xs) -> anyhow::Result<()> {
let real_domains: HashSet<String> = xs
.directory(XBTransaction::Null, "/local/domain")?
.into_iter()
.collect();

real_domains.iter().for_each(|domain| {
if !self.domains.contains(domain) {
println!("Now tracking domain {domain}");
self.track_domain(domain);
}
});

// Check for removed domains.
self.domains
.difference(&real_domains)
.cloned()
.collect::<Vec<String>>()
.into_iter()
.for_each(|domain| {
println!("Untracking domain {domain}");
self.untrack_domain(&domain);
});

Ok(())
}

/// Check for removed and new vms, and update watcher.
pub fn update_vms(&mut self, xs: &Xs) -> anyhow::Result<()> {
let real_vms: HashSet<String> = xs
.directory(XBTransaction::Null, "/vm")?
.into_iter()
.collect();

real_vms.iter().for_each(|vm| {
if !self.vms.contains(vm) {
println!("Now tracking vm {vm}");
self.track_vm(vm);
}
});

// Check removed domains.
self.vms
.difference(&real_vms)
.cloned()
.collect::<Vec<String>>()
.into_iter()
.for_each(|vm| {
println!("Untracking vm {vm}");
self.untrack_vm(&vm);
});

Ok(())
}

pub fn read(&self, path: &str) -> Option<String> {
self.watch_cache.read(path)
}
}
Loading

0 comments on commit 60ac037

Please sign in to comment.