-
-
Notifications
You must be signed in to change notification settings - Fork 718
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enable mypy in CI 1/2 #5328
Enable mypy in CI 1/2 #5328
Changes from 11 commits
47d144b
2b2de7e
a48f68b
fca1f7a
7d2072a
a10c034
a7985cb
c216653
74091d9
4b6ad55
8a0ca70
a2678bd
5802463
ec485f3
bc67e27
4ef6c46
831cdae
fa8d737
86e6cbf
9d82ae6
1fa20a2
1853f60
8e9c69f
edcd206
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,4 +24,4 @@ repos: | |
hooks: | ||
- id: pyupgrade | ||
args: | ||
- "--py37-plus" | ||
- --py37-plus |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -50,7 +50,7 @@ class NotThisMethod(Exception): | |
"""Exception raised if a method is not valid for the current scenario.""" | ||
|
||
|
||
LONG_VERSION_PY = {} | ||
LONG_VERSION_PY: dict = {} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this file is missing a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that's only necessary if you use bracket-stile |
||
HANDLERS = {} | ||
|
||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,7 +13,7 @@ | |
from .utils import import_term | ||
|
||
if TYPE_CHECKING: | ||
from .scheduler import SchedulerState, TaskState, WorkerState | ||
from .scheduler import Scheduler, TaskState, WorkerState | ||
|
||
|
||
class ActiveMemoryManagerExtension: | ||
|
@@ -31,7 +31,7 @@ class ActiveMemoryManagerExtension: | |
``distributed.scheduler.active-memory-manager``. | ||
""" | ||
|
||
scheduler: SchedulerState | ||
scheduler: Scheduler | ||
policies: set[ActiveMemoryManagerPolicy] | ||
interval: float | ||
|
||
|
@@ -43,7 +43,7 @@ class ActiveMemoryManagerExtension: | |
|
||
def __init__( | ||
self, | ||
scheduler: SchedulerState, | ||
scheduler: Scheduler, | ||
# The following parameters are exposed so that one may create, run, and throw | ||
# away on the fly a specialized manager, separate from the main one. | ||
policies: set[ActiveMemoryManagerPolicy] | None = None, | ||
|
@@ -126,12 +126,14 @@ def run_once(self, comm=None) -> None: | |
# populate self.pending | ||
self._run_policies() | ||
|
||
drop_by_worker = defaultdict(set) | ||
repl_by_worker = defaultdict(dict) | ||
drop_by_worker: defaultdict[str, set[str]] = defaultdict(set) | ||
repl_by_worker: defaultdict[str, dict[str, list[str]]] = defaultdict(dict) | ||
|
||
for ts, (pending_repl, pending_drop) in self.pending.items(): | ||
if not ts.who_has: | ||
continue | ||
who_has = [ws_snd.address for ws_snd in ts.who_has - pending_drop] | ||
|
||
assert who_has # Never drop the last replica | ||
for ws_rec in pending_repl: | ||
assert ws_rec not in ts.who_has | ||
|
@@ -143,8 +145,8 @@ def run_once(self, comm=None) -> None: | |
# Fire-and-forget enact recommendations from policies | ||
# This is temporary code, waiting for | ||
# https://github.com/dask/distributed/pull/5046 | ||
for addr, who_has in repl_by_worker.items(): | ||
asyncio.create_task(self.scheduler.gather_on_worker(addr, who_has)) | ||
for addr, who_has_map in repl_by_worker.items(): | ||
asyncio.create_task(self.scheduler.gather_on_worker(addr, who_has_map)) | ||
for addr, keys in drop_by_worker.items(): | ||
asyncio.create_task(self.scheduler.delete_worker_data(addr, keys)) | ||
# End temporary code | ||
|
@@ -215,7 +217,7 @@ def _find_recipient( | |
candidates -= pending_repl | ||
if not candidates: | ||
return None | ||
return min(candidates, key=self.workers_memory.get) | ||
return min(candidates, key=self.workers_memory.get) # type: ignore | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I cringe a bit to suggest this, but There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch |
||
|
||
def _find_dropper( | ||
self, | ||
|
@@ -244,7 +246,7 @@ def _find_dropper( | |
candidates -= {waiter_ts.processing_on for waiter_ts in ts.waiters} | ||
if not candidates: | ||
return None | ||
return max(candidates, key=self.workers_memory.get) | ||
return max(candidates, key=self.workers_memory.get) # type: ignore | ||
|
||
|
||
class ActiveMemoryManagerPolicy: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
from __future__ import annotations | ||
|
||
import asyncio | ||
import atexit | ||
import copy | ||
|
@@ -22,6 +24,7 @@ | |
from functools import partial | ||
from numbers import Number | ||
from queue import Queue as pyQueue | ||
from typing import ClassVar | ||
|
||
from tlz import first, groupby, keymap, merge, partition_all, valmap | ||
|
||
|
@@ -49,7 +52,7 @@ | |
from tornado import gen | ||
from tornado.ioloop import IOLoop, PeriodicCallback | ||
|
||
from . import versions as version_module | ||
from . import versions as version_module # type: ignore | ||
from .batched import BatchedSend | ||
from .cfexecutor import ClientExecutor | ||
from .core import ( | ||
|
@@ -95,7 +98,9 @@ | |
|
||
logger = logging.getLogger(__name__) | ||
|
||
_global_clients = weakref.WeakValueDictionary() | ||
_global_clients: weakref.WeakValueDictionary[ | ||
int, Client | ||
] = weakref.WeakValueDictionary() | ||
_global_client_index = [0] | ||
|
||
_current_client = ContextVar("_current_client", default=None) | ||
|
@@ -105,7 +110,7 @@ | |
NO_DEFAULT_PLACEHOLDER = "_no_default_" | ||
|
||
|
||
def _get_global_client(): | ||
def _get_global_client() -> Client | None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note to myself and others: 3.10 syntax, but backwards compatible with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. correct. 3.10 syntax works on all python versions as long as your version of mypy is recent. |
||
L = sorted(list(_global_clients), reverse=True) | ||
for k in L: | ||
c = _global_clients[k] | ||
|
@@ -116,13 +121,13 @@ def _get_global_client(): | |
return None | ||
|
||
|
||
def _set_global_client(c): | ||
def _set_global_client(c: Client | None) -> None: | ||
if c is not None: | ||
_global_clients[_global_client_index[0]] = c | ||
_global_client_index[0] += 1 | ||
|
||
|
||
def _del_global_client(c): | ||
def _del_global_client(c: Client) -> None: | ||
for k in list(_global_clients): | ||
try: | ||
if _global_clients[k] is c: | ||
|
@@ -590,7 +595,7 @@ class Client: | |
distributed.LocalCluster: | ||
""" | ||
|
||
_instances = weakref.WeakSet() | ||
_instances: ClassVar[weakref.WeakSet[Client]] = weakref.WeakSet() | ||
|
||
_default_event_handlers = {"print": _handle_print, "warn": _handle_warn} | ||
|
||
|
@@ -1377,8 +1382,6 @@ async def _close(self, fast=False): | |
|
||
self.status = "closed" | ||
|
||
_shutdown = _close | ||
|
||
Comment on lines
-1380
to
-1381
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just noting to future readers that there's a |
||
def close(self, timeout=no_default): | ||
"""Close this client | ||
|
||
|
@@ -2529,12 +2532,13 @@ def _get_computation_code() -> str: | |
) | ||
if not isinstance(ignore_modules, list): | ||
raise TypeError( | ||
f"Ignored modules must be a list. Instead got ({type(ignore_modules)}, {ignore_modules})" | ||
"Ignored modules must be a list. Instead got " | ||
f"({type(ignore_modules)}, {ignore_modules})" | ||
) | ||
|
||
pattern: re.Pattern | None | ||
if ignore_modules: | ||
pattern = "|".join([f"(?:{mod})" for mod in ignore_modules]) | ||
pattern = re.compile(pattern) | ||
pattern = re.compile("|".join([f"(?:{mod})" for mod in ignore_modules])) | ||
else: | ||
pattern = None | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, fixed. We should always use lowercase
dict[...]
,list[...]
, etc. and putfrom __future__ import annotations
at the top of the module.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correction: can't.