Skip to content

Commit 51a3ba9

Browse files
committed
Merge branch 'main' into WSMR/clustered_transfers_tests
2 parents 8136bae + 344868a commit 51a3ba9

36 files changed

+3470
-2639
lines changed

distributed/active_memory_manager.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,8 +416,9 @@ def run(
416416
) -> SuggestionGenerator:
417417
"""This method is invoked by the ActiveMemoryManager every few seconds, or
418418
whenever the user invokes ``client.amm.run_once``.
419+
419420
It is an iterator that must emit
420-
:class:`~distributed.active_memory_manager.Suggestion`s:
421+
:class:`~distributed.active_memory_manager.Suggestion` objects:
421422
422423
- ``Suggestion("replicate", <TaskState>)``
423424
- ``Suggestion("replicate", <TaskState>, {subset of potential workers to replicate to})``

distributed/cli/tests/test_dask_scheduler.py

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def test_no_dashboard(loop):
6666
def test_dashboard(loop):
6767
pytest.importorskip("bokeh")
6868

69-
with popen(["dask-scheduler"], flush_output=False) as proc:
69+
with popen(["dask-scheduler"], capture_output=True) as proc:
7070
line = wait_for_log_line(b"dashboard at", proc.stdout)
7171
dashboard_port = int(line.decode().split(":")[-1].strip())
7272

@@ -99,24 +99,22 @@ def test_dashboard_non_standard_ports(loop):
9999
pytest.importorskip("bokeh")
100100

101101
with popen(
102-
["dask-scheduler", "--port", "23448", "--dashboard-address", ":24832"],
103-
flush_output=False,
102+
["dask-scheduler", "--port", "3448", "--dashboard-address", ":4832"]
104103
) as proc:
105-
line = wait_for_log_line(b"dashboard at", proc.stdout)
106-
with Client("127.0.0.1:23448", loop=loop) as c:
104+
with Client("127.0.0.1:3448", loop=loop) as c:
107105
pass
108106

109107
start = time()
110108
while True:
111109
try:
112-
response = requests.get("http://localhost:24832/status/")
110+
response = requests.get("http://localhost:4832/status/")
113111
assert response.ok
114112
break
115113
except Exception:
116114
sleep(0.1)
117115
assert time() < start + 20
118116
with pytest.raises(Exception):
119-
requests.get("http://localhost:24832/status/")
117+
requests.get("http://localhost:4832/status/")
120118

121119

122120
@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
@@ -209,10 +207,8 @@ def check_pidfile(proc, pidfile):
209207
def test_scheduler_port_zero(loop):
210208
with tmpfile() as fn:
211209
with popen(
212-
["dask-scheduler", "--no-dashboard", "--scheduler-file", fn, "--port", "0"],
213-
flush_output=False,
214-
) as proc:
215-
line = wait_for_log_line(b"dashboard at", proc.stdout)
210+
["dask-scheduler", "--no-dashboard", "--scheduler-file", fn, "--port", "0"]
211+
):
216212
with Client(scheduler_file=fn, loop=loop) as c:
217213
assert c.scheduler.port
218214
assert c.scheduler.port != 8786
@@ -222,7 +218,7 @@ def test_dashboard_port_zero(loop):
222218
pytest.importorskip("bokeh")
223219
with popen(
224220
["dask-scheduler", "--dashboard-address", ":0"],
225-
flush_output=False,
221+
capture_output=True,
226222
) as proc:
227223
line = wait_for_log_line(b"dashboard at", proc.stdout)
228224
dashboard_port = int(line.decode().split(":")[-1].strip())

distributed/cli/tests/test_dask_spec.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,15 +78,15 @@ def test_errors():
7878
"--spec-file",
7979
"foo.yaml",
8080
],
81-
flush_output=False,
81+
capture_output=True,
8282
) as proc:
8383
line = proc.stdout.readline().decode()
8484
assert "exactly one" in line
8585
assert "--spec" in line and "--spec-file" in line
8686

8787
with popen(
8888
[sys.executable, "-m", "distributed.cli.dask_spec"],
89-
flush_output=False,
89+
capture_output=True,
9090
) as proc:
9191
line = proc.stdout.readline().decode()
9292
assert "exactly one" in line

distributed/cli/tests/test_dask_ssh.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def test_version_option():
2323
def test_ssh_cli_nprocs_renamed_to_nworkers(loop):
2424
with popen(
2525
["dask-ssh", "--nprocs=2", "--nohost", "localhost"],
26-
flush_output=False,
26+
capture_output=True,
2727
) as proc:
2828
with Client("tcp://127.0.0.1:8786", timeout="15 seconds", loop=loop) as c:
2929
c.wait_for_workers(2, timeout="15 seconds")
@@ -36,6 +36,6 @@ def test_ssh_cli_nprocs_renamed_to_nworkers(loop):
3636
def test_ssh_cli_nworkers_with_nprocs_is_an_error():
3737
with popen(
3838
["dask-ssh", "localhost", "--nprocs=2", "--nworkers=2"],
39-
flush_output=False,
39+
capture_output=True,
4040
) as proc:
4141
wait_for_log_line(b"Both --nprocs and --nworkers", proc.stdout, max_lines=15)

distributed/cli/tests/test_dask_worker.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ async def test_nanny_worker_port_range_too_many_workers_raises(s):
244244
"9686:9687",
245245
"--no-dashboard",
246246
],
247-
flush_output=False,
247+
capture_output=True,
248248
) as worker:
249249
wait_for_log_line(b"Not enough ports in range", worker.stdout, max_lines=100)
250250

@@ -278,14 +278,14 @@ async def test_no_nanny(c, s):
278278
async def test_reconnect_deprecated(c, s):
279279
with popen(
280280
["dask-worker", s.address, "--reconnect"],
281-
flush_output=False,
281+
capture_output=True,
282282
) as worker:
283283
wait_for_log_line(b"`--reconnect` option has been removed", worker.stdout)
284284
assert worker.wait() == 1
285285

286286
with popen(
287287
["dask-worker", s.address, "--no-reconnect"],
288-
flush_output=False,
288+
capture_output=True,
289289
) as worker:
290290
wait_for_log_line(b"flag is deprecated, and will be removed", worker.stdout)
291291
await c.wait_for_workers(1)
@@ -361,7 +361,7 @@ def test_scheduler_address_env(loop, monkeypatch):
361361
async def test_nworkers_requires_nanny(s):
362362
with popen(
363363
["dask-worker", s.address, "--nworkers=2", "--no-nanny"],
364-
flush_output=False,
364+
capture_output=True,
365365
) as worker:
366366
wait_for_log_line(b"Failed to launch worker", worker.stdout, max_lines=15)
367367

@@ -400,7 +400,7 @@ async def test_nworkers_expands_name(c, s):
400400
async def test_worker_cli_nprocs_renamed_to_nworkers(c, s):
401401
with popen(
402402
["dask-worker", s.address, "--nprocs=2"],
403-
flush_output=False,
403+
capture_output=True,
404404
) as worker:
405405
await c.wait_for_workers(2)
406406
wait_for_log_line(b"renamed to --nworkers", worker.stdout, max_lines=15)
@@ -410,7 +410,7 @@ async def test_worker_cli_nprocs_renamed_to_nworkers(c, s):
410410
async def test_worker_cli_nworkers_with_nprocs_is_an_error(s):
411411
with popen(
412412
["dask-worker", s.address, "--nprocs=2", "--nworkers=2"],
413-
flush_output=False,
413+
capture_output=True,
414414
) as worker:
415415
wait_for_log_line(b"Both --nprocs and --nworkers", worker.stdout, max_lines=15)
416416

@@ -708,7 +708,7 @@ def test_error_during_startup(monkeypatch, nanny):
708708
"--port",
709709
scheduler_port,
710710
],
711-
flush_output=False,
711+
capture_output=True,
712712
) as scheduler:
713713
start = time()
714714
# Wait for the scheduler to be up

distributed/dashboard/components/scheduler.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -984,8 +984,8 @@ def update(self):
984984
for ws in workers:
985985
x_read.append(ws.metrics["read_bytes"])
986986
x_write.append(ws.metrics["write_bytes"])
987-
x_read_disk.append(ws.metrics["read_bytes_disk"])
988-
x_write_disk.append(ws.metrics["write_bytes_disk"])
987+
x_read_disk.append(ws.metrics.get("read_bytes_disk", 0))
988+
x_write_disk.append(ws.metrics.get("write_bytes_disk", 0))
989989

990990
if self.scheduler.workers:
991991
self.bandwidth.x_range.end = max(
@@ -1173,8 +1173,8 @@ def get_data(self):
11731173
write_bytes += ws.metrics["write_bytes"]
11741174
cpu += ws.metrics["cpu"]
11751175
memory += ws.metrics["memory"]
1176-
read_bytes_disk += ws.metrics["read_bytes_disk"]
1177-
write_bytes_disk += ws.metrics["write_bytes_disk"]
1176+
read_bytes_disk += ws.metrics.get("read_bytes_disk", 0)
1177+
write_bytes_disk += ws.metrics.get("write_bytes_disk", 0)
11781178
time += ws.metrics["time"]
11791179

11801180
result = {

distributed/dashboard/tests/test_scheduler_bokeh.py

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
from distributed.metrics import time
5151
from distributed.utils import format_dashboard_link
5252
from distributed.utils_test import dec, div, gen_cluster, get_cert, inc, slowinc
53+
from distributed.worker import Worker
5354

5455
# Imported from distributed.dashboard.utils
5556
scheduler.PROFILING = False # type: ignore
@@ -504,22 +505,33 @@ async def test_WorkerNetworkBandwidth(c, s, a, b):
504505
async def test_WorkerNetworkBandwidth_metrics(c, s, a, b):
505506
# Disable system monitor periodic callback to allow us to manually control
506507
# when it is called below
507-
a.periodic_callbacks["monitor"].stop()
508-
b.periodic_callbacks["monitor"].stop()
509-
510-
# Update worker system monitors and send updated metrics to the scheduler
511-
a.monitor.update()
512-
b.monitor.update()
513-
await asyncio.gather(a.heartbeat(), b.heartbeat())
514-
515-
nb = WorkerNetworkBandwidth(s)
516-
nb.update()
517-
518-
for idx, ws in enumerate(s.workers.values()):
519-
assert ws.metrics["read_bytes"] == nb.source.data["x_read"][idx]
520-
assert ws.metrics["write_bytes"] == nb.source.data["x_write"][idx]
521-
assert ws.metrics["read_bytes_disk"] == nb.source.data["x_read_disk"][idx]
522-
assert ws.metrics["write_bytes_disk"] == nb.source.data["x_write_disk"][idx]
508+
with dask.config.set({"distributed.admin.system-monitor.disk": False}):
509+
async with Worker(s.address) as w:
510+
a.periodic_callbacks["monitor"].stop()
511+
b.periodic_callbacks["monitor"].stop()
512+
w.periodic_callbacks["monitor"].stop()
513+
514+
# Update worker system monitors and send updated metrics to the scheduler
515+
a.monitor.update()
516+
b.monitor.update()
517+
w.monitor.update()
518+
await asyncio.gather(a.heartbeat(), b.heartbeat())
519+
await asyncio.gather(a.heartbeat(), b.heartbeat(), w.heartbeat())
520+
521+
nb = WorkerNetworkBandwidth(s)
522+
nb.update()
523+
524+
for idx, ws in enumerate(s.workers.values()):
525+
assert ws.metrics["read_bytes"] == nb.source.data["x_read"][idx]
526+
assert ws.metrics["write_bytes"] == nb.source.data["x_write"][idx]
527+
assert (
528+
ws.metrics.get("read_bytes_disk", 0)
529+
== nb.source.data["x_read_disk"][idx]
530+
)
531+
assert (
532+
ws.metrics.get("write_bytes_disk", 0)
533+
== nb.source.data["x_write_disk"][idx]
534+
)
523535

524536

525537
@gen_cluster(client=True)

distributed/deploy/tests/test_local.py

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import asyncio
22
import subprocess
33
import sys
4-
import unittest
54
from threading import Lock
65
from time import sleep
76
from urllib.parse import urlparse
@@ -15,7 +14,6 @@
1514
from distributed import Client, LocalCluster, Nanny, Worker, get_client
1615
from distributed.compatibility import LINUX
1716
from distributed.core import Status
18-
from distributed.deploy.utils_test import ClusterTest
1917
from distributed.metrics import time
2018
from distributed.system import MEMORY_LIMIT
2119
from distributed.utils import TimeoutError, sync
@@ -170,9 +168,49 @@ def test_transports_tcp_port():
170168
assert e.submit(inc, 4).result() == 5
171169

172170

173-
class LocalTest(ClusterTest, unittest.TestCase):
174-
Cluster = LocalCluster # type: ignore
175-
kwargs = {"silence_logs": False, "dashboard_address": ":0", "processes": False}
171+
def test_cores(loop):
172+
with LocalCluster(
173+
n_workers=2,
174+
scheduler_port=0,
175+
silence_logs=False,
176+
dashboard_address=":0",
177+
processes=False,
178+
loop=loop,
179+
) as cluster, Client(cluster.scheduler_address, loop=loop) as client:
180+
client.scheduler_info()
181+
assert len(client.nthreads()) == 2
182+
183+
184+
def test_submit(loop):
185+
with LocalCluster(
186+
n_workers=2,
187+
scheduler_port=0,
188+
silence_logs=False,
189+
dashboard_address=":0",
190+
processes=False,
191+
loop=loop,
192+
) as cluster, Client(cluster.scheduler_address, loop=loop) as client:
193+
future = client.submit(lambda x: x + 1, 1)
194+
assert future.result() == 2
195+
196+
197+
def test_context_manager(loop):
198+
with LocalCluster(
199+
silence_logs=False, dashboard_address=":0", processes=False, loop=loop
200+
) as c, Client(c) as e:
201+
assert e.nthreads()
202+
203+
204+
def test_no_workers_sync(loop):
205+
with LocalCluster(
206+
n_workers=0,
207+
scheduler_port=0,
208+
silence_logs=False,
209+
dashboard_address=":0",
210+
processes=False,
211+
loop=loop,
212+
):
213+
pass
176214

177215

178216
def test_Client_with_local(loop):

distributed/deploy/utils_test.py

Lines changed: 0 additions & 38 deletions
This file was deleted.

distributed/diagnostics/plugin.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ def __init__(self, filepath):
334334

335335
async def setup(self, worker):
336336
response = await worker.upload_file(
337-
comm=None, filename=self.filename, data=self.data, load=True
337+
filename=self.filename, data=self.data, load=True
338338
)
339339
assert len(self.data) == response["nbytes"]
340340

0 commit comments

Comments
 (0)