Commit 6d1dd32

Unit tests
1 parent 2060067 commit 6d1dd32

5 files changed: +305 additions, -159 deletions


distributed/tests/test_reschedule.py

Lines changed: 1 addition & 2 deletions
@@ -91,8 +91,7 @@ def test_cancelled_reschedule_worker_state(ws_with_running_task):
     assert ws.available_resources == {"R": 0}
 
     instructions = ws.handle_stimulus(RescheduleEvent.dummy(key="x", stimulus_id="s2"))
-    # There's no RescheduleMsg
-    assert not instructions
+    assert not instructions  # There's no RescheduleMsg
     assert not ws.tasks  # The task has been forgotten
     assert ws.available_resources == {"R": 1}

distributed/tests/test_worker.py

Lines changed: 10 additions & 155 deletions
@@ -6,7 +6,6 @@
 import itertools
 import logging
 import os
-import secrets
 import sys
 import tempfile
 import threading
@@ -17,7 +16,7 @@
 from concurrent.futures.process import BrokenProcessPool
 from numbers import Number
 from operator import add
-from time import perf_counter, sleep
+from time import sleep
 from unittest import mock
 
 import psutil
@@ -43,7 +42,6 @@
     wait,
 )
 from distributed.comm.registry import backends
-from distributed.comm.utils import OFFLOAD_THRESHOLD
 from distributed.compatibility import LINUX, WINDOWS, to_thread
 from distributed.core import CommClosedError, Status, rpc
 from distributed.diagnostics import nvml
@@ -53,8 +51,8 @@
     PackageInstall,
     PipInstall,
 )
-from distributed.metrics import meter, time
-from distributed.protocol import default_compression, pickle
+from distributed.metrics import time
+from distributed.protocol import pickle
 from distributed.scheduler import KilledWorker, Scheduler
 from distributed.utils_test import (
     NO_AMM,
@@ -958,13 +956,11 @@ async def test_priorities(c, s, w):
 @gen_cluster(client=True)
 async def test_heartbeats(c, s, a, b):
     x = s.workers[a.address].last_seen
-    with meter() as m:
-        await asyncio.sleep(
-            a.periodic_callbacks["heartbeat"].callback_time / 1000 + 0.1
-        )
-        while s.workers[a.address].last_seen == x:
-            await asyncio.sleep(0.01)
-        assert m.delta < 2
+    start = time()
+    await asyncio.sleep(a.periodic_callbacks["heartbeat"].callback_time / 1000 + 0.1)
+    while s.workers[a.address].last_seen == x:
+        await asyncio.sleep(0.01)
+    assert time() < start + 2
     assert a.periodic_callbacks["heartbeat"].callback_time < 1000
 
 
@@ -1252,10 +1248,10 @@ def f():
     def func(dask_scheduler):
         return list(dask_scheduler.clients)
 
-    start = perf_counter()
+    start = time()
    while not any("worker" in n for n in client.run_on_scheduler(func)):
         sleep(0.1)
-    assert perf_counter() < start + 10
+    assert time() < start + 10
 
 
 @gen_cluster(nthreads=[], client=True)
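
Note: the two hunks above replace the `meter()` / `perf_counter()` bookkeeping with the wall-clock pattern used elsewhere in these tests: record `time()` up front, poll, then assert the elapsed time stayed under a deadline. The following is a minimal, self-contained sketch of that pattern; the helper name `poll_until` and the example condition are illustrative only, and it assumes `distributed` is importable so `distributed.metrics.time` (used by the tests themselves) is available.

import asyncio

from distributed.metrics import time


async def poll_until(condition, timeout=2.0, interval=0.01):
    # Record the wall-clock start, poll the condition, then assert that the
    # whole wait finished within the deadline; the same shape as the rewritten
    # test_heartbeats assertions.
    start = time()
    while not condition():
        await asyncio.sleep(interval)
    assert time() < start + timeout


async def main():
    # Trivial condition for demonstration; the tests above instead poll
    # scheduler state such as s.workers[a.address].last_seen.
    ready_at = time() + 0.05
    await poll_until(lambda: time() >= ready_at)


if __name__ == "__main__":
    asyncio.run(main())
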
@@ -3759,144 +3755,3 @@ def print_stderr(*args, **kwargs):
 
     assert "" == out
     assert "" == err
-
-
-@gen_cluster(client=True, config={"distributed.worker.memory.target": 1e-9})
-async def test_digest_metrics(c, s, a, b):
-    x = (await c.scatter({"x": "x" * 20_000}, workers=[a.address]))["x"]
-    y = (await c.scatter({"y": "y" * 20_000}, workers=[b.address]))["y"]
-    with meter() as m:
-        z = await c.submit("".join, [x, y], key="z", workers=[a.address])
-    assert z == "x" * 20_000 + "y" * 20_000
-    # The call to Worker.get_data will terminate after the fetch of z returns
-    await async_wait_for(
-        lambda: "get-data-network-seconds" in a.digests_total, timeout=5
-    )
-
-    digests = {
-        k: v
-        for k, v in a.digests_total.items()
-        if any(k.startswith(prefix) for prefix in ("gather-dep", "execute", "get-data"))
-    }
-    # import pprint; pprint.pprint(digests)
-
-    expect = [
-        # a.gather_dep(worker=b.address, keys=["z"])
-        "gather-dep-decompress-seconds",
-        "gather-dep-deserialize-seconds",
-        "gather-dep-network-seconds",
-        # Delta to end-to-end runtime as seen from the worker state machine
-        "gather-dep-other-seconds",
-        # Spill output; added by _transition_to_memory
-        "gather-dep-serialize-seconds",
-        "gather-dep-compress-seconds",
-        "gather-dep-disk-write-seconds",
-        "gather-dep-disk-write-count",
-        "gather-dep-disk-write-bytes",
-        # a.execute()
-        # -> Deserialize run_spec
-        "execute-deserialize-seconds",
-        # -> Unspill inputs
-        # (There's also another execute-deserialize-seconds entry)
-        "execute-disk-read-seconds",
-        "execute-disk-read-count",
-        "execute-disk-read-bytes",
-        "execute-decompress-seconds",
-        # -> Run in thread
-        "execute-thread-cpu-seconds",
-        "execute-thread-noncpu-seconds",
-        # Delta to end-to-end runtime as seen from the worker state machine
-        "execute-other-seconds",
-        # Spill output; added by _transition_to_memory
-        "execute-serialize-seconds",
-        "execute-compress-seconds",
-        "execute-disk-write-seconds",
-        "execute-disk-write-count",
-        "execute-disk-write-bytes",
-        # a.get_data() (triggered by the client retrieving the Future for z)
-        # Unspill
-        "get-data-disk-read-seconds",
-        "get-data-disk-read-count",
-        "get-data-disk-read-bytes",
-        "get-data-decompress-seconds",
-        "get-data-deserialize-seconds",
-        # Send over the network
-        "get-data-serialize-seconds",
-        "get-data-compress-seconds",
-        "get-data-network-seconds",
-    ]
-
-    if not default_compression:
-        expect = [k for k in expect if k != "get-data-compress-seconds"]
-    assert list(digests) == expect
-
-    assert {k: v for k, v in digests.items() if k.endswith("-count")} == {
-        "execute-disk-read-count": 2,
-        "execute-disk-write-count": 1,
-        "gather-dep-disk-write-count": 1,
-        "get-data-disk-read-count": 1,
-    }
-    if not WINDOWS:  # Fiddly rounding; see distributed.metrics._WindowsTime
-        assert sum(v for k, v in digests.items() if k.endswith("-seconds")) <= m.delta
-
-
-@gen_cluster(client=True, nthreads=[("", 1)])
-async def test_digest_metrics_async_task(c, s, a):
-    """Test that async tasks are metered"""
-    await c.submit(asyncio.sleep, 0.1)
-    assert a.digests_total["execute-thread-cpu-seconds"] == 0
-    assert 0 < a.digests_total["execute-thread-noncpu-seconds"] < 0.5
-
-
-@gen_cluster(client=True, nthreads=[("", 1)])
-async def test_digest_metrics_run_spec_deserialization(c, s, a):
-    """Test that deserialization of run_spec is metered"""
-    await c.submit(inc, 1, key="x")
-    assert a.digests_total["execute-deserialize-seconds"] > 0
-
-
-@gen_cluster(client=True)
-async def test_digest_metrics_offload(c, s, a, b):
-    """Test that functions wrapped by offload() are metered"""
-    nbytes = int(OFFLOAD_THRESHOLD * 1.1)
-    x = c.submit(secrets.token_bytes, nbytes, key="x", workers=[a.address])
-    y = c.submit(lambda x: None, x, key="y", workers=[b.address])
-    await y
-
-    assert a.digests_total["get-data-serialize-seconds"] > 0
-    assert b.digests_total["gather-dep-deserialize-seconds"] > 0
-
-
-@gen_cluster(client=True, nthreads=[("", 1)])
-async def test_digest_metrics_failed_execute(c, s, a):
-    """Tasks that failed to execute are metered as a separate lump total"""
-    x = c.submit(lambda: 1 / 0)
-    await wait(x)
-
-    digests = {k: v for k, v in a.digests_total.items() if k.startswith("execute")}
-    assert list(digests) == ["execute-failed-seconds"]
-    assert 0 < digests["execute-failed-seconds"] < 1
-
-
-@gen_cluster(client=True, nthreads=[("", 1)])
-async def test_digest_metrics_gather_dep_busy(c, s, a):
-    """gather_dep() calls that failed because the remote peer is busy
-    are metered as a separate lump total
-    """
-    raise NotImplementedError("TODO")
-
-
-@gen_cluster(client=True, nthreads=[("", 1)])
-async def test_digest_metrics_gather_dep_no_task(c, s, a):
-    """gather_dep() calls where the remote peer answers that it doesn't have any of the
-    requested keys are metered as a separate lump total
-    """
-    raise NotImplementedError("TODO")
-
-
-@gen_cluster(client=True, nthreads=[("", 1)])
-async def test_digest_metrics_gather_dep_network_error(c, s, a):
-    """gather_dep() calls where the remote peer fails to respond are metered as a
-    separate lump total
-    """
-    raise NotImplementedError("TODO")
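
Note: the removed tests above all share one inspection pattern: run work on a worker, then filter `Worker.digests_total` (a mapping from metric name to accumulated value) by key prefix. A condensed sketch of that pattern follows, reusing only names that appear in the removed code (`gen_cluster`, `inc`, `digests_total`); whether any `execute-*` digests are still recorded after this commit is an assumption, since the commit removes the tests that exercised them.

from distributed.utils_test import gen_cluster, inc


@gen_cluster(client=True, nthreads=[("", 1)])
async def test_digest_inspection_sketch(c, s, a):
    # Run a trivial task on worker `a`, then collect the execute-* digests it
    # accumulated. This mirrors the filtering done in the removed
    # test_digest_metrics and test_digest_metrics_failed_execute tests.
    await c.submit(inc, 1, key="x")
    execute_digests = {
        k: v for k, v in a.digests_total.items() if k.startswith("execute")
    }
    # Assumption: some execute-* digest (e.g. "execute-deserialize-seconds" in
    # the removed tests) is still emitted by the worker at this commit.
    assert execute_digests
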
