-
Notifications
You must be signed in to change notification settings - Fork 2
/
transform.rs
156 lines (137 loc) · 6.28 KB
/
transform.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
use std::{
collections::HashMap,
sync::{Arc, Mutex},
time::{SystemTime, UNIX_EPOCH},
};
use alumet::{
measurement::{MeasurementBuffer, MeasurementPoint, WrappedMeasurementValue},
pipeline::{
elements::{error::TransformError, transform::TransformContext},
Transform,
},
resources::Resource,
};
/// Transform that splits the energy reported by RAPL between pods,
/// proportionally to the CPU usage value reported for each pod.
///
/// Incoming points are buffered per timestamp (truncated to whole seconds since
/// the Unix epoch) until they can be matched and turned into attribution points.
pub struct EnergyAttributionTransform {
/// Shared metric registry; locked to read the ids of the pod CPU usage metric,
/// the RAPL energy metric and the attributed-energy output metric.
pub metrics: Arc<Mutex<super::Metrics>>,
/// Pod CPU usage points, grouped by timestamp in whole seconds since the Unix epoch.
buffer_pod: HashMap<u64, Vec<MeasurementPoint>>,
/// RAPL CPU-package energy point, keyed by timestamp in whole seconds since the Unix epoch.
/// Only one point is kept per key.
buffer_rapl: HashMap<u64, MeasurementPoint>,
}
impl EnergyAttributionTransform {
/// Instantiates a new EnergyAttributionTransform with its private fields initialized.
pub fn new(metrics: Arc<Mutex<super::Metrics>>) -> Self {
Self {
metrics,
buffer_pod: HashMap::<u64, Vec<MeasurementPoint>>::new(),
buffer_rapl: HashMap::<u64, MeasurementPoint>::new(),
}
}
/// Empties the buffers and send the energy attribution points to the MeasurementBuffer.
fn buffer_bouncer(&mut self, measurements: &mut alumet::measurement::MeasurementBuffer) {
// Retrieving the metric_id of the energy attribution.
// Using a nested scope to reduce the lock time.
let metric_id = {
let metrics = self.metrics.lock().unwrap();
metrics.pod_attributed_energy.unwrap()
};
// If the buffers do have enough (every) MeasurementPoints,
// then we compute the energy attribution.
while self.buffer_rapl.len() >= 2 && self.buffer_pod.len() >= 2 {
// Get the smallest rapl id i.e. the oldest timestamp (key) present in the buffer.
let rapl_mini_id = self
.buffer_rapl
.keys()
.reduce(|x, y| if x < y { x } else { y })
.unwrap()
.clone();
// Check if the buffer_pod contains the key to prevent any panic/error bellow.
if !self.buffer_pod.contains_key(&rapl_mini_id) {
todo!("decide what to do in this case");
}
let rapl_point = self.buffer_rapl.remove(&rapl_mini_id).unwrap();
// Compute the sum of every `total_usage_usec` for the given timestamp: `rapl_mini_id`.
let tot_time_sum = self
.buffer_pod
.get(&rapl_mini_id)
.unwrap()
.iter()
.map(|x| match x.value {
WrappedMeasurementValue::F64(fx) => fx,
WrappedMeasurementValue::U64(ux) => ux as f64,
})
.sum::<f64>();
// Then for every points in the buffer_pod at `rapl_mini_id`.
for point in self.buffer_pod.remove(&rapl_mini_id).unwrap().iter() {
// We extract the current tot_time as f64.
let cur_tot_time_f64 = match point.value {
WrappedMeasurementValue::F64(fx) => fx,
WrappedMeasurementValue::U64(ux) => ux as f64,
};
// Extract the attributes of the current point to add them
// to the new measurement point.
let point_attributes = point
.attributes()
.map(|(key, value)| (key.to_owned(), value.clone()))
.collect();
// We create the new MeasurementPoint for the energy attribution.
let new_m = MeasurementPoint::new(
rapl_point.timestamp,
metric_id,
point.resource.clone(),
point.consumer.clone(),
match rapl_point.value {
WrappedMeasurementValue::F64(fx) => cur_tot_time_f64 / tot_time_sum * fx,
WrappedMeasurementValue::U64(ux) => cur_tot_time_f64 / tot_time_sum * (ux as f64),
},
)
.with_attr_vec(point_attributes);
// And finally, the MeasurementPoint is pushed to the MeasurementBuffer.
measurements.push(new_m.clone());
}
}
}
}
impl Transform for EnergyAttributionTransform {
    /// Applies the transform on the measurements.
    ///
    /// Every point of `measurements` is inspected:
    /// - points of the RAPL energy metric whose resource is a CPU package are
    ///   stored in the RAPL buffer;
    /// - points of the pod CPU usage metric that have at least one attribute
    ///   whose string representation starts with `"pod"` are stored in the pod buffer.
    ///
    /// Both buffers are keyed by the point's timestamp in whole seconds since the
    /// Unix epoch. Once the buffers are filled, the energy attribution points that
    /// can be computed are appended to `measurements`.
    ///
    /// # Errors
    ///
    /// Returns an error if the timestamp of a buffered point is earlier than the
    /// Unix epoch.
    ///
    /// # Panics
    ///
    /// Panics if the metrics mutex is poisoned or if a required metric id has
    /// not been set.
    fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
        // Retrieve the ids of the two input metrics.
        // A nested scope is used so the lock is released as soon as possible.
        let (pod_id, rapl_id) = {
            let metrics = self.metrics.lock().expect("metrics mutex should not be poisoned");
            let pod_id = metrics
                .cpu_usage_per_pod
                .expect("cpu_usage_per_pod metric should be registered before the transform runs")
                .as_u64();
            let rapl_id = metrics
                .rapl_consumed_energy
                .expect("rapl_consumed_energy metric should be registered before the transform runs")
                .as_u64();
            (pod_id, rapl_id)
        };

        // Fill the buffers. The measurement buffer is only read here, so it is
        // iterated in place; only the points that are actually kept are cloned.
        for m in measurements.iter() {
            let metric = m.metric.as_u64();
            if metric == rapl_id {
                // Only the CPU package domain of RAPL is used for the attribution.
                if matches!(m.resource, Resource::CpuPackage { .. }) {
                    let id = SystemTime::from(m.timestamp).duration_since(UNIX_EPOCH)?.as_secs();
                    // NOTE(review): if several CPU packages report within the same
                    // second, each insert replaces the previous one and only the last
                    // package's energy is attributed -- confirm this is intended.
                    self.buffer_rapl.insert(id, m.clone());
                }
            } else if metric == pod_id
                && m.attributes().any(|(_, value)| value.to_string().starts_with("pod"))
            {
                let id = SystemTime::from(m.timestamp).duration_since(UNIX_EPOCH)?.as_secs();
                // Append to the points of this timestamp, creating the list on first use.
                self.buffer_pod.entry(id).or_default().push(m.clone());
            }
        }

        // Empty the buffers and push the energy attribution points to the MeasurementBuffer.
        self.buffer_bouncer(measurements);
        Ok(())
    }
}