Closed
Description
What happened:
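It appears that in Python 3.11, `asyncio.wait` requires more of the future/task interface than distributed's awaitable classes implement. As the `_wait` frames below show, in 3.11 `asyncio.wait` no longer wraps the objects it is given in tasks. It only converts the input to a set and then calls `add_done_callback()` (and later `remove_done_callback()`) directly on every item. Objects that are merely awaitable therefore fail with `AttributeError`. This seems to affect at least `Nanny`, `Worker`, `ProcessInterface`, `MultiWorker`, and `Future`.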
Nanny failure
_______________ test_client_constructor_with_temporary_security ________________
@gen_test()
async def test_client_constructor_with_temporary_security():
xfail_ssl_issue5601()
pytest.importorskip("cryptography")
> async with Client(
security=True, silence_logs=False, dashboard_address=":0", asynchronous=True
) as c:
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/deploy/tests/test_local.py:318:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/client.py:1347: in __aenter__
await self
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/client.py:1164: in _start
self.cluster = await LocalCluster(
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/deploy/spec.py:389: in _
await self._correct_state()
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/deploy/spec.py:355: in _correct_state_internal
await asyncio.wait(workers)
/usr/lib64/python3.11/asyncio/tasks.py:427: in wait
return await _wait(fs, timeout, return_when, loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
fs = {<Nanny: None, threads: 1>, <Nanny: None, threads: 1>, <Nanny: None, threads: 1>, <Nanny: None, threads: 1>}
timeout = None, return_when = 'ALL_COMPLETED'
loop = <_UnixSelectorEventLoop running=False closed=True debug=False>
async def _wait(fs, timeout, return_when, loop):
"""Internal helper for wait().
The fs argument must be a collection of Futures.
"""
assert fs, 'Set of Futures is empty.'
waiter = loop.create_future()
timeout_handle = None
if timeout is not None:
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
counter = len(fs)
def _on_completion(f):
nonlocal counter
counter -= 1
if (counter <= 0 or
return_when == FIRST_COMPLETED or
return_when == FIRST_EXCEPTION and (not f.cancelled() and
f.exception() is not None)):
if timeout_handle is not None:
timeout_handle.cancel()
if not waiter.done():
waiter.set_result(None)
for f in fs:
> f.add_done_callback(_on_completion)
E AttributeError: 'Nanny' object has no attribute 'add_done_callback'
/usr/lib64/python3.11/asyncio/tasks.py:531: AttributeError
Worker failure
__________________________________ test_procs __________________________________
self = LocalCluster(a65e5443, 'inproc://192.168.2.10/3934787/288', workers=0, threads=0, memory=0 B)
workers = {0: {'cls': <class 'distributed.worker.Worker'>, 'options': {'dashboard': False, 'dashboard_address': None, 'host': No...ted.worker.Worker'>, 'options': {'dashboard': False, 'dashboard_address': None, 'host': None, 'interface': None, ...}}}
scheduler = {'cls': <class 'distributed.scheduler.Scheduler'>, 'options': {'blocked_handlers': None, 'dashboard': True, 'dashboard_address': ':0', 'host': None, ...}}
worker = {'cls': <class 'distributed.worker.Worker'>, 'options': {'dashboard': False, 'dashboard_address': None, 'host': None, 'interface': None, ...}}
asynchronous = False
loop = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fb8ef5d2350>
security = Security(require_encryption=False, tls_min_version=771)
silence_logs = False, name = None, shutdown_on_close = True
scheduler_sync_interval = 1
def __init__(
self,
workers=None,
scheduler=None,
worker=None,
asynchronous=False,
loop=None,
security=None,
silence_logs=False,
name=None,
shutdown_on_close=True,
scheduler_sync_interval=1,
):
self._created = weakref.WeakSet()
self.scheduler_spec = copy.copy(scheduler)
self.worker_spec = copy.copy(workers) or {}
self.new_spec = copy.copy(worker)
self.scheduler = None
self.workers = {}
self._i = 0
self.security = security or Security()
self._futures = set()
if silence_logs:
self._old_logging_level = silence_logging(level=silence_logs)
self._old_bokeh_logging_level = silence_logging(
level=silence_logs, root="bokeh"
)
self._instances.add(self)
self._correct_state_waiting = None
self._name = name or type(self).__name__
self.shutdown_on_close = shutdown_on_close
super().__init__(
asynchronous=asynchronous,
loop=loop,
name=name,
scheduler_sync_interval=scheduler_sync_interval,
)
if not self.asynchronous:
self._loop_runner.start()
self.sync(self._start)
try:
> self.sync(self._correct_state)
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/deploy/spec.py:266:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = LocalCluster(a65e5443, 'inproc://192.168.2.10/3934787/288', workers=0, threads=0, memory=0 B)
func = <bound method SpecCluster._correct_state of LocalCluster(a65e5443, 'inproc://192.168.2.10/3934787/288', workers=0, threads=0, memory=0 B)>
asynchronous = False, callback_timeout = None, args = (), kwargs = {}
def sync(self, func, *args, asynchronous=None, callback_timeout=None, **kwargs):
"""Call `func` with `args` synchronously or asynchronously depending on
the calling context"""
callback_timeout = _parse_timedelta(callback_timeout)
if asynchronous is None:
asynchronous = self.asynchronous
if asynchronous:
future = func(*args, **kwargs)
if callback_timeout is not None:
future = asyncio.wait_for(future, callback_timeout)
return future
else:
> return sync(
self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
)
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/utils.py:338:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
loop = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fb8ef5d2350>
func = <bound method SpecCluster._correct_state of LocalCluster(a65e5443, 'inproc://192.168.2.10/3934787/288', workers=0, threads=0, memory=0 B)>
callback_timeout = None, args = (), kwargs = {}
f = <function sync.<locals>.f at 0x7fb8f0953ec0>
wait = <function sync.<locals>.wait at 0x7fb8f09520c0>
typ = <class 'AttributeError'>
exc = AttributeError("'Worker' object has no attribute 'add_done_callback'")
tb = <traceback object at 0x7fb8fa548700>
def sync(loop, func, *args, callback_timeout=None, **kwargs):
"""
Run coroutine in loop running in separate thread.
"""
callback_timeout = _parse_timedelta(callback_timeout, "s")
if loop.asyncio_loop.is_closed():
raise RuntimeError("IOLoop is closed")
e = threading.Event()
main_tid = threading.get_ident()
result = error = future = None # set up non-locals
@gen.coroutine
def f():
nonlocal result, error, future
try:
if main_tid == threading.get_ident():
raise RuntimeError("sync() called from thread of running loop")
yield gen.moment
future = func(*args, **kwargs)
if callback_timeout is not None:
future = asyncio.wait_for(future, callback_timeout)
future = asyncio.ensure_future(future)
result = yield future
except Exception:
error = sys.exc_info()
finally:
e.set()
def cancel():
if future is not None:
future.cancel()
def wait(timeout):
try:
return e.wait(timeout)
except KeyboardInterrupt:
loop.add_callback(cancel)
raise
loop.add_callback(f)
if callback_timeout is not None:
if not wait(callback_timeout):
raise TimeoutError(f"timed out after {callback_timeout} s.")
else:
while not e.is_set():
wait(10)
if error:
typ, exc, tb = error
> raise exc.with_traceback(tb)
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/utils.py:405:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
@gen.coroutine
def f():
nonlocal result, error, future
try:
if main_tid == threading.get_ident():
raise RuntimeError("sync() called from thread of running loop")
yield gen.moment
future = func(*args, **kwargs)
if callback_timeout is not None:
future = asyncio.wait_for(future, callback_timeout)
future = asyncio.ensure_future(future)
> result = yield future
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/utils.py:378:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <tornado.gen.Runner object at 0x7fb8fa549b10>
def run(self) -> None:
"""Starts or resumes the generator, running until it reaches a
yield point that is not ready.
"""
if self.running or self.finished:
return
try:
self.running = True
while True:
future = self.future
if future is None:
raise Exception("No pending future")
if not future.done():
return
self.future = None
try:
exc_info = None
try:
> value = future.result()
/usr/lib64/python3.11/site-packages/tornado/gen.py:769:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = LocalCluster(a65e5443, 'inproc://192.168.2.10/3934787/288', workers=0, threads=0, memory=0 B)
async def _correct_state_internal(self):
async with self._lock:
self._correct_state_waiting = None
to_close = set(self.workers) - set(self.worker_spec)
if to_close:
if self.scheduler.status == Status.running:
await self.scheduler_comm.retire_workers(workers=list(to_close))
tasks = [
asyncio.create_task(self.workers[w].close())
for w in to_close
if w in self.workers
]
await asyncio.gather(*tasks)
for name in to_close:
if name in self.workers:
del self.workers[name]
to_open = set(self.worker_spec) - set(self.workers)
workers = []
for name in to_open:
d = self.worker_spec[name]
cls, opts = d["cls"], d.get("options", {})
if "name" not in opts:
opts = opts.copy()
opts["name"] = name
if isinstance(cls, str):
cls = import_term(cls)
worker = cls(
getattr(self.scheduler, "contact_address", None)
or self.scheduler.address,
**opts,
)
self._created.add(worker)
workers.append(worker)
if workers:
> await asyncio.wait(workers)
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/deploy/spec.py:355:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
fs = {<Worker 'not-running', name: 1, status: init, stored: 0, running: 0/3, ready: 0, comm: 0, waiting: 0>, <Worker 'not-running', name: 0, status: init, stored: 0, running: 0/3, ready: 0, comm: 0, waiting: 0>}
async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
"""Wait for the Futures or Tasks given by fs to complete.
The fs iterable must not be empty.
Coroutines will be wrapped in Tasks.
Returns two sets of Future: (done, pending).
Usage:
done, pending = await asyncio.wait(fs)
Note: This does not raise TimeoutError! Futures that aren't done
when the timeout occurs are returned in the second set.
"""
if futures.isfuture(fs) or coroutines.iscoroutine(fs):
raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
if not fs:
raise ValueError('Set of Tasks/Futures is empty.')
if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
raise ValueError(f'Invalid return_when value: {return_when}')
fs = set(fs)
if any(coroutines.iscoroutine(f) for f in fs):
raise TypeError("Passing coroutines is forbidden, use tasks explicitly.")
loop = events.get_running_loop()
> return await _wait(fs, timeout, return_when, loop)
/usr/lib64/python3.11/asyncio/tasks.py:427:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
fs = {<Worker 'not-running', name: 1, status: init, stored: 0, running: 0/3, ready: 0, comm: 0, waiting: 0>, <Worker 'not-running', name: 0, status: init, stored: 0, running: 0/3, ready: 0, comm: 0, waiting: 0>}
timeout = None, return_when = 'ALL_COMPLETED'
loop = <_UnixSelectorEventLoop running=True closed=False debug=False>
async def _wait(fs, timeout, return_when, loop):
"""Internal helper for wait().
The fs argument must be a collection of Futures.
"""
assert fs, 'Set of Futures is empty.'
waiter = loop.create_future()
timeout_handle = None
if timeout is not None:
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
counter = len(fs)
def _on_completion(f):
nonlocal counter
counter -= 1
if (counter <= 0 or
return_when == FIRST_COMPLETED or
return_when == FIRST_EXCEPTION and (not f.cancelled() and
f.exception() is not None)):
if timeout_handle is not None:
timeout_handle.cancel()
if not waiter.done():
waiter.set_result(None)
for f in fs:
> f.add_done_callback(_on_completion)
E AttributeError: 'Worker' object has no attribute 'add_done_callback'
/usr/lib64/python3.11/asyncio/tasks.py:531: AttributeError
ProcessInterface failure
__________________________ test_ProcessInterfaceValid __________________________
self = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fb8f036b650>
callback = functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0...ibuted/deploy/spec.py:319> exception=AttributeError("'ProcessInterface' object has no attribute 'add_done_callback'")>)
def _run_callback(self, callback: Callable[[], Any]) -> None:
"""Runs a callback with error handling.
.. versionchanged:: 6.0
CancelledErrors are no longer logged.
"""
try:
> ret = callback()
/usr/lib64/python3.11/site-packages/tornado/ioloop.py:740:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fb8f036b650>
future = <Task finished name='Task-143678' coro=<SpecCluster._correct_state_internal() done, defined at /builddir/build/BUILDRO...ributed/deploy/spec.py:319> exception=AttributeError("'ProcessInterface' object has no attribute 'add_done_callback'")>
def _discard_future_result(self, future: Future) -> None:
"""Avoid unhandled-exception warnings from spawned coroutines."""
> future.result()
E AttributeError: 'ProcessInterface' object has no attribute 'add_done_callback'
/usr/lib64/python3.11/site-packages/tornado/ioloop.py:764: AttributeError
MultiWorker failure
_______________________________ test_MultiWorker _______________________________
self = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fb8ef7ead50>
callback = functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0...distributed/deploy/spec.py:319> exception=AttributeError("'MultiWorker' object has no attribute 'add_done_callback'")>)
def _run_callback(self, callback: Callable[[], Any]) -> None:
"""Runs a callback with error handling.
.. versionchanged:: 6.0
CancelledErrors are no longer logged.
"""
try:
> ret = callback()
/usr/lib64/python3.11/site-packages/tornado/ioloop.py:740:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fb8ef7ead50>
future = <Task finished name='Task-143730' coro=<SpecCluster._correct_state_internal() done, defined at /builddir/build/BUILDRO.../distributed/deploy/spec.py:319> exception=AttributeError("'MultiWorker' object has no attribute 'add_done_callback'")>
def _discard_future_result(self, future: Future) -> None:
"""Avoid unhandled-exception warnings from spawned coroutines."""
> future.result()
E AttributeError: 'MultiWorker' object has no attribute 'add_done_callback'
Future failure
___________________________ test_task_unique_groups ____________________________
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:45313', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:34269', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:37035', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True)
async def test_task_unique_groups(c, s, a, b):
"""This test ensure that task groups remain unique when using submit"""
x = c.submit(sum, [1, 2])
y = c.submit(len, [1, 2])
z = c.submit(sum, [3, 4])
> await asyncio.wait([x, y, z])
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/tests/test_scheduler.py:2206:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/lib64/python3.11/asyncio/tasks.py:427: in wait
return await _wait(fs, timeout, return_when, loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
fs = {<Future: cancelled, type: int, key: len-a6d0feba889a132085fa70f1b616feeb>, <Future: cancelled, type: int, key: sum-1d10c8fc1de9b59dc44708aaf25351e0>, <Future: cancelled, type: int, key: sum-fcf9017adfd73674bb128a7ddc1ad246>}
timeout = None, return_when = 'ALL_COMPLETED'
loop = <_UnixSelectorEventLoop running=False closed=True debug=False>
async def _wait(fs, timeout, return_when, loop):
"""Internal helper for wait().
The fs argument must be a collection of Futures.
"""
assert fs, 'Set of Futures is empty.'
waiter = loop.create_future()
timeout_handle = None
if timeout is not None:
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
counter = len(fs)
def _on_completion(f):
nonlocal counter
counter -= 1
if (counter <= 0 or
return_when == FIRST_COMPLETED or
return_when == FIRST_EXCEPTION and (not f.cancelled() and
f.exception() is not None)):
if timeout_handle is not None:
timeout_handle.cancel()
if not waiter.done():
waiter.set_result(None)
for f in fs:
f.add_done_callback(_on_completion)
try:
await waiter
finally:
if timeout_handle is not None:
timeout_handle.cancel()
for f in fs:
> f.remove_done_callback(_on_completion)
E AttributeError: 'Future' object has no attribute 'remove_done_callback'
/usr/lib64/python3.11/asyncio/tasks.py:539: AttributeError
What you expected to happen:
All tests pass.
Environment:
- Dask version: 2022.7.1
- Python version: 3.11.0b4
- Operating System: Fedora Rawhide
- Install method (conda, pip, source): source