
Commit a09a151

Fine performance metrics for execute, gather_dep, etc. (#7586)
1 parent e9fb7ad commit a09a151

23 files changed: +1329 / -554 lines

distributed/core.py

Lines changed: 31 additions & 3 deletions
@@ -11,8 +11,9 @@
 import warnings
 import weakref
 from collections import defaultdict, deque
-from collections.abc import Callable, Container, Coroutine, Generator
+from collections.abc import Callable, Container, Coroutine, Generator, Hashable
 from enum import Enum
+from functools import wraps
 from typing import TYPE_CHECKING, Any, ClassVar, TypedDict, TypeVar, final
 
 import tblib
@@ -34,7 +35,7 @@
 )
 from distributed.compatibility import PeriodicCallback
 from distributed.counter import Counter
-from distributed.metrics import time
+from distributed.metrics import context_meter, time
 from distributed.system_monitor import SystemMonitor
 from distributed.utils import (
     NoOpAwaitable,
@@ -950,7 +951,7 @@ async def close(self, timeout=None):
         finally:
             self._event_finished.set()
 
-    def digest_metric(self, name: str, value: float) -> None:
+    def digest_metric(self, name: Hashable, value: float) -> None:
         # Granular data (requires crick)
         if self.digests is not None:
             self.digests[name].add(value)
@@ -960,6 +961,33 @@ def digest_metric(self, name: str, value: float) -> None:
             self.digests_max[name] = max(self.digests_max[name], value)
 
 
+def context_meter_to_server_digest(digest_tag: str) -> Callable:
+    """Decorator for an async method of a Server subclass that calls
+    ``distributed.metrics.context_meter.meter`` and/or ``digest_metric``.
+    It routes the calls from ``context_meter.digest_metric(label, value, unit)`` to
+    ``Server.digest_metric((digest_tag, label, unit), value)``.
+    """
+
+    def decorator(func: Callable) -> Callable:
+        @wraps(func)
+        async def wrapper(self: Server, *args: Any, **kwargs: Any) -> Any:
+            loop = asyncio.get_running_loop()
+
+            def metrics_callback(label: Hashable, value: float, unit: str) -> None:
+                if not isinstance(label, tuple):
+                    label = (label,)
+                name = (digest_tag, *label, unit)
+                # This callback could be called from another thread through offload()
+                loop.call_soon_threadsafe(self.digest_metric, name, value)
+
+            with context_meter.add_callback(metrics_callback):
+                return await func(self, *args, **kwargs)
+
+        return wrapper
+
+    return decorator
+
+
 def pingpong(comm):
     return b"pong"
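
For context, while the decorated coroutine runs, every metric emitted through context_meter is re-labelled with digest_tag and recorded on the Server via digest_metric. A minimal usage sketch (the subclass, method name, and labels below are hypothetical and not part of this commit):

    from distributed.core import Server, context_meter_to_server_digest
    from distributed.metrics import context_meter

    class MyServer(Server):  # hypothetical Server subclass
        @context_meter_to_server_digest("my-op")  # digest_tag chosen for illustration
        async def my_op(self) -> None:
            # Rerouted by the decorator to
            # self.digest_metric(("my-op", "bytes-read", "bytes"), 123.0)
            context_meter.digest_metric("bytes-read", 123.0, "bytes")

Per the commit title, the real call sites are Worker methods such as execute and gather_dep.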

distributed/http/tests/test_core.py

Lines changed: 3 additions & 0 deletions
@@ -26,6 +26,9 @@ async def test_prometheus_api_doc(c, s, a):
     # Some metrics only appear if there are tasks on the cluster
     fut = c.submit(inc, 1)
     await fut
+
+    a.data.evict()
+
     # Semaphore metrics only appear after semaphores are used
     sem = await Semaphore()
     await sem.acquire()
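
Not part of the diff: a.data is the worker's SpillBuffer and evict() force-spills one key to disk; the call is presumably needed because the spill-related metric families now only carry samples once a spill has actually happened. A rough sketch of the intent (SpillBuffer interface assumed):

    a.data.evict()   # force-spill one key from memory to disk
    # after this, the dask_worker_spill_* families have at least one sample to export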

distributed/http/worker/prometheus/core.py

Lines changed: 26 additions & 66 deletions
@@ -143,7 +143,7 @@ def collect(self) -> Iterator[Metric]:
 
         now = time()
         max_tick_duration = max(
-            self.server.digests_max["tick_duration"],
+            self.server.digests_max.pop("tick_duration", 0),
             now - self.server._last_tick,
         )
         yield GaugeMetricFamily(
@@ -152,46 +152,12 @@
             unit="seconds",
             value=max_tick_duration,
         )
-
         yield CounterMetricFamily(
             self.build_name("tick_count"),
             "Total number of ticks observed since the server started",
             value=self.server._tick_counter,
         )
 
-        # This duplicates spill_time_total; however the breakdown is different
-        evloop_blocked_total = CounterMetricFamily(
-            self.build_name("event_loop_blocked_time"),
-            "Total time during which the worker's event loop was blocked "
-            "by spill/unspill activity since the latest worker reset",
-            unit="seconds",
-            labels=["cause"],
-        )
-        # This is typically higher than spill_time_per_key_max, as multiple keys can be
-        # spilled/unspilled without yielding the event loop
-        evloop_blocked_max = GaugeMetricFamily(
-            self.build_name("event_loop_blocked_time_max"),
-            "Maximum contiguous time during which the worker's event loop was blocked "
-            "by spill/unspill activity since the previous Prometheus poll",
-            unit="seconds",
-            labels=["cause"],
-        )
-        for family, digest in (
-            (evloop_blocked_total, self.server.digests_total),
-            (evloop_blocked_max, self.server.digests_max),
-        ):
-            for family_label, digest_label in (
-                ("disk-write-target", "disk-write-target-duration"),
-                ("disk-write-spill", "disk-write-spill-duration"),
-                ("disk-read-execute", "disk-load-duration"),
-                ("disk-read-get-data", "get-data-load-duration"),
-            ):
-                family.add_metric([family_label], digest[digest_label])
-
-        yield evloop_blocked_total
-        yield evloop_blocked_max
-        self.server.digests_max.clear()
-
     def collect_crick(self) -> Iterator[Metric]:
         # All metrics using digests require crick to be installed.
         # The following metrics will export NaN, if the corresponding digests are None
@@ -243,42 +209,36 @@ def collect_spillbuffer(self) -> Iterator[Metric]:
         read = spill_bytes.disk_read / spill_time.disk_read
         """
         try:
-            get_metrics = self.server.data.get_metrics  # type: ignore
+            metrics = self.server.data.cumulative_metrics  # type: ignore
         except AttributeError:
             return  # spilling is disabled
-        metrics = get_metrics()
 
-        total_bytes = CounterMetricFamily(
-            self.build_name("spill_bytes"),
-            "Total size of memory and disk accesses caused by managed data "
-            "since the latest worker restart",
-            labels=["activity"],
-        )
-        # Note: memory_read is used to calculate cache hit ratios (see docstring)
-        for k in ("memory_read", "disk_read", "disk_write"):
-            total_bytes.add_metric([k], metrics[f"{k}_bytes_total"])
-        yield total_bytes
-
-        total_counts = CounterMetricFamily(
-            self.build_name("spill_count"),
-            "Total number of memory and disk accesses caused by managed data "
-            "since the latest worker restart",
-            labels=["activity"],
-        )
+        counters = {
+            "bytes": CounterMetricFamily(
+                self.build_name("spill_bytes"),
+                "Total size of memory and disk accesses caused by managed data "
+                "since the latest worker restart",
+                labels=["activity"],
+            ),
+            "count": CounterMetricFamily(
+                self.build_name("spill_count"),
+                "Total number of memory and disk accesses caused by managed data "
+                "since the latest worker restart",
+                labels=["activity"],
+            ),
+            "seconds": CounterMetricFamily(
+                self.build_name("spill_time"),
+                "Total time spent spilling/unspilling since the latest worker restart",
+                unit="seconds",
+                labels=["activity"],
+            ),
+        }
+
         # Note: memory_read is used to calculate cache hit ratios (see docstring)
-        for k in ("memory_read", "disk_read", "disk_write"):
-            total_counts.add_metric([k], metrics[f"{k}_count_total"])
-        yield total_counts
+        for (label, unit), value in metrics.items():
+            counters[unit].add_metric([label], value)
 
-        total_times = CounterMetricFamily(
-            self.build_name("spill_time"),
-            "Total time spent spilling/unspilling since the latest worker restart",
-            unit="seconds",
-            labels=["activity"],
-        )
-        for k in ("pickle", "disk_write", "disk_read", "unpickle"):
-            total_times.add_metric([k], metrics[f"{k}_time_total"])
-        yield total_times
+        yield from counters.values()
 
 
 class PrometheusHandler(RequestHandler):
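
For reference, the rewritten loop assumes SpillBuffer.cumulative_metrics is a mapping keyed by (activity, unit) tuples whose unit is one of "bytes", "count" or "seconds", matching the keys of the counters dict above. A standalone sketch of that shape (values invented for illustration):

    # Hypothetical snapshot of self.server.data.cumulative_metrics
    metrics = {
        ("memory_read", "count"): 4,
        ("memory_read", "bytes"): 2_097_152,
        ("disk_write", "count"): 2,
        ("disk_write", "bytes"): 1_048_576,
        ("disk_write", "seconds"): 0.15,
    }
    # Each entry becomes one labelled sample on the matching family, e.g.
    # ("disk_write", "bytes") -> dask_worker_spill_bytes_total{activity="disk_write"}
    for (label, unit), value in metrics.items():
        print(f"spill_{unit}", {"activity": label}, value)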

distributed/http/worker/tests/test_worker_http.py

Lines changed: 2 additions & 36 deletions
@@ -25,13 +25,13 @@ async def test_prometheus(c, s, a):
     fut = c.submit(lambda: 1)
     await wait(fut)
 
+    a.data.evict()
+
     active_metrics = await fetch_metrics_sample_names(
         a.http_server.port, prefix="dask_worker_"
     )
     expected_metrics = {
         "dask_worker_concurrent_fetch_requests",
-        "dask_worker_event_loop_blocked_time_max_seconds",
-        "dask_worker_event_loop_blocked_time_seconds_total",
         "dask_worker_latency_seconds",
         "dask_worker_memory_bytes",
         "dask_worker_spill_bytes_total",
@@ -247,37 +247,3 @@ def __sizeof__(self):
     assert 50 * 2**20 < metrics["managed"] < 100 * 2**30  # capped to process memory
     assert metrics["unmanaged"] == 0  # floored to 0
     assert metrics["spilled"] == 0
-
-
-@gen_cluster(
-    client=True,
-    nthreads=[("127.0.0.1", 1)],
-    worker_kwargs={"memory_limit": "10 MiB"},
-    config={
-        "distributed.worker.memory.target": 1.0,
-        "distributed.worker.memory.spill": False,
-        "distributed.worker.memory.pause": False,
-    },
-)
-async def test_prometheus_resets_max_metrics(c, s, a):
-    pytest.importorskip("prometheus_client")
-    np = pytest.importorskip("numpy")
-
-    # The first GET to /metrics calls collect() twice
-    await fetch_metrics(a.http_server.port)
-
-    # We need substantial data to be sure that spilling it will take more than 5ms.
-    x = c.submit(lambda: "x" * 40_000_000, key="x", workers=[a.address])
-    await wait(x)
-    # Key is individually larger than target threshold, so it was spilled immediately
-    assert "x" in a.data.slow
-
-    nsecs = a.digests_max["disk-write-target-duration"]
-    assert nsecs > 0
-
-    families = await fetch_metrics(a.http_server.port)
-    metric = families["dask_worker_event_loop_blocked_time_max_seconds"]
-    samples = {sample.labels["cause"]: sample.value for sample in metric.samples}
-
-    assert samples["disk-write-target"] == nsecs
-    assert a.digests_max["disk-write-target-duration"] == 0
