Closed as not planned
Description
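Seen in #7089 (CI run linked below): `test_drop_with_waiter` in `distributed/tests/test_active_memory_manager.py` timed out after 30s. The test stack shows it was suspended at `await asyncio.sleep(0.01)` on line 317 when the timeout fired.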
https://github.com/dask/distributed/actions/runs/3154045824/jobs/5132914151
____________________________ test_drop_with_waiter _____________________________
args = (), kwds = {}
@wraps(func)
def inner(*args, **kwds):
with self._recreate_cm():
> return func(*args, **kwds)
../../../miniconda3/envs/dask-distributed/lib/python3.10/contextlib.py:79:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.10/contextlib.py:79: in inner
return func(*args, **kwds)
distributed/utils_test.py:1112: in test_func
return _run_and_close_tornado(async_fn_outer)
distributed/utils_test.py:387: in _run_and_close_tornado
return asyncio.run(inner_fn())
../../../miniconda3/envs/dask-distributed/lib/python3.10/asyncio/runners.py:44: in run
return loop.run_until_complete(main)
../../../miniconda3/envs/dask-distributed/lib/python3.10/asyncio/base_events.py:646: in run_until_complete
return future.result()
distributed/utils_test.py:384: in inner_fn
return await async_fn(*args, **kwargs)
distributed/utils_test.py:1109: in async_fn_outer
return await asyncio.wait_for(async_fn(), timeout=timeout * 2)
../../../miniconda3/envs/dask-distributed/lib/python3.10/asyncio/tasks.py:445: in wait_for
return fut.result()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
async def async_fn():
result = None
with dask.config.set(config):
async with _cluster_factory() as (s, workers), _client_factory(
s
) as c:
args = [s] + workers
if c is not None:
args = [c] + args
try:
coro = func(*args, *outer_args, **kwargs)
task = asyncio.create_task(coro)
coro2 = asyncio.wait_for(asyncio.shield(task), timeout)
result = await coro2
validate_state(s, *workers)
except asyncio.TimeoutError:
assert task
buffer = io.StringIO()
# This stack indicates where the coro/test is suspended
task.print_stack(file=buffer)
if cluster_dump_directory:
await dump_cluster_state(
s=s,
ws=workers,
output_dir=cluster_dump_directory,
func_name=func.__name__,
)
task.cancel()
while not task.cancelled():
await asyncio.sleep(0.01)
# Hopefully, the hang has been caused by inconsistent
# state, which should be much more meaningful than the
# timeout
validate_state(s, *workers)
# Remove as much of the traceback as possible; it's
# uninteresting boilerplate from utils_test and asyncio
# and not from the code being tested.
> raise asyncio.TimeoutError(
f"Test timeout after {timeout}s.\n"
"========== Test stack trace starts here ==========\n"
f"{buffer.getvalue()}"
) from None
E asyncio.exceptions.TimeoutError: Test timeout after 30s.
E ========== Test stack trace starts here ==========
E Stack for <Task pending name='Task-67488' coro=<test_drop_with_waiter() running at /Users/runner/work/distributed/distributed/distributed/tests/test_active_memory_manager.py:317> wait_for=<Future pending cb=[Task.task_wakeup()]>> (most recent call last):
E File "/Users/runner/work/distributed/distributed/distributed/tests/test_active_memory_manager.py", line 317, in test_drop_with_waiter
E await asyncio.sleep(0.01)
distributed/utils_test.py:1046: TimeoutError