Skip to content

Commit 95820f7

Browse files
committed
Add mostrecent aggregation to Gauge
In multiprocess mode, the process that exposes the metrics needs to aggregate the samples from the other processes. The Gauge metric allows users to choose the aggregation mode. This implements the 'mostrecent' (and 'livemostrecent') mode, where the last observed value is exposed. In order to support this, the file format is expanded to store timestamps in addition to the values. The stored timestamps are read by the reader process and are used to find the latest value. The timestamp itself is exposed as a part of the Prometheus exposition format (https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md). This allows further aggregation across exporters. Closes #847 Consideration on the atomicity: Previously, mmap_dict.py had a comment saying "We assume that reading from an 8 byte aligned value is atomic". With this change, the value write becomes a 16-byte, 8-byte-aligned write. The code author tried to find a basis for the original assumption, but couldn't find any. According to write(2), **if a file descriptor is shared**, the write becomes atomic. However, we do not share the file descriptors in the current architecture. Considering that the Ruby implementation does the same and has not seen an issue with it, this write atomicity problem might not be an issue in practice. See also: * prometheus/client_ruby#172 The approach and naming are taken from client_ruby. * https://github.com/prometheus/client_golang/blob/v1.17.0/prometheus/metric.go#L149-L161 client_golang already has an API for setting the timestamp. It explains the use case for the timestamp beyond the client-local aggregation. In order to support the same use case in Python, further changes are needed. Signed-off-by: Masaya Suzuki <draftcode@gmail.com>
1 parent 249490e commit 95820f7

File tree

5 files changed

+93
-45
lines changed

5 files changed

+93
-45
lines changed

prometheus_client/metrics.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ def f():
346346
d.set_function(lambda: len(my_dict))
347347
"""
348348
_type = 'gauge'
349-
_MULTIPROC_MODES = frozenset(('all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum'))
349+
_MULTIPROC_MODES = frozenset(('all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent'))
350350

351351
def __init__(self,
352352
name: str,
@@ -357,7 +357,7 @@ def __init__(self,
357357
unit: str = '',
358358
registry: Optional[CollectorRegistry] = REGISTRY,
359359
_labelvalues: Optional[Sequence[str]] = None,
360-
multiprocess_mode: Literal['all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum'] = 'all',
360+
multiprocess_mode: Literal['all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent'] = 'all',
361361
):
362362
self._multiprocess_mode = multiprocess_mode
363363
if multiprocess_mode not in self._MULTIPROC_MODES:
@@ -390,10 +390,15 @@ def dec(self, amount: float = 1) -> None:
390390
self._raise_if_not_observable()
391391
self._value.inc(-amount)
392392

393-
def set(self, value: float) -> None:
394-
"""Set gauge to the given value."""
393+
def set(self, value: float, timestamp_sec: Optional[float] = None) -> None:
394+
"""Set gauge to the given value.
395+
396+
This can take an optional timestamp to indicate when the sample was
397+
taken. This is used for the 'mostrecent' aggregation mode and Prometheus
398+
exposition.
399+
"""
395400
self._raise_if_not_observable()
396-
self._value.set(float(value))
401+
self._value.set(float(value), timestamp_sec=timestamp_sec)
397402

398403
def set_to_current_time(self) -> None:
399404
"""Set gauge to the current unixtime."""

prometheus_client/mmap_dict.py

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,26 @@
66

77
_INITIAL_MMAP_SIZE = 1 << 16
88
_pack_integer_func = struct.Struct(b'i').pack
9-
_pack_double_func = struct.Struct(b'd').pack
9+
_pack_two_doubles_func = struct.Struct(b'dd').pack
1010
_unpack_integer = struct.Struct(b'i').unpack_from
11-
_unpack_double = struct.Struct(b'd').unpack_from
11+
_unpack_two_doubles = struct.Struct(b'dd').unpack_from
1212

1313

1414
# struct.pack_into has atomicity issues because it will temporarily write 0 into
1515
# the mmap, resulting in false reads to 0 when experiencing a lot of writes.
1616
# Using direct assignment solves this issue.
1717

18-
def _pack_double(data, pos, value):
19-
data[pos:pos + 8] = _pack_double_func(value)
18+
19+
def _pack_two_doubles(data, pos, value, timestamp_sec):
20+
data[pos:pos + 16] = _pack_two_doubles_func(value, timestamp_sec)
2021

2122

2223
def _pack_integer(data, pos, value):
2324
data[pos:pos + 4] = _pack_integer_func(value)
2425

2526

2627
def _read_all_values(data, used=0):
27-
"""Yield (key, value, pos). No locking is performed."""
28+
"""Yield (key, value, timestamp_sec, pos). No locking is performed."""
2829

2930
if used <= 0:
3031
# If no valid `used` value is passed in, read it from the file.
@@ -41,9 +42,9 @@ def _read_all_values(data, used=0):
4142
encoded_key = data[pos:pos + encoded_len]
4243
padded_len = encoded_len + (8 - (encoded_len + 4) % 8)
4344
pos += padded_len
44-
value = _unpack_double(data, pos)[0]
45-
yield encoded_key.decode('utf-8'), value, pos
46-
pos += 8
45+
value, timestamp_sec = _unpack_two_doubles(data, pos)
46+
yield encoded_key.decode('utf-8'), value, timestamp_sec, pos
47+
pos += 16
4748

4849

4950
class MmapedDict:
@@ -53,7 +54,8 @@ class MmapedDict:
5354
Then 4 bytes of padding.
5455
There's then a number of entries, consisting of a 4 byte int which is the
5556
size of the next field, a utf-8 encoded string key, padding to a 8 byte
56-
alignment, and then a 8 byte float which is the value.
57+
alignment, and then an 8 byte float which is the value and an 8 byte float
58+
which is a UNIX timestamp in seconds.
5759
5860
Not thread safe.
5961
"""
@@ -76,7 +78,7 @@ def __init__(self, filename, read_mode=False):
7678
_pack_integer(self._m, 0, self._used)
7779
else:
7880
if not read_mode:
79-
for key, _, pos in self._read_all_values():
81+
for key, _, _, pos in self._read_all_values():
8082
self._positions[key] = pos
8183

8284
@staticmethod
@@ -95,7 +97,7 @@ def _init_value(self, key):
9597
encoded = key.encode('utf-8')
9698
# Pad to be 8-byte aligned.
9799
padded = encoded + (b' ' * (8 - (len(encoded) + 4) % 8))
98-
value = struct.pack(f'i{len(padded)}sd'.encode(), len(encoded), padded, 0.0)
100+
value = struct.pack(f'i{len(padded)}sdd'.encode(), len(encoded), padded, 0.0, 0.0)
99101
while self._used + len(value) > self._capacity:
100102
self._capacity *= 2
101103
self._f.truncate(self._capacity)
@@ -105,30 +107,28 @@ def _init_value(self, key):
105107
# Update how much space we've used.
106108
self._used += len(value)
107109
_pack_integer(self._m, 0, self._used)
108-
self._positions[key] = self._used - 8
110+
self._positions[key] = self._used - 16
109111

110112
def _read_all_values(self):
111113
"""Yield (key, value, timestamp_sec, pos). No locking is performed."""
112114
return _read_all_values(data=self._m, used=self._used)
113115

114116
def read_all_values(self):
115-
"""Yield (key, value). No locking is performed."""
116-
for k, v, _ in self._read_all_values():
117-
yield k, v
117+
"""Yield (key, value, timestamp_sec). No locking is performed."""
118+
for k, v, ts, _ in self._read_all_values():
119+
yield k, v, ts
118120

119121
def read_value(self, key):
120122
if key not in self._positions:
121123
self._init_value(key)
122124
pos = self._positions[key]
123-
# We assume that reading from an 8 byte aligned value is atomic
124-
return _unpack_double(self._m, pos)[0]
125+
return _unpack_two_doubles(self._m, pos)
125126

126-
def write_value(self, key, value):
127+
def write_value(self, key, value, timestamp_sec):
127128
if key not in self._positions:
128129
self._init_value(key)
129130
pos = self._positions[key]
130-
# We assume that writing to an 8 byte aligned value is atomic
131-
_pack_double(self._m, pos, value)
131+
_pack_two_doubles(self._m, pos, value, timestamp_sec)
132132

133133
def close(self):
134134
if self._f:

prometheus_client/multiprocess.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ def _parse_key(key):
6868
# the file is missing
6969
continue
7070
raise
71-
for key, value, _ in file_values:
71+
for key, value, timestamp_sec, _ in file_values:
7272
metric_name, name, labels, labels_key, help_text = _parse_key(key)
7373

7474
metric = metrics.get(metric_name)
@@ -79,7 +79,7 @@ def _parse_key(key):
7979
if typ == 'gauge':
8080
pid = parts[2][:-3]
8181
metric._multiprocess_mode = parts[1]
82-
metric.add_sample(name, labels_key + (('pid', pid),), value)
82+
metric.add_sample(name, labels_key + (('pid', pid),), value, timestamp_sec)
8383
else:
8484
# The duplicates and labels are fixed in the next for.
8585
metric.add_sample(name, labels_key, value)
@@ -89,6 +89,7 @@ def _parse_key(key):
8989
def _accumulate_metrics(metrics, accumulate):
9090
for metric in metrics.values():
9191
samples = defaultdict(float)
92+
sample_timestamp_secs = defaultdict(float)
9293
buckets = defaultdict(lambda: defaultdict(float))
9394
samples_setdefault = samples.setdefault
9495
for s in metric.samples:
@@ -105,6 +106,12 @@ def _accumulate_metrics(metrics, accumulate):
105106
samples[without_pid_key] = value
106107
elif metric._multiprocess_mode in ('sum', 'livesum'):
107108
samples[without_pid_key] += value
109+
elif metric._multiprocess_mode in ('mostrecent', 'livemostrecent'):
110+
current_ts_sec = sample_timestamp_secs[without_pid_key]
111+
ts_sec = float(timestamp or 0)
112+
if current_ts_sec < ts_sec:
113+
samples[without_pid_key] = value
114+
sample_timestamp_secs[without_pid_key] = ts_sec
108115
else: # all/liveall
109116
samples[(name, labels)] = value
110117

@@ -143,7 +150,7 @@ def _accumulate_metrics(metrics, accumulate):
143150
samples[(metric.name + '_count', labels)] = acc
144151

145152
# Convert to correct sample format.
146-
metric.samples = [Sample(name_, dict(labels), value) for (name_, labels), value in samples.items()]
153+
metric.samples = [Sample(name_, dict(labels), value, sample_timestamp_secs.get((name_, labels), None)) for (name_, labels), value in samples.items()]
147154
return metrics.values()
148155

149156
def collect(self):

prometheus_client/values.py

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import os
22
from threading import Lock
3+
import time
34
import warnings
45

56
from .mmap_dict import mmap_key, MmapedDict
@@ -12,16 +13,23 @@ class MutexValue:
1213

1314
def __init__(self, typ, metric_name, name, labelnames, labelvalues, help_text, **kwargs):
1415
self._value = 0.0
16+
self._timestamp_sec = 0.0
1517
self._exemplar = None
1618
self._lock = Lock()
1719

18-
def inc(self, amount):
20+
def inc(self, amount, timestamp_sec=None):
21+
if not timestamp_sec:
22+
timestamp_sec = time.time()
1923
with self._lock:
2024
self._value += amount
25+
self._timestamp_sec = timestamp_sec
2126

22-
def set(self, value):
27+
def set(self, value, timestamp_sec=None):
28+
if not timestamp_sec:
29+
timestamp_sec = time.time()
2330
with self._lock:
2431
self._value = value
32+
self._timestamp_sec = timestamp_sec
2533

2634
def set_exemplar(self, exemplar):
2735
with self._lock:
@@ -82,7 +90,7 @@ def __reset(self):
8290
files[file_prefix] = MmapedDict(filename)
8391
self._file = files[file_prefix]
8492
self._key = mmap_key(metric_name, name, labelnames, labelvalues, help_text)
85-
self._value = self._file.read_value(self._key)
93+
self._value, self._timestamp_sec = self._file.read_value(self._key)
8694

8795
def __check_for_pid_change(self):
8896
actual_pid = process_identifier()
@@ -95,17 +103,25 @@ def __check_for_pid_change(self):
95103
for value in values:
96104
value.__reset()
97105

98-
def inc(self, amount):
106+
def inc(self, amount, timestamp_sec=None):
107+
if not timestamp_sec:
108+
timestamp_sec = time.time()
109+
99110
with lock:
100111
self.__check_for_pid_change()
101112
self._value += amount
102-
self._file.write_value(self._key, self._value)
113+
self._timestamp_sec = timestamp_sec
114+
self._file.write_value(self._key, self._value, self._timestamp_sec)
115+
116+
def set(self, value, timestamp_sec=None):
117+
if not timestamp_sec:
118+
timestamp_sec = time.time()
103119

104-
def set(self, value):
105120
with lock:
106121
self.__check_for_pid_change()
107122
self._value = value
108-
self._file.write_value(self._key, self._value)
123+
self._timestamp_sec = timestamp_sec
124+
self._file.write_value(self._key, self._value, self._timestamp_sec)
109125

110126
def set_exemplar(self, exemplar):
111127
# TODO: Implement exemplars for multiprocess mode.

tests/test_multiprocess.py

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,26 @@ def test_gauge_livesum(self):
185185
mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR'])
186186
self.assertEqual(2, self.registry.get_sample_value('g'))
187187

188+
def test_gauge_mostrecent(self):
189+
g1 = Gauge('g', 'help', registry=None, multiprocess_mode='mostrecent')
190+
values.ValueClass = MultiProcessValue(lambda: 456)
191+
g2 = Gauge('g', 'help', registry=None, multiprocess_mode='mostrecent')
192+
g1.set(1, 2000)
193+
g2.set(2, 1000)
194+
self.assertEqual(1, self.registry.get_sample_value('g'))
195+
mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR'])
196+
self.assertEqual(1, self.registry.get_sample_value('g'))
197+
198+
def test_gauge_livemostrecent(self):
199+
g1 = Gauge('g', 'help', registry=None, multiprocess_mode='livemostrecent')
200+
values.ValueClass = MultiProcessValue(lambda: 456)
201+
g2 = Gauge('g', 'help', registry=None, multiprocess_mode='livemostrecent')
202+
g1.set(1, 2000)
203+
g2.set(2, 1000)
204+
self.assertEqual(1, self.registry.get_sample_value('g'))
205+
mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR'])
206+
self.assertEqual(2, self.registry.get_sample_value('g'))
207+
188208
def test_namespace_subsystem(self):
189209
c1 = Counter('c', 'help', registry=None, namespace='ns', subsystem='ss')
190210
c1.inc(1)
@@ -369,28 +389,28 @@ def setUp(self):
369389
self.d = mmap_dict.MmapedDict(self.tempfile)
370390

371391
def test_process_restart(self):
372-
self.d.write_value('abc', 123.0)
392+
self.d.write_value('abc', 123.0, 987.0)
373393
self.d.close()
374394
self.d = mmap_dict.MmapedDict(self.tempfile)
375-
self.assertEqual(123, self.d.read_value('abc'))
376-
self.assertEqual([('abc', 123.0)], list(self.d.read_all_values()))
395+
self.assertEqual((123, 987.0), self.d.read_value('abc'))
396+
self.assertEqual([('abc', 123.0, 987.0)], list(self.d.read_all_values()))
377397

378398
def test_expansion(self):
379399
key = 'a' * mmap_dict._INITIAL_MMAP_SIZE
380-
self.d.write_value(key, 123.0)
381-
self.assertEqual([(key, 123.0)], list(self.d.read_all_values()))
400+
self.d.write_value(key, 123.0, 987.0)
401+
self.assertEqual([(key, 123.0, 987.0)], list(self.d.read_all_values()))
382402

383403
def test_multi_expansion(self):
384404
key = 'a' * mmap_dict._INITIAL_MMAP_SIZE * 4
385-
self.d.write_value('abc', 42.0)
386-
self.d.write_value(key, 123.0)
387-
self.d.write_value('def', 17.0)
405+
self.d.write_value('abc', 42.0, 987.0)
406+
self.d.write_value(key, 123.0, 876.0)
407+
self.d.write_value('def', 17.0, 765.0)
388408
self.assertEqual(
389-
[('abc', 42.0), (key, 123.0), ('def', 17.0)],
409+
[('abc', 42.0, 987.0), (key, 123.0, 876.0), ('def', 17.0, 765.0)],
390410
list(self.d.read_all_values()))
391411

392412
def test_corruption_detected(self):
393-
self.d.write_value('abc', 42.0)
413+
self.d.write_value('abc', 42.0, 987.0)
394414
# corrupt the written data
395415
self.d._m[8:16] = b'somejunk'
396416
with self.assertRaises(RuntimeError):

0 commit comments

Comments
 (0)