-
-
Notifications
You must be signed in to change notification settings - Fork 750
Closed
Labels
flaky test — Intermittent failures on CI.
Description
distributed/tests/test_worker.py::test_worker_reconnects_mid_compute_multiple_states_on_scheduler has recently started failing sporadically with an asyncio.TimeoutError. See, for example, this CI build on main.
Full traceback:
=================================== FAILURES ===================================
_______ test_worker_reconnects_mid_compute_multiple_states_on_scheduler ________
outer_args = (), kwargs = {}, result = None
coro = <function gen_cluster.<locals>._.<locals>.test_func.<locals>.coro at 0x7f4f961a7710>
@functools.wraps(func)
def test_func(*outer_args, **kwargs):
result = None
workers = []
with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:
async def coro():
with dask.config.set(config):
s = False
for _ in range(60):
try:
s, ws = await start_cluster(
nthreads,
scheduler,
loop,
security=security,
Worker=Worker,
scheduler_kwargs=scheduler_kwargs,
worker_kwargs=worker_kwargs,
)
except Exception as e:
logger.error(
"Failed to start gen_cluster: "
f"{e.__class__.__name__}: {e}; retrying",
exc_info=True,
)
await asyncio.sleep(1)
else:
workers[:] = ws
args = [s] + workers
break
if s is False:
raise Exception("Could not start cluster")
if client:
c = await Client(
s.address,
loop=loop,
security=security,
asynchronous=True,
**client_kwargs,
)
args = [c] + args
try:
future = func(*args, *outer_args, **kwargs)
future = asyncio.wait_for(future, timeout)
result = await future
if s.validate:
s.validate_state()
finally:
if client and c.status not in ("closing", "closed"):
await c._close(fast=s.status == Status.closed)
await end_cluster(s, workers)
await asyncio.wait_for(cleanup_global_workers(), 1)
try:
c = await default_client()
except ValueError:
pass
else:
await c._close(fast=True)
def get_unclosed():
return [c for c in Comm._instances if not c.closed()] + [
c
for c in _global_clients.values()
if c.status != "closed"
]
try:
start = time()
while time() < start + 60:
gc.collect()
if not get_unclosed():
break
await asyncio.sleep(0.05)
else:
if allow_unclosed:
print(f"Unclosed Comms: {get_unclosed()}")
else:
raise RuntimeError("Unclosed Comms", get_unclosed())
finally:
Comm._instances.clear()
_global_clients.clear()
return result
result = loop.run_sync(
> coro, timeout=timeout * 2 if timeout else timeout
)
distributed/utils_test.py:995:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/share/miniconda3/envs/dask-distributed/lib/python3.7/site-packages/tornado/ioloop.py:576: in run_sync
return future_cell[0].result()
distributed/utils_test.py:953: in coro
result = await future
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
fut = <Task cancelled coro=<test_worker_reconnects_mid_compute_multiple_states_on_scheduler() done, defined at /home/runner/work/distributed/distributed/distributed/tests/test_worker.py:2525>>
timeout = 30
async def wait_for(fut, timeout, *, loop=None):
"""Wait for the single Future or coroutine to complete, with timeout.
Coroutine will be wrapped in Task.
Returns result of the Future or coroutine. When a timeout occurs,
it cancels the task and raises TimeoutError. To avoid the task
cancellation, wrap it in shield().
If the wait is cancelled, the task is also cancelled.
This function is a coroutine.
"""
if loop is None:
loop = events.get_event_loop()
if timeout is None:
return await fut
if timeout <= 0:
fut = ensure_future(fut, loop=loop)
if fut.done():
return fut.result()
fut.cancel()
raise futures.TimeoutError()
waiter = loop.create_future()
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
cb = functools.partial(_release_waiter, waiter)
fut = ensure_future(fut, loop=loop)
Exception ignored in: <function Client.__del__ at 0x7f4ffdd6f5f0>
Traceback (most recent call last):
File "/home/runner/work/distributed/distributed/distributed/client.py", line 1185, in __del__
self.close()
File "/home/runner/work/distributed/distributed/distributed/client.py", line 1398, in close
if self.asynchronous:
File "/home/runner/work/distributed/distributed/distributed/client.py", line 808, in asynchronous
return self._asynchronous and self.loop is IOLoop.current()
AttributeError: 'Client' object has no attribute '_asynchronous'
fut.add_done_callback(cb)
try:
# wait until the future completes or the timeout
try:
await waiter
except futures.CancelledError:
fut.remove_done_callback(cb)
fut.cancel()
raise
if fut.done():
return fut.result()
else:
fut.remove_done_callback(cb)
# We must ensure that the task is not running
# after wait_for() returns.
# See https://bugs.python.org/issue32751
await _cancel_and_wait(fut, loop=loop)
> raise futures.TimeoutError()
E concurrent.futures._base.TimeoutError
/usr/share/miniconda3/envs/dask-distributed/lib/python3.7/asyncio/tasks.py:449: TimeoutError
Metadata
Assignees
Labels
flaky test — Intermittent failures on CI.