Skip to content

Commit

Permalink
Update precommit (#8852)
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter authored Sep 2, 2024
1 parent 4b65be0 commit 2e61816
Show file tree
Hide file tree
Showing 62 changed files with 374 additions and 306 deletions.
2 changes: 2 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ ignore =
B028
# do not compare types, for exact checks use `is` / `is not`, for instance checks use `isinstance()`
E721
# multiple statements on one line; required for black compat
E701, E704

per-file-ignores =
**/tests/*:
Expand Down
12 changes: 6 additions & 6 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,37 +12,37 @@ repos:
- id: isort
language_version: python3
- repo: https://github.com/asottile/pyupgrade
rev: v3.15.0
rev: v3.17.0
hooks:
- id: pyupgrade
args:
- --py39-plus
- repo: https://github.com/psf/black
rev: 23.12.1
rev: 24.8.0
hooks:
- id: black
language_version: python3
args:
- --target-version=py39
- repo: https://github.com/pycqa/flake8
rev: 7.0.0
rev: 7.1.1
hooks:
- id: flake8
language_version: python3
additional_dependencies:
# NOTE: autoupdate does not pick up flake8-bugbear since it is a transitive
# dependency. Make sure to update flake8-bugbear manually on a regular basis.
- flake8-bugbear==23.12.2
- flake8-bugbear==24.8.19
- repo: https://github.com/codespell-project/codespell
rev: v2.2.6
rev: v2.3.0
hooks:
- id: codespell
additional_dependencies:
- tomli
types_or: [rst, markdown]
files: docs
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.8.0
rev: v1.11.2
hooks:
- id: mypy
# Override default --ignore-missing-imports
Expand Down
3 changes: 1 addition & 2 deletions continuous_integration/scripts/host_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ def main() -> None:
else:
print("CPU frequency:")
for freq in freqs:
# FIXME types-psutil
print(f" - current={freq.current}, min={freq.min}, max={freq.max}") # type: ignore
print(f" - current={freq.current}, min={freq.min}, max={freq.max}")

mem = psutil.virtual_memory()
print("Memory:")
Expand Down
1 change: 1 addition & 0 deletions continuous_integration/scripts/parse_stdout.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""On Windows, pytest-timeout kills off the whole test suite, leaving no junit report
behind. Parse the stdout of pytest to generate one.
"""

from __future__ import annotations

import html
Expand Down
6 changes: 3 additions & 3 deletions distributed/_concurrent_futures_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
# workers to exit when their work queues are empty and then waits until the
# threads finish.

_threads_queues: weakref.WeakKeyDictionary[
threading.Thread, queue.Queue
] = weakref.WeakKeyDictionary()
_threads_queues: weakref.WeakKeyDictionary[threading.Thread, queue.Queue] = (
weakref.WeakKeyDictionary()
)
_shutdown = False


Expand Down
13 changes: 7 additions & 6 deletions distributed/active_memory_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
See also :mod:`distributed.worker_memory` and :mod:`distributed.spill`, which implement
spill/pause/terminate mechanics on the Worker side.
"""

from __future__ import annotations

import abc
Expand Down Expand Up @@ -392,12 +393,12 @@ def _enact_suggestions(self) -> None:
logger.debug("Enacting suggestions for %d tasks:", len(self.pending))

validate = self.scheduler.validate
drop_by_worker: (
defaultdict[scheduler_module.WorkerState, list[Key]]
) = defaultdict(list)
repl_by_worker: (
defaultdict[scheduler_module.WorkerState, list[Key]]
) = defaultdict(list)
drop_by_worker: defaultdict[scheduler_module.WorkerState, list[Key]] = (
defaultdict(list)
)
repl_by_worker: defaultdict[scheduler_module.WorkerState, list[Key]] = (
defaultdict(list)
)

for ts, (pending_repl, pending_drop) in self.pending.items():
if not ts.who_has:
Expand Down
6 changes: 2 additions & 4 deletions distributed/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,10 @@ class BaseActorFuture(abc.ABC, Awaitable[_T]):
"""

@abc.abstractmethod
def result(self, timeout: str | timedelta | float | None = None) -> _T:
...
def result(self, timeout: str | timedelta | float | None = None) -> _T: ...

@abc.abstractmethod
def done(self) -> bool:
...
def done(self) -> bool: ...

def __repr__(self) -> Literal["<ActorFuture>"]:
return "<ActorFuture>"
Expand Down
6 changes: 2 additions & 4 deletions distributed/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,12 @@ def _send_to_subscribers(self, topic: str, event: Any) -> None:
self._scheduler.send_all(client_msgs, worker_msgs={})

@overload
def get_events(self, topic: str) -> tuple[tuple[float, Any], ...]:
...
def get_events(self, topic: str) -> tuple[tuple[float, Any], ...]: ...

@overload
def get_events(
self, topic: None = None
) -> dict[str, tuple[tuple[float, Any], ...]]:
...
) -> dict[str, tuple[tuple[float, Any], ...]]: ...

def get_events(
self, topic: str | None = None
Expand Down
2 changes: 1 addition & 1 deletion distributed/cfexecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def _cascade_future(future, cf_future):
try:
typ, exc, tb = result
raise exc.with_traceback(tb)
except BaseException as exc:
except BaseException as exc: # noqa: B036
cf_future.set_exception(exc)


Expand Down
8 changes: 5 additions & 3 deletions distributed/cli/dask_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,9 +406,11 @@ async def run():
host=host,
dashboard=dashboard,
dashboard_address=dashboard_address,
name=name
if n_workers == 1 or name is None or name == ""
else str(name) + "-" + str(i),
name=(
name
if n_workers == 1 or name is None or name == ""
else str(name) + "-" + str(i)
),
**kwargs,
**port_kwargs_i,
)
Expand Down
5 changes: 3 additions & 2 deletions distributed/cli/tests/test_dask_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,8 +610,9 @@ async def test_set_lifetime_stagger_via_env_var(c, s):
env = os.environ.copy()
env["DASK_DISTRIBUTED__WORKER__LIFETIME__DURATION"] = "10 seconds"
env["DASK_DISTRIBUTED__WORKER__LIFETIME__STAGGER"] = "2 seconds"
with popen(["dask", "worker", s.address], env=env), popen(
["dask", "worker", s.address], env=env
with (
popen(["dask", "worker", s.address], env=env),
popen(["dask", "worker", s.address], env=env),
):
await c.wait_for_workers(2)
[lifetime1, lifetime2] = (
Expand Down
21 changes: 11 additions & 10 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@

logger = logging.getLogger(__name__)

_global_clients: weakref.WeakValueDictionary[
int, Client
] = weakref.WeakValueDictionary()
_global_clients: weakref.WeakValueDictionary[int, Client] = (
weakref.WeakValueDictionary()
)
_global_client_index = [0]

_current_client: ContextVar[Client | None] = ContextVar("_current_client", default=None)
Expand Down Expand Up @@ -483,6 +483,7 @@ def execute_callback(fut):
fn(fut)
except BaseException:
logger.exception("Error in callback %s of %s:", fn, fut)
raise

self.client.loop.add_callback(
done_callback, self, partial(cls._cb_executor.submit, execute_callback)
Expand Down Expand Up @@ -3873,13 +3874,13 @@ async def _restart_workers(
name_to_addr = {meta["name"]: addr for addr, meta in info["workers"].items()}
worker_addrs = [name_to_addr.get(w, w) for w in workers]

out: dict[
str, Literal["OK", "removed", "timed out"]
] = await self.scheduler.restart_workers(
workers=worker_addrs,
timeout=timeout,
on_error="raise" if raise_for_error else "return",
stimulus_id=f"client-restart-workers-{time()}",
out: dict[str, Literal["OK", "removed", "timed out"]] = (
await self.scheduler.restart_workers(
workers=worker_addrs,
timeout=timeout,
on_error="raise" if raise_for_error else "return",
stimulus_id=f"client-restart-workers-{time()}",
)
)
# Map keys back to original `workers` input names/addresses
out = {w: out[w_addr] for w, w_addr in zip(workers, worker_addrs)}
Expand Down
3 changes: 1 addition & 2 deletions distributed/comm/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@


class _EntryPoints(Protocol):
def __call__(self, **kwargs: str) -> Iterable[importlib.metadata.EntryPoint]:
...
def __call__(self, **kwargs: str) -> Iterable[importlib.metadata.EntryPoint]: ...


_entry_points: _EntryPoints = importlib.metadata.entry_points # type: ignore[assignment]
Expand Down
5 changes: 3 additions & 2 deletions distributed/comm/ucx.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
.. _UCX: https://github.com/openucx/ucx
"""

from __future__ import annotations

import functools
Expand Down Expand Up @@ -360,7 +361,7 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")):
await self.ep.recv(header)
header = struct.unpack(header_fmt, header)
cuda_frames, sizes = header[:nframes], header[nframes:]
except BaseException as e:
except BaseException as e: # noqa: B036
# In addition to UCX exceptions, may be CancelledError or another
# "low-level" exception. The only safe thing to do is to abort.
# (See also https://github.com/dask/distributed/pull/6574).
Expand Down Expand Up @@ -390,7 +391,7 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")):
try:
for each_frame in recv_frames:
await self.ep.recv(each_frame)
except BaseException as e:
except BaseException as e: # noqa: B036
# In addition to UCX exceptions, may be CancelledError or another
# "low-level" exception. The only safe thing to do is to abort.
# (See also https://github.com/dask/distributed/pull/6574).
Expand Down
8 changes: 5 additions & 3 deletions distributed/dashboard/components/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3923,9 +3923,11 @@ def update(self):

# Format event loop as time and GIL (if configured) as %
self.data["text"] = [
f"{x * 100:.1f}%"
if i % 2 and s.monitor.monitor_gil_contention
else format_time(x)
(
f"{x * 100:.1f}%"
if i % 2 and s.monitor.monitor_gil_contention
else format_time(x)
)
for i, x in enumerate(self.data["values"])
]
update(self.source, self.data)
Expand Down
30 changes: 18 additions & 12 deletions distributed/deploy/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,28 +434,34 @@ def SSHCluster(
"cls": Scheduler,
"options": {
"address": hosts[0],
"connect_options": connect_options
if isinstance(connect_options, dict)
else connect_options[0],
"connect_options": (
connect_options
if isinstance(connect_options, dict)
else connect_options[0]
),
"kwargs": scheduler_options,
"remote_python": remote_python[0]
if isinstance(remote_python, list)
else remote_python,
"remote_python": (
remote_python[0] if isinstance(remote_python, list) else remote_python
),
},
}
workers = {
i: {
"cls": Worker,
"options": {
"address": host,
"connect_options": connect_options
if isinstance(connect_options, dict)
else connect_options[i + 1],
"connect_options": (
connect_options
if isinstance(connect_options, dict)
else connect_options[i + 1]
),
"kwargs": worker_options,
"worker_class": worker_class,
"remote_python": remote_python[i + 1]
if isinstance(remote_python, list)
else remote_python,
"remote_python": (
remote_python[i + 1]
if isinstance(remote_python, list)
else remote_python
),
},
}
for i, host in enumerate(hosts[1:])
Expand Down
36 changes: 21 additions & 15 deletions distributed/deploy/tests/test_adaptive.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,17 @@ async def test_adapt_quickly():
Instead we want to wait a few beats before removing a worker in case the
user is taking a brief pause between work
"""
async with LocalCluster(
n_workers=0,
asynchronous=True,
processes=False,
silence_logs=False,
dashboard_address=":0",
threads_per_worker=1,
) as cluster, Client(cluster, asynchronous=True) as client:
async with (
LocalCluster(
n_workers=0,
asynchronous=True,
processes=False,
silence_logs=False,
dashboard_address=":0",
threads_per_worker=1,
) as cluster,
Client(cluster, asynchronous=True) as client,
):
adapt = cluster.adapt(interval="20 ms", wait_count=5, maximum=10)
future = client.submit(slowinc, 1, delay=0.100)
await wait(future)
Expand Down Expand Up @@ -240,13 +243,16 @@ async def test_adapt_quickly():
@gen_test()
async def test_adapt_down():
"""Ensure that redefining adapt with a lower maximum removes workers"""
async with LocalCluster(
n_workers=0,
asynchronous=True,
processes=False,
silence_logs=False,
dashboard_address=":0",
) as cluster, Client(cluster, asynchronous=True) as client:
async with (
LocalCluster(
n_workers=0,
asynchronous=True,
processes=False,
silence_logs=False,
dashboard_address=":0",
) as cluster,
Client(cluster, asynchronous=True) as client,
):
cluster.adapt(interval="20ms", maximum=5)

futures = client.map(slowinc, range(1000), delay=0.1)
Expand Down
19 changes: 12 additions & 7 deletions distributed/deploy/tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@

@gen_test()
async def test_eq():
async with Cluster(asynchronous=True, name="A") as clusterA, Cluster(
asynchronous=True, name="A2"
) as clusterA2, Cluster(asynchronous=True, name="B") as clusterB:
async with (
Cluster(asynchronous=True, name="A") as clusterA,
Cluster(asynchronous=True, name="A2") as clusterA2,
Cluster(asynchronous=True, name="B") as clusterB,
):
assert clusterA != "A"
assert not (clusterA == "A")
assert clusterA == clusterA
Expand Down Expand Up @@ -75,8 +77,11 @@ def test_exponential_backoff():
@gen_test()
async def test_sync_context_manager_used_with_async_cluster():
async with Cluster(asynchronous=True, name="A") as cluster:
with pytest.raises(
TypeError,
match=r"Used 'with' with asynchronous class; please use 'async with'",
), cluster:
with (
pytest.raises(
TypeError,
match=r"Used 'with' with asynchronous class; please use 'async with'",
),
cluster,
):
pass
Loading

0 comments on commit 2e61816

Please sign in to comment.