Add support for Python 3.11 #7249

Merged · 6 commits · Dec 14, 2022
4 changes: 3 additions & 1 deletion .github/workflows/tests.yaml
@@ -23,13 +23,15 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
python-version: ["3.8", "3.9", "3.10"]
python-version: ["3.8", "3.9", "3.10", "3.11"]
queuing: [queue]
# Cherry-pick test modules to split the overall runtime roughly in half
partition: [ci1, not ci1]
exclude:
- os: macos-latest
python-version: 3.9
+- os: macos-latest
+python-version: 3.10
include:
- os: ubuntu-latest
python-version: 3.9
50 changes: 50 additions & 0 deletions continuous_integration/environment-3.11.yaml
@@ -0,0 +1,50 @@
name: dask-distributed
channels:
  - conda-forge
  - defaults
dependencies:
  - python=3.11
  - packaging
  - pip
  - asyncssh
  # Temporary restriction until https://github.com/dask/distributed/issues/7173 is resolved
  - bokeh<3
  - click
  - cloudpickle
  - coverage
  - dask # overridden by git tip below
  - filesystem-spec # overridden by git tip below
  - h5py
  - ipykernel
  - ipywidgets
  - jinja2
  - locket >=1.0
  - msgpack-python
  - netcdf4
  - paramiko
  - pre-commit
  - prometheus_client
  - psutil
  - pyarrow=7
  - pytest
  - pytest-cov
  - pytest-faulthandler
  - pytest-repeat
  - pytest-rerunfailures
  - pytest-timeout
  - requests
  - s3fs # overridden by git tip below
  - scikit-learn
  - scipy
  - sortedcollections
  - tblib
  - toolz
  - tornado >=6.2
  - zict # overridden by git tip below
  - zstandard >=0.9.0
  - pip:
      - git+https://github.com/dask/dask
      - git+https://github.com/dask/s3fs
      - git+https://github.com/dask/zict
      - git+https://github.com/fsspec/filesystem_spec
      - keras
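
Reviewer note: a quick way to sanity-check the resulting environment is to assert the temporary pins from this file at runtime. A minimal sketch (not part of the PR; assumes the environment above is active):

    # Sketch: confirm the temporary bokeh<3 pin (see dask/distributed#7173)
    # and the Python version for this CI job.
    import sys
    from importlib.metadata import version

    from packaging.version import Version  # `packaging` is in the environment

    assert sys.version_info[:2] == (3, 11)
    assert Version(version("bokeh")) < Version("3")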
2 changes: 1 addition & 1 deletion distributed/cli/tests/test_dask_scheduler.py
@@ -573,7 +573,7 @@ def test_signal_handling(loop, sig):
port = open_port()
with subprocess.Popen(
[
"python",
sys.executable,
"-m",
"distributed.cli.dask_scheduler",
f"--port={port}",
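
Reviewer note: the bare "python" name is resolved via PATH, which on some CI images is missing or points at a different interpreter than the one running the tests; sys.executable is an absolute path to the current interpreter. A minimal sketch of the pattern:

    # Sketch: spawn a child process under the exact interpreter running the tests.
    import subprocess
    import sys

    out = subprocess.check_output(
        [sys.executable, "-c", "import sys; print(sys.version_info[:2])"]
    )
    print(out.decode().strip())  # matches the parent interpreter's version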
6 changes: 3 additions & 3 deletions distributed/client.py
@@ -2910,7 +2910,7 @@ def run(
)

@staticmethod
-def _get_computation_code(stacklevel=None) -> str:
+def _get_computation_code(stacklevel: int | None = None) -> str:
"""Walk up the stack to the user code and extract the code surrounding
the compute/submit/persist call. All modules encountered which are
ignored through the option
@@ -2921,7 +2921,6 @@ def _get_computation_code(stacklevel=None) -> str:
``stacklevel`` may be used to explicitly indicate from which frame on
the stack to get the source code.
"""

ignore_modules = dask.config.get(
"distributed.diagnostics.computations.ignore-modules"
)
@@ -2939,7 +2938,8 @@ def _get_computation_code(stacklevel=None) -> str:
else:
# stacklevel 0 or less - shows dask internals which likely isn't helpful
stacklevel = stacklevel if stacklevel > 0 else 1
-for i, (fr, _) in enumerate(traceback.walk_stack(None), 1):
+
+for i, (fr, _) in enumerate(traceback.walk_stack(sys._getframe().f_back), 1):
if stacklevel is not None:
if i != stacklevel:
continue
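
Reviewer note: passing an explicit starting frame sidesteps a behavior change in Python 3.11, where traceback.walk_stack(None) appears to skip a different number of internal frames than on 3.10 and earlier. Anchoring on sys._getframe().f_back makes the walk start at the caller's frame on every supported version. A minimal sketch of the pattern:

    # Sketch: walk the stack from an explicit frame instead of passing None.
    import sys
    import traceback

    def locate_user_code():
        # Start from our caller's frame, regardless of Python version.
        frames = traceback.walk_stack(sys._getframe().f_back)
        return [frame.f_code.co_name for frame, _lineno in frames]

    def user_function():
        return locate_user_code()

    print(user_function())  # ['user_function', '<module>'] at module level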
30 changes: 24 additions & 6 deletions distributed/deploy/spec.py
@@ -6,9 +6,10 @@
import logging
import math
import weakref
+from collections.abc import Awaitable, Generator
from contextlib import suppress
from inspect import isawaitable
-from typing import TYPE_CHECKING, Any, ClassVar
+from typing import TYPE_CHECKING, Any, ClassVar, TypeVar

from tornado import gen
from tornado.ioloop import IOLoop
@@ -108,6 +109,16 @@ async def __aexit__(self, exc_type, exc_value, traceback):
await self.close()


_T = TypeVar("_T")


async def _wrap_awaitable(aw: Awaitable[_T]) -> _T:
return await aw


_T_spec_cluster = TypeVar("_T_spec_cluster", bound="SpecCluster")


class SpecCluster(Cluster):
"""Cluster that requires a full specification of workers

@@ -327,7 +338,7 @@ def _correct_state(self):
self._correct_state_waiting = task
return task

-async def _correct_state_internal(self):
+async def _correct_state_internal(self) -> None:
async with self._lock:
self._correct_state_waiting = None

@@ -363,7 +374,9 @@ async def _correct_state_internal(self):
self._created.add(worker)
workers.append(worker)
if workers:
-await asyncio.wait(workers)
+await asyncio.wait(
+    [asyncio.create_task(_wrap_awaitable(w)) for w in workers]
+)
for w in workers:
w._cluster = weakref.ref(self)
await w # for tornado gen.coroutine support
@@ -392,14 +405,19 @@ def f():
asyncio.get_running_loop().call_later(delay, f)
super()._update_worker_status(op, msg)

-def __await__(self):
-    async def _():
+def __await__(self: _T_spec_cluster) -> Generator[Any, Any, _T_spec_cluster]:
+    async def _() -> _T_spec_cluster:
if self.status == Status.created:
await self._start()
await self.scheduler
await self._correct_state()
if self.workers:
-await asyncio.wait(list(self.workers.values()))  # maybe there are more
+await asyncio.wait(
+    [
+        asyncio.create_task(_wrap_awaitable(w))
+        for w in self.workers.values()
+    ]
+)  # maybe there are more
return self

return _().__await__()
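
Reviewer note: this is the core Python 3.11 fix in this file. On 3.11, asyncio.wait() no longer accepts bare coroutines or other plain awaitables, only Tasks and Futures, and the worker objects here are awaitables rather than asyncio futures. Wrapping each one in a coroutine via _wrap_awaitable and handing it to asyncio.create_task satisfies the new restriction. A self-contained sketch (Worker is a hypothetical stand-in for the awaitable worker objects):

    # Sketch: wrap plain awaitables in Tasks before calling asyncio.wait() (3.11+).
    import asyncio
    from collections.abc import Awaitable
    from typing import TypeVar

    _T = TypeVar("_T")

    async def _wrap_awaitable(aw: Awaitable[_T]) -> _T:
        return await aw

    class Worker:  # hypothetical: awaitable, but not an asyncio.Task/Future
        def __await__(self):
            yield from asyncio.sleep(0).__await__()
            return self

    async def main():
        workers = [Worker(), Worker()]
        # await asyncio.wait(workers)  # TypeError on Python 3.11
        await asyncio.wait(
            [asyncio.create_task(_wrap_awaitable(w)) for w in workers]
        )

    asyncio.run(main())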
2 changes: 1 addition & 1 deletion distributed/tests/test_scheduler.py
@@ -2625,7 +2625,7 @@ async def test_task_unique_groups(c, s, a, b):
x = c.submit(sum, [1, 2])
y = c.submit(len, [1, 2])
z = c.submit(sum, [3, 4])
-await asyncio.wait([x, y, z])
+await asyncio.gather(x, y, z)

assert s.task_prefixes["len"].states["memory"] == 1
assert s.task_prefixes["sum"].states["memory"] == 2
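
Reviewer note: same 3.11 restriction as in spec.py. distributed's client futures are awaitables but not asyncio Tasks/Futures, so asyncio.wait() rejects them on 3.11, while asyncio.gather() still wraps arbitrary awaitables itself. A minimal sketch with plain coroutines:

    # Sketch: gather() accepts awaitables directly; wait() on 3.11+ does not.
    import asyncio

    async def inc(x):
        return x + 1

    async def main():
        # await asyncio.wait([inc(1), inc(2)])  # TypeError on Python 3.11
        results = await asyncio.gather(inc(1), inc(2))
        print(results)  # [2, 3]

    asyncio.run(main())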
2 changes: 1 addition & 1 deletion distributed/tests/test_worker.py
@@ -2772,7 +2772,7 @@ async def test_forget_dependents_after_release(c, s, a):
fut = c.submit(inc, 1, key="f-1")
fut2 = c.submit(inc, fut, key="f-2")

-await asyncio.wait([fut, fut2])
+await asyncio.gather(fut, fut2)

assert fut.key in a.state.tasks
assert fut2.key in a.state.tasks
19 changes: 14 additions & 5 deletions distributed/utils_test.py
@@ -37,7 +37,6 @@
from tornado.ioloop import IOLoop

import dask
-from dask.sizeof import sizeof

from distributed import Event, Scheduler, system
from distributed import versions as version_module
@@ -2517,6 +2516,17 @@ async def fetch_metrics_sample_names(port: int, prefix: str | None = None) -> se
return sample_names


+def _get_gc_overhead():
+    class _CustomObject:
+        def __sizeof__(self):
+            return 0
+
+    return sys.getsizeof(_CustomObject())
+
+
+_size_obj = _get_gc_overhead()


class SizeOf:
"""
An object that returns exactly nbytes when inspected by dask.sizeof.sizeof
@@ -2525,12 +2535,11 @@ class SizeOf:
def __init__(self, nbytes: int) -> None:
if not isinstance(nbytes, int):
raise TypeError(f"Expected integer for nbytes but got {type(nbytes)}")
-size_obj = sizeof(object())
-if nbytes < size_obj:
+if nbytes < _size_obj:
raise ValueError(
-f"Expected a value larger than {size_obj} integer but got {nbytes}."
+f"Expected a value larger than {_size_obj} integer but got {nbytes}."
)
-self._nbytes = nbytes - size_obj
+self._nbytes = nbytes - _size_obj

def __sizeof__(self) -> int:
return self._nbytes
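
Reviewer note: sys.getsizeof() reports __sizeof__() plus the garbage collector's per-object overhead, and that overhead is what _get_gc_overhead() measures (via an object whose __sizeof__ is zero). Subtracting it in __init__ means inspection reports exactly nbytes. A usage sketch, assuming SizeOf is imported from distributed.utils_test:

    # Sketch: SizeOf reports exactly the requested byte count when inspected.
    import sys

    from distributed.utils_test import SizeOf

    s = SizeOf(1_000_000)
    assert sys.getsizeof(s) == 1_000_000  # GC overhead is added back by getsizeof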