Skip to content

Commit

Permalink
Provide stack for suspended coro in test timeout (#5446)
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter authored Oct 21, 2021
1 parent 918e3fb commit 33d83bc
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 26 deletions.
2 changes: 1 addition & 1 deletion distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ def func(scheduler):
@gen_cluster(
nthreads=[], config={"distributed.scheduler.blocked-handlers": ["test-handler"]}
)
def test_scheduler_init_pulls_blocked_handlers_from_config(s):
async def test_scheduler_init_pulls_blocked_handlers_from_config(s):
assert s.blocked_handlers == ["test-handler"]


Expand Down
42 changes: 21 additions & 21 deletions distributed/tests/test_utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,27 +106,6 @@ def assert_config():
await c.run_on_scheduler(assert_config)


@gen_cluster(client=True)
def test_gen_cluster_legacy_implicit(c, s, a, b):
assert isinstance(c, Client)
assert isinstance(s, Scheduler)
for w in [a, b]:
assert isinstance(w, Worker)
assert s.nthreads == {w.address: w.nthreads for w in [a, b]}
assert (yield c.submit(lambda: 123)) == 123


@gen_cluster(client=True)
@gen.coroutine
def test_gen_cluster_legacy_explicit(c, s, a, b):
assert isinstance(c, Client)
assert isinstance(s, Scheduler)
for w in [a, b]:
assert isinstance(w, Worker)
assert s.nthreads == {w.address: w.nthreads for w in [a, b]}
assert (yield c.submit(lambda: 123)) == 123


@pytest.mark.skip(reason="This hangs on travis")
def test_gen_cluster_cleans_up_client(loop):
import dask.context
Expand Down Expand Up @@ -373,3 +352,24 @@ async def ping_pong():
assert await write_queue.get() == (b.address, {"op": "ping", "reply": True})
write_event.set()
assert await fut == "pong"


@pytest.mark.slow()
def test_provide_stack_on_timeout():
sleep_time = 30

async def inner_test(c, s, a, b):
await asyncio.sleep(sleep_time)

# If this timeout is too small, the cluster setup/teardown might take too
# long and the timeout error we'll receive will be different
test = gen_cluster(client=True, timeout=2)(inner_test)

start = time()
with pytest.raises(asyncio.TimeoutError) as exc:
test()
end = time()
assert "inner_test" in str(exc)
assert "await asyncio.sleep(sleep_time)" in str(exc)
# ensure the task was properly
assert end - start < sleep_time / 2
21 changes: 17 additions & 4 deletions distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,7 @@ async def test_foo(scheduler, worker1, worker2, pytest_fixture_a, pytest_fixture

def _(func):
if not iscoroutinefunction(func):
func = gen.coroutine(func)
raise RuntimeError("gen_cluster only works for coroutine functions.")

@functools.wraps(func)
def test_func(*outer_args, **kwargs):
Expand Down Expand Up @@ -964,11 +964,24 @@ async def coro():
)
args = [c] + args
try:
future = func(*args, *outer_args, **kwargs)
future = asyncio.wait_for(future, timeout)
result = await future
coro = func(*args, *outer_args, **kwargs)
task = asyncio.create_task(coro)

coro2 = asyncio.wait_for(asyncio.shield(task), timeout)
result = await coro2
if s.validate:
s.validate_state()
except asyncio.TimeoutError as e:
assert task
buffer = io.StringIO()
# This stack indicates where the coro/test is suspended
task.print_stack(file=buffer)
task.cancel()
while not task.cancelled():
await asyncio.sleep(0.01)
raise TimeoutError(
f"Test timeout after {timeout}s.\n{buffer.getvalue()}"
) from e
finally:
if client and c.status not in ("closing", "closed"):
await c._close(fast=s.status == Status.closed)
Expand Down

0 comments on commit 33d83bc

Please sign in to comment.