Skip to content

async task classes are not fully implemented for Python 3.11 #6785

Closed
@QuLogic

Description

@QuLogic

What happened:

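In Python 3.11, `asyncio.wait` appears to require that every object passed to it implement the Future callback interface (`add_done_callback` / `remove_done_callback`), but the awaitable classes in distributed do not implement these methods. This affects at least Nanny, Worker, ProcessInterface, MultiWorker, and Future.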

Nanny failure
_______________ test_client_constructor_with_temporary_security ________________

    @gen_test()
    async def test_client_constructor_with_temporary_security():
        xfail_ssl_issue5601()
        pytest.importorskip("cryptography")
>       async with Client(
            security=True, silence_logs=False, dashboard_address=":0", asynchronous=True
        ) as c:

../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/deploy/tests/test_local.py:318: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/client.py:1347: in __aenter__
    await self
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/client.py:1164: in _start
    self.cluster = await LocalCluster(
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/deploy/spec.py:389: in _
    await self._correct_state()
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/deploy/spec.py:355: in _correct_state_internal
    await asyncio.wait(workers)
/usr/lib64/python3.11/asyncio/tasks.py:427: in wait
    return await _wait(fs, timeout, return_when, loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

fs = {<Nanny: None, threads: 1>, <Nanny: None, threads: 1>, <Nanny: None, threads: 1>, <Nanny: None, threads: 1>}
timeout = None, return_when = 'ALL_COMPLETED'
loop = <_UnixSelectorEventLoop running=False closed=True debug=False>

    async def _wait(fs, timeout, return_when, loop):
        """Internal helper for wait().
    
        The fs argument must be a collection of Futures.
        """
        assert fs, 'Set of Futures is empty.'
        waiter = loop.create_future()
        timeout_handle = None
        if timeout is not None:
            timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
        counter = len(fs)
    
        def _on_completion(f):
            nonlocal counter
            counter -= 1
            if (counter <= 0 or
                return_when == FIRST_COMPLETED or
                return_when == FIRST_EXCEPTION and (not f.cancelled() and
                                                    f.exception() is not None)):
                if timeout_handle is not None:
                    timeout_handle.cancel()
                if not waiter.done():
                    waiter.set_result(None)
    
        for f in fs:
>           f.add_done_callback(_on_completion)
E           AttributeError: 'Nanny' object has no attribute 'add_done_callback'

/usr/lib64/python3.11/asyncio/tasks.py:531: AttributeError
Worker failure
__________________________________ test_procs __________________________________

self = LocalCluster(a65e5443, 'inproc://192.168.2.10/3934787/288', workers=0, threads=0, memory=0 B)
workers = {0: {'cls': <class 'distributed.worker.Worker'>, 'options': {'dashboard': False, 'dashboard_address': None, 'host': No...ted.worker.Worker'>, 'options': {'dashboard': False, 'dashboard_address': None, 'host': None, 'interface': None, ...}}}
scheduler = {'cls': <class 'distributed.scheduler.Scheduler'>, 'options': {'blocked_handlers': None, 'dashboard': True, 'dashboard_address': ':0', 'host': None, ...}}
worker = {'cls': <class 'distributed.worker.Worker'>, 'options': {'dashboard': False, 'dashboard_address': None, 'host': None, 'interface': None, ...}}
asynchronous = False
loop = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fb8ef5d2350>
security = Security(require_encryption=False, tls_min_version=771)
silence_logs = False, name = None, shutdown_on_close = True
scheduler_sync_interval = 1

    def __init__(
        self,
        workers=None,
        scheduler=None,
        worker=None,
        asynchronous=False,
        loop=None,
        security=None,
        silence_logs=False,
        name=None,
        shutdown_on_close=True,
        scheduler_sync_interval=1,
    ):
        self._created = weakref.WeakSet()
    
        self.scheduler_spec = copy.copy(scheduler)
        self.worker_spec = copy.copy(workers) or {}
        self.new_spec = copy.copy(worker)
        self.scheduler = None
        self.workers = {}
        self._i = 0
        self.security = security or Security()
        self._futures = set()
    
        if silence_logs:
            self._old_logging_level = silence_logging(level=silence_logs)
            self._old_bokeh_logging_level = silence_logging(
                level=silence_logs, root="bokeh"
            )
    
        self._instances.add(self)
        self._correct_state_waiting = None
        self._name = name or type(self).__name__
        self.shutdown_on_close = shutdown_on_close
    
        super().__init__(
            asynchronous=asynchronous,
            loop=loop,
            name=name,
            scheduler_sync_interval=scheduler_sync_interval,
        )
    
        if not self.asynchronous:
            self._loop_runner.start()
            self.sync(self._start)
            try:
>               self.sync(self._correct_state)

../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/deploy/spec.py:266: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = LocalCluster(a65e5443, 'inproc://192.168.2.10/3934787/288', workers=0, threads=0, memory=0 B)
func = <bound method SpecCluster._correct_state of LocalCluster(a65e5443, 'inproc://192.168.2.10/3934787/288', workers=0, threads=0, memory=0 B)>
asynchronous = False, callback_timeout = None, args = (), kwargs = {}

    def sync(self, func, *args, asynchronous=None, callback_timeout=None, **kwargs):
        """Call `func` with `args` synchronously or asynchronously depending on
        the calling context"""
        callback_timeout = _parse_timedelta(callback_timeout)
        if asynchronous is None:
            asynchronous = self.asynchronous
        if asynchronous:
            future = func(*args, **kwargs)
            if callback_timeout is not None:
                future = asyncio.wait_for(future, callback_timeout)
            return future
        else:
>           return sync(
                self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
            )

../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/utils.py:338: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

loop = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fb8ef5d2350>
func = <bound method SpecCluster._correct_state of LocalCluster(a65e5443, 'inproc://192.168.2.10/3934787/288', workers=0, threads=0, memory=0 B)>
callback_timeout = None, args = (), kwargs = {}
f = <function sync.<locals>.f at 0x7fb8f0953ec0>
wait = <function sync.<locals>.wait at 0x7fb8f09520c0>
typ = <class 'AttributeError'>
exc = AttributeError("'Worker' object has no attribute 'add_done_callback'")
tb = <traceback object at 0x7fb8fa548700>

    def sync(loop, func, *args, callback_timeout=None, **kwargs):
        """
        Run coroutine in loop running in separate thread.
        """
        callback_timeout = _parse_timedelta(callback_timeout, "s")
        if loop.asyncio_loop.is_closed():
            raise RuntimeError("IOLoop is closed")
    
        e = threading.Event()
        main_tid = threading.get_ident()
        result = error = future = None  # set up non-locals
    
        @gen.coroutine
        def f():
            nonlocal result, error, future
            try:
                if main_tid == threading.get_ident():
                    raise RuntimeError("sync() called from thread of running loop")
                yield gen.moment
                future = func(*args, **kwargs)
                if callback_timeout is not None:
                    future = asyncio.wait_for(future, callback_timeout)
                future = asyncio.ensure_future(future)
                result = yield future
            except Exception:
                error = sys.exc_info()
            finally:
                e.set()
    
        def cancel():
            if future is not None:
                future.cancel()
    
        def wait(timeout):
            try:
                return e.wait(timeout)
            except KeyboardInterrupt:
                loop.add_callback(cancel)
                raise
    
        loop.add_callback(f)
        if callback_timeout is not None:
            if not wait(callback_timeout):
                raise TimeoutError(f"timed out after {callback_timeout} s.")
        else:
            while not e.is_set():
                wait(10)
    
        if error:
            typ, exc, tb = error
>           raise exc.with_traceback(tb)

../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/utils.py:405: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    @gen.coroutine
    def f():
        nonlocal result, error, future
        try:
            if main_tid == threading.get_ident():
                raise RuntimeError("sync() called from thread of running loop")
            yield gen.moment
            future = func(*args, **kwargs)
            if callback_timeout is not None:
                future = asyncio.wait_for(future, callback_timeout)
            future = asyncio.ensure_future(future)
>           result = yield future

../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/utils.py:378: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tornado.gen.Runner object at 0x7fb8fa549b10>

    def run(self) -> None:
        """Starts or resumes the generator, running until it reaches a
        yield point that is not ready.
        """
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                future = self.future
                if future is None:
                    raise Exception("No pending future")
                if not future.done():
                    return
                self.future = None
                try:
                    exc_info = None
    
                    try:
>                       value = future.result()

/usr/lib64/python3.11/site-packages/tornado/gen.py:769: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = LocalCluster(a65e5443, 'inproc://192.168.2.10/3934787/288', workers=0, threads=0, memory=0 B)

    async def _correct_state_internal(self):
        async with self._lock:
            self._correct_state_waiting = None
    
            to_close = set(self.workers) - set(self.worker_spec)
            if to_close:
                if self.scheduler.status == Status.running:
                    await self.scheduler_comm.retire_workers(workers=list(to_close))
                tasks = [
                    asyncio.create_task(self.workers[w].close())
                    for w in to_close
                    if w in self.workers
                ]
                await asyncio.gather(*tasks)
            for name in to_close:
                if name in self.workers:
                    del self.workers[name]
    
            to_open = set(self.worker_spec) - set(self.workers)
            workers = []
            for name in to_open:
                d = self.worker_spec[name]
                cls, opts = d["cls"], d.get("options", {})
                if "name" not in opts:
                    opts = opts.copy()
                    opts["name"] = name
                if isinstance(cls, str):
                    cls = import_term(cls)
                worker = cls(
                    getattr(self.scheduler, "contact_address", None)
                    or self.scheduler.address,
                    **opts,
                )
                self._created.add(worker)
                workers.append(worker)
            if workers:
>               await asyncio.wait(workers)

../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/deploy/spec.py:355: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

fs = {<Worker 'not-running', name: 1, status: init, stored: 0, running: 0/3, ready: 0, comm: 0, waiting: 0>, <Worker 'not-running', name: 0, status: init, stored: 0, running: 0/3, ready: 0, comm: 0, waiting: 0>}

    async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
        """Wait for the Futures or Tasks given by fs to complete.
    
        The fs iterable must not be empty.
    
        Coroutines will be wrapped in Tasks.
    
        Returns two sets of Future: (done, pending).
    
        Usage:
    
            done, pending = await asyncio.wait(fs)
    
        Note: This does not raise TimeoutError! Futures that aren't done
        when the timeout occurs are returned in the second set.
        """
        if futures.isfuture(fs) or coroutines.iscoroutine(fs):
            raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
        if not fs:
            raise ValueError('Set of Tasks/Futures is empty.')
        if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
            raise ValueError(f'Invalid return_when value: {return_when}')
    
        fs = set(fs)
    
        if any(coroutines.iscoroutine(f) for f in fs):
            raise TypeError("Passing coroutines is forbidden, use tasks explicitly.")
    
        loop = events.get_running_loop()
>       return await _wait(fs, timeout, return_when, loop)

/usr/lib64/python3.11/asyncio/tasks.py:427: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

fs = {<Worker 'not-running', name: 1, status: init, stored: 0, running: 0/3, ready: 0, comm: 0, waiting: 0>, <Worker 'not-running', name: 0, status: init, stored: 0, running: 0/3, ready: 0, comm: 0, waiting: 0>}
timeout = None, return_when = 'ALL_COMPLETED'
loop = <_UnixSelectorEventLoop running=True closed=False debug=False>

    async def _wait(fs, timeout, return_when, loop):
        """Internal helper for wait().
    
        The fs argument must be a collection of Futures.
        """
        assert fs, 'Set of Futures is empty.'
        waiter = loop.create_future()
        timeout_handle = None
        if timeout is not None:
            timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
        counter = len(fs)
    
        def _on_completion(f):
            nonlocal counter
            counter -= 1
            if (counter <= 0 or
                return_when == FIRST_COMPLETED or
                return_when == FIRST_EXCEPTION and (not f.cancelled() and
                                                    f.exception() is not None)):
                if timeout_handle is not None:
                    timeout_handle.cancel()
                if not waiter.done():
                    waiter.set_result(None)
    
        for f in fs:
>           f.add_done_callback(_on_completion)
E           AttributeError: 'Worker' object has no attribute 'add_done_callback'

/usr/lib64/python3.11/asyncio/tasks.py:531: AttributeError
ProcessInterface failure
__________________________ test_ProcessInterfaceValid __________________________

self = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fb8f036b650>
callback = functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0...ibuted/deploy/spec.py:319> exception=AttributeError("'ProcessInterface' object has no attribute 'add_done_callback'")>)

    def _run_callback(self, callback: Callable[[], Any]) -> None:
        """Runs a callback with error handling.
    
        .. versionchanged:: 6.0
    
           CancelledErrors are no longer logged.
        """
        try:
>           ret = callback()

/usr/lib64/python3.11/site-packages/tornado/ioloop.py:740: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fb8f036b650>
future = <Task finished name='Task-143678' coro=<SpecCluster._correct_state_internal() done, defined at /builddir/build/BUILDRO...ributed/deploy/spec.py:319> exception=AttributeError("'ProcessInterface' object has no attribute 'add_done_callback'")>

    def _discard_future_result(self, future: Future) -> None:
        """Avoid unhandled-exception warnings from spawned coroutines."""
>       future.result()
E       AttributeError: 'ProcessInterface' object has no attribute 'add_done_callback'

/usr/lib64/python3.11/site-packages/tornado/ioloop.py:764: AttributeError
MultiWorker failure
_______________________________ test_MultiWorker _______________________________

self = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fb8ef7ead50>
callback = functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0...distributed/deploy/spec.py:319> exception=AttributeError("'MultiWorker' object has no attribute 'add_done_callback'")>)

    def _run_callback(self, callback: Callable[[], Any]) -> None:
        """Runs a callback with error handling.
    
        .. versionchanged:: 6.0
    
           CancelledErrors are no longer logged.
        """
        try:
>           ret = callback()

/usr/lib64/python3.11/site-packages/tornado/ioloop.py:740: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fb8ef7ead50>
future = <Task finished name='Task-143730' coro=<SpecCluster._correct_state_internal() done, defined at /builddir/build/BUILDRO.../distributed/deploy/spec.py:319> exception=AttributeError("'MultiWorker' object has no attribute 'add_done_callback'")>

    def _discard_future_result(self, future: Future) -> None:
        """Avoid unhandled-exception warnings from spawned coroutines."""
>       future.result()
E       AttributeError: 'MultiWorker' object has no attribute 'add_done_callback'
Future failure
___________________________ test_task_unique_groups ____________________________

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:45313', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:34269', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:37035', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_task_unique_groups(c, s, a, b):
        """This test ensure that task groups remain unique when using submit"""
        x = c.submit(sum, [1, 2])
        y = c.submit(len, [1, 2])
        z = c.submit(sum, [3, 4])
>       await asyncio.wait([x, y, z])

../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/tests/test_scheduler.py:2206: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/usr/lib64/python3.11/asyncio/tasks.py:427: in wait
    return await _wait(fs, timeout, return_when, loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

fs = {<Future: cancelled, type: int, key: len-a6d0feba889a132085fa70f1b616feeb>, <Future: cancelled, type: int, key: sum-1d10c8fc1de9b59dc44708aaf25351e0>, <Future: cancelled, type: int, key: sum-fcf9017adfd73674bb128a7ddc1ad246>}
timeout = None, return_when = 'ALL_COMPLETED'
loop = <_UnixSelectorEventLoop running=False closed=True debug=False>

    async def _wait(fs, timeout, return_when, loop):
        """Internal helper for wait().
    
        The fs argument must be a collection of Futures.
        """
        assert fs, 'Set of Futures is empty.'
        waiter = loop.create_future()
        timeout_handle = None
        if timeout is not None:
            timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
        counter = len(fs)
    
        def _on_completion(f):
            nonlocal counter
            counter -= 1
            if (counter <= 0 or
                return_when == FIRST_COMPLETED or
                return_when == FIRST_EXCEPTION and (not f.cancelled() and
                                                    f.exception() is not None)):
                if timeout_handle is not None:
                    timeout_handle.cancel()
                if not waiter.done():
                    waiter.set_result(None)
    
        for f in fs:
            f.add_done_callback(_on_completion)
    
        try:
            await waiter
        finally:
            if timeout_handle is not None:
                timeout_handle.cancel()
            for f in fs:
>               f.remove_done_callback(_on_completion)
E               AttributeError: 'Future' object has no attribute 'remove_done_callback'

/usr/lib64/python3.11/asyncio/tasks.py:539: AttributeError

What you expected to happen:
All tests pass.

Environment:

  • Dask version: 2022.7.1
  • Python version: 3.11.0.b4
  • Operating System: Fedora Rawhide
  • Install method (conda, pip, source): source

Metadata

Assignees

Labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions