Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions distributed/tests/test_tls_functional.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@

import asyncio

- import pytest
from tlz import merge

from distributed import Client, Nanny, Queue, Scheduler, Worker, wait, worker_client
- from distributed.compatibility import LINUX
from distributed.core import Status
from distributed.metrics import time
from distributed.utils_test import (
Expand Down Expand Up @@ -91,7 +89,6 @@ async def test_scatter(c, s, a, b):
assert yy == [20]


- @pytest.mark.skipif(LINUX, reason="https://github.com/dask/distributed/issues/9052")
@gen_tls_cluster(client=True, Worker=Nanny)
async def test_nanny(c, s, a, b):
assert s.address.startswith("tls://")
Expand Down Expand Up @@ -191,7 +188,6 @@ def mysum():
assert result == 30 * 29


- @pytest.mark.skipif(LINUX, reason="https://github.com/dask/distributed/issues/9052")
@gen_tls_cluster(client=True, Worker=Nanny)
async def test_retire_workers(c, s, a, b):
assert set(s.workers) == {a.worker_address, b.worker_address}
Expand Down
18 changes: 10 additions & 8 deletions distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,7 @@
Worker: type[ServerNode] = Worker,
scheduler_kwargs: dict[str, Any] | None = None,
worker_kwargs: dict[str, Any] | None = None,
+ timeout: float = _TEST_TIMEOUT // 4,
) -> tuple[Scheduler, list[ServerNode]]:
scheduler_kwargs = scheduler_kwargs or {}
worker_kwargs = worker_kwargs or {}
Expand Down Expand Up @@ -797,13 +798,15 @@
or any(comm.comm is None for comm in s.stream_comms.values())
):
await asyncio.sleep(0.01)
- if time() > start + 30:
+ if time() > start + timeout:
await asyncio.gather(*(w.close(timeout=1) for w in workers))
await s.close()
check_invalid_worker_transitions(s)
check_invalid_task_states(s)
check_worker_fail_hard(s)
- raise TimeoutError("Cluster creation timeout")
+ raise TimeoutError(
+ "Cluster creation timeout. Workers did not come up and register in time."
+ )
return s, workers


Expand Down Expand Up @@ -969,26 +972,25 @@
workers = []
s = None
try:
- for _ in range(60):
+ while True:
try:
+ if not deadline.remaining:
+ raise TimeoutError("Timeout on cluster creation")

Check warning on line 978 in distributed/utils_test.py

View check run for this annotation

Codecov / codecov/patch

distributed/utils_test.py#L978

Added line #L978 was not covered by tests
s, ws = await start_cluster(
nthreads,
scheduler,
security=security,
Worker=Worker,
scheduler_kwargs=scheduler_kwargs,
- worker_kwargs=merge(
- {"death_timeout": min(15, int(deadline.remaining))},
- worker_kwargs,
- ),
+ worker_kwargs=worker_kwargs,
+ timeout=timeout // 4,
)
except Exception as e:
logger.error(
"Failed to start gen_cluster: "
f"{e.__class__.__name__}: {e}; retrying",
exc_info=True,
)
- await asyncio.sleep(1)
else:
workers[:] = ws
break
Expand Down
Loading