Merged
45 commits
bd69ea6  WIP (crusaderky, Feb 27, 2023)
2607de0  @meter decorator (crusaderky, Feb 28, 2023)
f7ea372  MeteredEvent.meter (crusaderky, Feb 28, 2023)
9725343  Decorator for @get_data (crusaderky, Feb 28, 2023)
e05d68b  nits (crusaderky, Feb 28, 2023)
4727b9c  remove DigestMetric 1/2 (crusaderky, Feb 28, 2023)
29f1db6  assert_instructions (crusaderky, Feb 28, 2023)
e3b2cdb  Revert "assert_instructions" (crusaderky, Feb 28, 2023)
4d603c1  Remove DigestMetric 2/2 (crusaderky, Feb 28, 2023)
e6ed0f7  Remove DigestMetric 3 (crusaderky, Feb 28, 2023)
57acfd6  Do not unspill on free-keys (crusaderky, Mar 1, 2023)
6499bae  Unit tests (crusaderky, Mar 1, 2023)
01cfa4e  Fix regressions (crusaderky, Mar 2, 2023)
70b7ca7  Reduce diff (crusaderky, Mar 2, 2023)
0dfbb7d  doc (crusaderky, Mar 2, 2023)
ccd0c51  memory monitor (crusaderky, Mar 2, 2023)
52e4b04  Fix test failure (crusaderky, Mar 2, 2023)
50c77da  fix test (crusaderky, Mar 2, 2023)
8234594  #6705 (crusaderky, Mar 2, 2023)
301d49f  Fix work stealing (crusaderky, Mar 2, 2023)
1ba5f7c  User metrics (crusaderky, Mar 2, 2023)
58e2a32  Fix (crusaderky, Mar 2, 2023)
7561ca8  Merge branch 'main' into gather-dep (crusaderky, Mar 2, 2023)
d740e50  @contextmanager can be used as a decorator! (crusaderky, Mar 2, 2023)
d60bbe0  Remove redundant time measures (crusaderky, Mar 2, 2023)
d3749fc  trivial (crusaderky, Mar 2, 2023)
4fcb637  tuple digests; exclude crick (crusaderky, Mar 2, 2023)
6335a7e  Retain task prefix for execute (crusaderky, Mar 2, 2023)
3c884a8  Remove crick hack (crusaderky, Mar 3, 2023)
dc8d038  Only one crick callback (crusaderky, Mar 3, 2023)
8ccd3a2  Merge branch 'main' into gather-dep (crusaderky, Mar 3, 2023)
2e72606  Simplify memory-monitor (crusaderky, Mar 3, 2023)
4dac982  Fix bug in memory monitor (crusaderky, Mar 3, 2023)
0b73bfa  Self-review (crusaderky, Mar 3, 2023)
ecb6bc7  Remove in-thread add_callback (crusaderky, Mar 6, 2023)
a039baf  Drastically reduce diffs; encapsulate WorkerStateMachine changes (crusaderky, Mar 6, 2023)
36db22b  Merge branch 'main' into gather-dep (crusaderky, Mar 6, 2023)
821e05c  Fix regression (crusaderky, Mar 7, 2023)
05b8c0b  trivial (crusaderky, Mar 7, 2023)
2eae195  Fix documentation (crusaderky, Mar 7, 2023)
66bcaeb  Merge branch 'main' into gather-dep (crusaderky, Mar 9, 2023)
3f2e590  Merge branch 'main' into gather-dep (crusaderky, Mar 9, 2023)
03f52ba  trivial after merge with main (crusaderky, Mar 9, 2023)
bde5f9a  Merge branch 'main' into gather-dep (crusaderky, Mar 13, 2023)
ad45e4a  Do not reuse offload() for wrapping generic executor (crusaderky, Mar 13, 2023)
34 changes: 31 additions & 3 deletions distributed/core.py
@@ -11,8 +11,9 @@
 import warnings
 import weakref
 from collections import defaultdict, deque
-from collections.abc import Callable, Container, Coroutine, Generator
+from collections.abc import Callable, Container, Coroutine, Generator, Hashable
 from enum import Enum
+from functools import wraps
 from typing import TYPE_CHECKING, Any, ClassVar, TypedDict, TypeVar, final
 
 import tblib
@@ -34,7 +35,7 @@
 )
 from distributed.compatibility import PeriodicCallback
 from distributed.counter import Counter
-from distributed.metrics import time
+from distributed.metrics import context_meter, time
 from distributed.system_monitor import SystemMonitor
 from distributed.utils import (
     NoOpAwaitable,
@@ -950,7 +951,7 @@ async def close(self, timeout=None):
         finally:
             self._event_finished.set()
 
-    def digest_metric(self, name: str, value: float) -> None:
+    def digest_metric(self, name: Hashable, value: float) -> None:
         # Granular data (requires crick)
         if self.digests is not None:
             self.digests[name].add(value)
@@ -960,6 +961,33 @@ def digest_metric(self, name: str, value: float) -> None:
         self.digests_max[name] = max(self.digests_max[name], value)
 
 
+def context_meter_to_server_digest(digest_tag: str) -> Callable:
+    """Decorator for an async method of a Server subclass that calls
+    ``distributed.metrics.context_meter.meter`` and/or ``digest_metric``.
+    It routes the calls from ``context_meter.digest_metric(label, value, unit)`` to
+    ``Server.digest_metric((digest_tag, label, unit), value)``.
+    """
+
+    def decorator(func: Callable) -> Callable:
+        @wraps(func)
+        async def wrapper(self: Server, *args: Any, **kwargs: Any) -> Any:
+            loop = asyncio.get_running_loop()
+
+            def metrics_callback(label: Hashable, value: float, unit: str) -> None:
+                if not isinstance(label, tuple):
+                    label = (label,)
+                name = (digest_tag, *label, unit)
+                # This callback could be called from another thread through offload()
+                loop.call_soon_threadsafe(self.digest_metric, name, value)
+
+            with context_meter.add_callback(metrics_callback):
+                return await func(self, *args, **kwargs)
+
+        return wrapper
+
+    return decorator
+
+
 def pingpong(comm):
     return b"pong"
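For orientation, here is a minimal usage sketch for the decorator added above. MyServer, do_work, and the "my-tag" and "sleep" labels are hypothetical and not part of this diff; the context_meter.digest_metric(label, value, unit) signature is taken from the docstring:

import asyncio

from distributed.core import Server, context_meter_to_server_digest
from distributed.metrics import context_meter


class MyServer(Server):
    # digest_tag="my-tag" becomes the first element of every digest name
    @context_meter_to_server_digest("my-tag")
    async def do_work(self) -> None:
        await asyncio.sleep(0.1)
        # Routed to self.digest_metric(("my-tag", "sleep", "seconds"), 0.1)
        context_meter.digest_metric("sleep", 0.1, "seconds")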
3 changes: 3 additions & 0 deletions distributed/http/tests/test_core.py
@@ -26,6 +26,9 @@ async def test_prometheus_api_doc(c, s, a):
     # Some metrics only appear if there are tasks on the cluster
     fut = c.submit(inc, 1)
     await fut
+
+    a.data.evict()
+
     # Semaphore metrics only appear after semaphores are used
     sem = await Semaphore()
     await sem.acquire()
92 changes: 26 additions & 66 deletions distributed/http/worker/prometheus/core.py
@@ -143,7 +143,7 @@ def collect(self) -> Iterator[Metric]:
 
         now = time()
         max_tick_duration = max(
-            self.server.digests_max["tick_duration"],
+            self.server.digests_max.pop("tick_duration", 0),
             now - self.server._last_tick,
         )
         yield GaugeMetricFamily(
@@ -152,46 +152,12 @@
             unit="seconds",
             value=max_tick_duration,
         )
 
         yield CounterMetricFamily(
             self.build_name("tick_count"),
             "Total number of ticks observed since the server started",
             value=self.server._tick_counter,
         )
-
-        # This duplicates spill_time_total; however the breakdown is different
-        evloop_blocked_total = CounterMetricFamily(
-            self.build_name("event_loop_blocked_time"),
-            "Total time during which the worker's event loop was blocked "
-            "by spill/unspill activity since the latest worker reset",
-            unit="seconds",
-            labels=["cause"],
-        )
-        # This is typically higher than spill_time_per_key_max, as multiple keys can be
-        # spilled/unspilled without yielding the event loop
-        evloop_blocked_max = GaugeMetricFamily(
-            self.build_name("event_loop_blocked_time_max"),
-            "Maximum contiguous time during which the worker's event loop was blocked "
-            "by spill/unspill activity since the previous Prometheus poll",
-            unit="seconds",
-            labels=["cause"],
-        )
-        for family, digest in (
-            (evloop_blocked_total, self.server.digests_total),
-            (evloop_blocked_max, self.server.digests_max),
-        ):
-            for family_label, digest_label in (
-                ("disk-write-target", "disk-write-target-duration"),
-                ("disk-write-spill", "disk-write-spill-duration"),
-                ("disk-read-execute", "disk-load-duration"),
-                ("disk-read-get-data", "get-data-load-duration"),
-            ):
-                family.add_metric([family_label], digest[digest_label])
-
-        yield evloop_blocked_total
-        yield evloop_blocked_max
         self.server.digests_max.clear()
 
     def collect_crick(self) -> Iterator[Metric]:
         # All metrics using digests require crick to be installed.
         # The following metrics will export NaN, if the corresponding digests are None
@@ -243,42 +209,36 @@ def collect_spillbuffer(self) -> Iterator[Metric]:
         read = spill_bytes.disk_read / spill_time.disk_read
         """
         try:
-            get_metrics = self.server.data.get_metrics  # type: ignore
+            metrics = self.server.data.cumulative_metrics  # type: ignore
         except AttributeError:
             return  # spilling is disabled
-        metrics = get_metrics()
 
-        total_bytes = CounterMetricFamily(
-            self.build_name("spill_bytes"),
-            "Total size of memory and disk accesses caused by managed data "
-            "since the latest worker restart",
-            labels=["activity"],
-        )
-        # Note: memory_read is used to calculate cache hit ratios (see docstring)
-        for k in ("memory_read", "disk_read", "disk_write"):
-            total_bytes.add_metric([k], metrics[f"{k}_bytes_total"])
-        yield total_bytes
-
-        total_counts = CounterMetricFamily(
-            self.build_name("spill_count"),
-            "Total number of memory and disk accesses caused by managed data "
-            "since the latest worker restart",
-            labels=["activity"],
-        )
+        counters = {
+            "bytes": CounterMetricFamily(
+                self.build_name("spill_bytes"),
+                "Total size of memory and disk accesses caused by managed data "
+                "since the latest worker restart",
+                labels=["activity"],
+            ),
+            "count": CounterMetricFamily(
+                self.build_name("spill_count"),
+                "Total number of memory and disk accesses caused by managed data "
+                "since the latest worker restart",
+                labels=["activity"],
+            ),
+            "seconds": CounterMetricFamily(
+                self.build_name("spill_time"),
+                "Total time spent spilling/unspilling since the latest worker restart",
+                unit="seconds",
+                labels=["activity"],
+            ),
+        }
 
         # Note: memory_read is used to calculate cache hit ratios (see docstring)
-        for k in ("memory_read", "disk_read", "disk_write"):
-            total_counts.add_metric([k], metrics[f"{k}_count_total"])
-        yield total_counts
+        for (label, unit), value in metrics.items():
+            counters[unit].add_metric([label], value)
 
-        total_times = CounterMetricFamily(
-            self.build_name("spill_time"),
-            "Total time spent spilling/unspilling since the latest worker restart",
-            unit="seconds",
-            labels=["activity"],
-        )
-        for k in ("pickle", "disk_write", "disk_read", "unpickle"):
-            total_times.add_metric([k], metrics[f"{k}_time_total"])
-        yield total_times
+        yield from counters.values()
 
 
 class PrometheusHandler(RequestHandler):
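The rewrite of collect_spillbuffer above assumes the spill buffer now exposes a single cumulative_metrics mapping keyed by (label, unit) tuples, so one generic loop replaces the three hand-written ones. A rough sketch of the assumed shape; the labels and units come from this diff, while the values are made up for illustration:

# Hypothetical snapshot of self.server.data.cumulative_metrics; the exact
# set of keys depends on the SpillBuffer implementation.
cumulative_metrics = {
    ("memory_read", "count"): 42,   # cache hits
    ("memory_read", "bytes"): 123_456,
    ("disk_read", "count"): 7,      # cache misses (unspills)
    ("disk_read", "bytes"): 99_000,
    ("disk_read", "seconds"): 0.05,
    ("disk_write", "count"): 5,     # spills
    ("disk_write", "bytes"): 88_000,
    ("disk_write", "seconds"): 0.04,
}

# Each entry lands in the CounterMetricFamily matching its unit, e.g.
# ("disk_read", "seconds") -> dask_worker_spill_time_seconds_total{activity="disk_read"}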
38 changes: 2 additions & 36 deletions distributed/http/worker/tests/test_worker_http.py
@@ -25,13 +25,13 @@ async def test_prometheus(c, s, a):
     fut = c.submit(lambda: 1)
     await wait(fut)
 
+    a.data.evict()
+
     active_metrics = await fetch_metrics_sample_names(
         a.http_server.port, prefix="dask_worker_"
    )
     expected_metrics = {
         "dask_worker_concurrent_fetch_requests",
-        "dask_worker_event_loop_blocked_time_max_seconds",
-        "dask_worker_event_loop_blocked_time_seconds_total",
         "dask_worker_latency_seconds",
         "dask_worker_memory_bytes",
         "dask_worker_spill_bytes_total",
@@ -247,37 +247,3 @@ def __sizeof__(self):
     assert 50 * 2**20 < metrics["managed"] < 100 * 2**30  # capped to process memory
     assert metrics["unmanaged"] == 0  # floored to 0
     assert metrics["spilled"] == 0
-
-
-@gen_cluster(
-    client=True,
-    nthreads=[("127.0.0.1", 1)],
-    worker_kwargs={"memory_limit": "10 MiB"},
-    config={
-        "distributed.worker.memory.target": 1.0,
-        "distributed.worker.memory.spill": False,
-        "distributed.worker.memory.pause": False,
-    },
-)
-async def test_prometheus_resets_max_metrics(c, s, a):
-    pytest.importorskip("prometheus_client")
-    np = pytest.importorskip("numpy")
-
-    # The first GET to /metrics calls collect() twice
-    await fetch_metrics(a.http_server.port)
-
-    # We need substantial data to be sure that spilling it will take more than 5ms.
-    x = c.submit(lambda: "x" * 40_000_000, key="x", workers=[a.address])
-    await wait(x)
-    # Key is individually larger than target threshold, so it was spilled immediately
-    assert "x" in a.data.slow
-
-    nsecs = a.digests_max["disk-write-target-duration"]
-    assert nsecs > 0
-
-    families = await fetch_metrics(a.http_server.port)
-    metric = families["dask_worker_event_loop_blocked_time_max_seconds"]
-    samples = {sample.labels["cause"]: sample.value for sample in metric.samples}
-
-    assert samples["disk-write-target"] == nsecs
-    assert a.digests_max["disk-write-target-duration"] == 0