
Commit 0f74274
Commit message: WIP
1 parent: f4328bb
20 files changed: 1171 additions, 844 deletions

distributed/http/tests/test_core.py

Lines changed: 5 additions & 1 deletion
@@ -7,7 +7,7 @@
 
 from distributed import Semaphore
 from distributed.utils_test import fetch_metrics_sample_names, gen_cluster, inc
-
+from distributed.metrics import context_meter
 
 @gen_cluster(client=True)
 async def test_scheduler(c, s, a, b):

@@ -26,6 +26,10 @@ async def test_prometheus_api_doc(c, s, a):
     # Some metrics only appear if there are tasks on the cluster
     fut = c.submit(inc, 1)
     await fut
+
+    with context_meter.no_threshold():
+        a.data.evict()
+
     # Semaphore metrics only appear after semaphores are used
     sem = await Semaphore()
     await sem.acquire()
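The no_threshold() guard above is needed because ContextMeter.meter (added to distributed/metrics.py in this commit) silently drops deltas below its 1 ms default threshold, and evicting a single tiny key in a test can easily finish faster than that. A minimal sketch of that behaviour, using only the API added by this commit; the "noop" label and the recording callback are illustrative, not part of the change:

    from distributed.metrics import context_meter

    seen = []

    def record(label, value, unit):
        # Same signature as ContextMeter.digest_metric(label, value, unit)
        seen.append((label, value, unit))

    with context_meter.add_callback(record):
        # Finishes well under 1 ms on any normal machine, so with the default
        # threshold nothing is digested.
        with context_meter.meter("noop"):
            pass
        assert seen == []

        # With the threshold disabled, even a near-zero delta reaches the callback.
        with context_meter.no_threshold():
            with context_meter.meter("noop"):
                pass
        assert seen and seen[0][0] == "noop" and seen[0][2] == "seconds"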

distributed/http/worker/prometheus/core.py

Lines changed: 25 additions & 64 deletions
@@ -152,44 +152,11 @@ def collect(self) -> Iterator[Metric]:
             unit="seconds",
             value=max_tick_duration,
         )
-
         yield CounterMetricFamily(
             self.build_name("tick_count"),
             "Total number of ticks observed since the server started",
             value=self.server._tick_counter,
         )
-
-        # This duplicates spill_time_total; however the breakdown is different
-        evloop_blocked_total = CounterMetricFamily(
-            self.build_name("event_loop_blocked_time"),
-            "Total time during which the worker's event loop was blocked "
-            "by spill/unspill activity since the latest worker reset",
-            unit="seconds",
-            labels=["cause"],
-        )
-        # This is typically higher than spill_time_per_key_max, as multiple keys can be
-        # spilled/unspilled without yielding the event loop
-        evloop_blocked_max = GaugeMetricFamily(
-            self.build_name("event_loop_blocked_time_max"),
-            "Maximum contiguous time during which the worker's event loop was blocked "
-            "by spill/unspill activity since the previous Prometheus poll",
-            unit="seconds",
-            labels=["cause"],
-        )
-        for family, digest in (
-            (evloop_blocked_total, self.server.digests_total),
-            (evloop_blocked_max, self.server.digests_max),
-        ):
-            for family_label, digest_label in (
-                ("disk-write-target", "disk-write-target-duration"),
-                ("disk-write-spill", "disk-write-spill-duration"),
-                ("disk-read-execute", "disk-load-duration"),
-                ("disk-read-get-data", "get-data-load-duration"),
-            ):
-                family.add_metric([family_label], digest[digest_label])
-
-        yield evloop_blocked_total
-        yield evloop_blocked_max
         self.server.digests_max.clear()
 
     def collect_crick(self) -> Iterator[Metric]:
@@ -243,42 +210,36 @@ def collect_spillbuffer(self) -> Iterator[Metric]:
             read = spill_bytes.disk_read / spill_time.disk_read
         """
         try:
-            get_metrics = self.server.data.get_metrics  # type: ignore
+            metrics = self.server.data.cumulative_metrics  # type: ignore
         except AttributeError:
             return  # spilling is disabled
-        metrics = get_metrics()
 
-        total_bytes = CounterMetricFamily(
-            self.build_name("spill_bytes"),
-            "Total size of memory and disk accesses caused by managed data "
-            "since the latest worker restart",
-            labels=["activity"],
-        )
-        # Note: memory_read is used to calculate cache hit ratios (see docstring)
-        for k in ("memory_read", "disk_read", "disk_write"):
-            total_bytes.add_metric([k], metrics[f"{k}_bytes_total"])
-        yield total_bytes
-
-        total_counts = CounterMetricFamily(
-            self.build_name("spill_count"),
-            "Total number of memory and disk accesses caused by managed data "
-            "since the latest worker restart",
-            labels=["activity"],
-        )
+        counters = {
+            "bytes": CounterMetricFamily(
+                self.build_name("spill_bytes"),
+                "Total size of memory and disk accesses caused by managed data "
+                "since the latest worker restart",
+                labels=["activity"],
+            ),
+            "count": CounterMetricFamily(
+                self.build_name("spill_count"),
+                "Total number of memory and disk accesses caused by managed data "
+                "since the latest worker restart",
+                labels=["activity"],
+            ),
+            "seconds": CounterMetricFamily(
+                self.build_name("spill_time"),
+                "Total time spent spilling/unspilling since the latest worker restart",
+                unit="seconds",
+                labels=["activity"],
+            ),
+        }
+
         # Note: memory_read is used to calculate cache hit ratios (see docstring)
-        for k in ("memory_read", "disk_read", "disk_write"):
-            total_counts.add_metric([k], metrics[f"{k}_count_total"])
-        yield total_counts
+        for (label, unit), value in metrics.items():
+            counters[unit].add_metric([label], value)
 
-        total_times = CounterMetricFamily(
-            self.build_name("spill_time"),
-            "Total time spent spilling/unspilling since the latest worker restart",
-            unit="seconds",
-            labels=["activity"],
-        )
-        for k in ("pickle", "disk_write", "disk_read", "unpickle"):
-            total_times.add_metric([k], metrics[f"{k}_time_total"])
-        yield total_times
+        yield from counters.values()
 
 
 class PrometheusHandler(RequestHandler):
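The rewritten collect_spillbuffer assumes the spill buffer now exposes a cumulative_metrics mapping keyed by (label, unit) pairs, with one Prometheus counter family per unit; the actual labels and units are produced by the spill buffer elsewhere in this commit and are not visible in this diff. A rough standalone sketch of the routing, with hypothetical sample keys and values:

    from prometheus_client.core import CounterMetricFamily

    # Hypothetical snapshot of what self.server.data.cumulative_metrics might hold;
    # the real keys are defined by the spill buffer, not by this collector.
    cumulative_metrics = {
        ("memory-read", "bytes"): 1_024.0,
        ("memory-read", "count"): 3.0,
        ("disk-write", "bytes"): 2_048.0,
        ("disk-write", "count"): 2.0,
        ("disk-write", "seconds"): 0.015,
    }

    counters = {
        "bytes": CounterMetricFamily(
            "dask_worker_spill_bytes", "Bytes moved by spilling", labels=["activity"]
        ),
        "count": CounterMetricFamily(
            "dask_worker_spill_count", "Accesses caused by spilling", labels=["activity"]
        ),
        "seconds": CounterMetricFamily(
            "dask_worker_spill_time",
            "Time spent spilling",
            unit="seconds",
            labels=["activity"],
        ),
    }

    # Same routing as the new collect_spillbuffer(): the unit picks the counter
    # family, the label becomes the "activity" label of one sample.
    for (label, unit), value in cumulative_metrics.items():
        counters[unit].add_metric([label], value)

    for family in counters.values():
        for sample in family.samples:
            print(sample.name, sample.labels, sample.value)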

distributed/http/worker/tests/test_worker_http.py

Lines changed: 4 additions & 36 deletions
@@ -7,6 +7,7 @@
 from tornado.httpclient import AsyncHTTPClient
 
 from distributed import Event, Worker, wait
+from distributed.metrics import context_meter
 from distributed.sizeof import sizeof
 from distributed.utils_test import (
     async_wait_for,

@@ -25,13 +26,14 @@ async def test_prometheus(c, s, a):
     fut = c.submit(lambda: 1)
     await wait(fut)
 
+    with context_meter.no_threshold():
+        a.data.evict()
+
     active_metrics = await fetch_metrics_sample_names(
         a.http_server.port, prefix="dask_worker_"
     )
     expected_metrics = {
         "dask_worker_concurrent_fetch_requests",
-        "dask_worker_event_loop_blocked_time_max_seconds",
-        "dask_worker_event_loop_blocked_time_seconds_total",
         "dask_worker_latency_seconds",
         "dask_worker_memory_bytes",
         "dask_worker_spill_bytes_total",

@@ -247,37 +249,3 @@ def __sizeof__(self):
     assert 50 * 2**20 < metrics["managed"] < 100 * 2**30  # capped to process memory
     assert metrics["unmanaged"] == 0  # floored to 0
     assert metrics["spilled"] == 0
-
-
-@gen_cluster(
-    client=True,
-    nthreads=[("127.0.0.1", 1)],
-    worker_kwargs={"memory_limit": "10 MiB"},
-    config={
-        "distributed.worker.memory.target": 1.0,
-        "distributed.worker.memory.spill": False,
-        "distributed.worker.memory.pause": False,
-    },
-)
-async def test_prometheus_resets_max_metrics(c, s, a):
-    pytest.importorskip("prometheus_client")
-    np = pytest.importorskip("numpy")
-
-    # The first GET to /metrics calls collect() twice
-    await fetch_metrics(a.http_server.port)
-
-    # We need substantial data to be sure that spilling it will take more than 5ms.
-    x = c.submit(lambda: "x" * 40_000_000, key="x", workers=[a.address])
-    await wait(x)
-    # Key is individually larger than target threshold, so it was spilled immediately
-    assert "x" in a.data.slow
-
-    nsecs = a.digests_max["disk-write-target-duration"]
-    assert nsecs > 0
-
-    families = await fetch_metrics(a.http_server.port)
-    metric = families["dask_worker_event_loop_blocked_time_max_seconds"]
-    samples = {sample.labels["cause"]: sample.value for sample in metric.samples}
-
-    assert samples["disk-write-target"] == nsecs
-    assert a.digests_max["disk-write-target-duration"] == 0
distributed/metrics.py

Lines changed: 174 additions & 1 deletion
@@ -2,8 +2,13 @@
 
 import collections
 import time as timemod
-from collections.abc import Callable
+from collections.abc import Callable, Iterator
+from contextlib import contextmanager
+from contextvars import ContextVar
+from dataclasses import dataclass
 from functools import wraps
+from math import nan
+from typing import Literal
 
 import psutil
 
@@ -103,3 +108,171 @@ def resync(self) -> None:
     thread_time = timemod.thread_time
 except (AttributeError, OSError):  # pragma: no cover
     thread_time = process_time
+
+
+@dataclass
+class MeterOutput:
+    start: float
+    stop: float
+    delta: float
+    __slots__ = tuple(__annotations__)
+
+
+@contextmanager
+def meter(
+    func: Callable[[], float] = timemod.perf_counter,
+    floor: float | Literal[False] = 0.0,
+) -> Iterator[MeterOutput]:
+    """Convenience context manager which calls func() before and after the wrapped
+    code and calculates the delta.
+
+    Parameters
+    ----------
+    func: callable
+        function to call before and after, which must return a number.
+        Besides time, it could return e.g. cumulative network traffic or disk usage.
+        Default: :func:`timemod.perf_counter`
+    floor: float or False, optional
+        Floor the delta to the given value (default: 0). This is useful for strictly
+        cumulative functions that can occasionally glitch and go backwards.
+        Set to False to disable.
+    """
+    out = MeterOutput(func(), nan, nan)
+    try:
+        yield out
+    finally:
+        out.stop = func()
+        out.delta = out.stop - out.start
+        if floor is not False:
+            out.delta = max(floor, out.delta)
+
+
+class ContextMeter:
+    """Context-based general purpose meter.
+
+    Usage
+    -----
+    1. In high-level code, call :meth:`add_callback` to install a hook that defines an
+       activity.
+    2. In low-level code, typically many stack levels below, log quantitative events
+       (e.g. elapsed time, transferred bytes, etc.) so that they will be attributed to
+       the high-level code calling it, either with :meth:`meter` or
+       :meth:`digest_metric`.
+
+    Examples
+    --------
+    In the code that e.g. sends a Python object from A to B over the network:
+    >>> from distributed.metrics import context_meter
+    >>> with context_meter.add_callback(partial(print, "A->B comms:")):
+    ...     await send_over_the_network(obj)
+
+    In the serialization utilities, called many stack levels below:
+    >>> with context_meter.meter("dumps"):
+    ...     pik = pickle.dumps(obj)
+    >>> with context_meter.meter("compress"):
+    ...     pik = lz4.compress(pik)
+
+    And finally, elsewhere, deep into the TCP stack:
+    >>> with context_meter.meter("network-write"):
+    ...     await comm.write(frames)
+
+    When you call the top-level code, you'll get::
+        A->B comms: dumps 0.012 seconds
+        A->B comms: compress 0.034 seconds
+        A->B comms: network-write 0.567 seconds
+    """
+
+    _callbacks: ContextVar[list[Callable[[str, float, str], None]]]
+    default_threshold: float
+
+    def __init__(self):
+        self._callbacks = ContextVar(f"MetricHook<{id(self)}>._callbacks", default=[])
+        self.default_threshold = 0.001
+
+    @contextmanager
+    def add_callback(
+        self, callback: Callable[[str, float, str], None]
+    ) -> Iterator[None]:
+        """Add a callback when entering the context and remove it when exiting it.
+        The callback must accept the same parameters as :meth:`digest_metric`.
+        """
+        cbs = self._callbacks.get()
+        tok = self._callbacks.set(cbs + [callback])
+        try:
+            yield
+        finally:
+            tok.var.reset(tok)
+
+    def digest_metric(self, label: str, value: float, unit: str) -> None:
+        """Invoke the currently set context callbacks for an arbitrary quantitative
+        metric.
+        """
+        cbs = self._callbacks.get()
+        for cb in cbs:
+            cb(label, value, unit)
+
+    @contextmanager
+    def meter(
+        self,
+        label: str,
+        unit: str = "seconds",
+        func: Callable[[], float] = timemod.perf_counter,
+        floor: float | Literal[False] = 0.0,
+        threshold: float | None = None,
+    ) -> Iterator[None]:
+        """Convenience context manager which calls func() before and after the wrapped
+        code, calculates the delta, and finally calls :meth:`digest_metric`. It also
+        subtracts any other calls to :meth:`meter` or :meth:`digest_metric` with the
+        same unit performed within the context, so that the total is strictly additive.
+
+        :meth:`digest_metric` is not called in case of exception.
+
+        Parameters
+        ----------
+        label: str
+            label to pass to the callback
+        unit: str, optional
+            unit to pass to the callback. Default: seconds
+        func: callable
+            see :func:`meter`
+        floor: float or False, optional
+            see :func:`meter`
+        threshold: float, optional
+            Do not call :meth:`digest_metric` if the delta is less than this.
+            Default: 1ms
+        """
+        offsets = []
+
+        def cb(label2: str, value2: float, unit2: str) -> None:
+            if unit2 == unit:
+                # This must be threadsafe to support when callbacks are invoked from
+                # distributed.utils.offload; '+=' on a float would not be threadsafe!
+                offsets.append(value2)
+
+        with self.add_callback(cb), meter(func, floor=False) as m:
+            yield
+
+        delta = m.delta - sum(offsets)
+        if floor is not False:
+            delta = max(floor, delta)
+        if threshold is None:
+            threshold = self.default_threshold
+        if delta >= threshold:
+            self.digest_metric(label, delta, unit)
+
+    @contextmanager
+    def no_threshold(self) -> Iterator[None]:
+        """Temporarily disable the default threshold in :meth:`meter`.
+        Useful for unit testing trivial timings.
+        """
+        bak = self.default_threshold
+        self.default_threshold = 0.0
+        try:
+            yield
+        finally:
+            self.default_threshold = bak
+
+
+context_meter = ContextMeter()
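The key property of ContextMeter.meter is that nested measurements with the same unit are subtracted from the enclosing one (via the offsets list), so the digested totals stay strictly additive. A small self-contained demonstration of that behaviour; the labels, the sleep durations, and the print callback are arbitrary illustrations, not part of the commit:

    import time

    from distributed.metrics import context_meter

    def report(label, value, unit):
        print(f"{label}: {value:.3f} {unit}")

    with context_meter.add_callback(report):
        with context_meter.meter("total"):
            with context_meter.meter("serialize"):
                time.sleep(0.1)  # attributed to "serialize"
            time.sleep(0.05)  # the remainder is attributed to "total"

    # Expected output (approximately):
    #   serialize: 0.100 seconds
    #   total: 0.050 seconds   (0.15 s outer delta minus the 0.1 s already digested)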
