Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable mypy in CI 1/2 #5328

Merged
merged 24 commits into from
Sep 30, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,23 @@ repos:
hooks:
- id: pyupgrade
args:
- "--py37-plus"
- --py37-plus
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.910
hooks:
- id: mypy
additional_dependencies:
# Type stubs
- types-docutils
- types-requests
- types-paramiko
- types-pkg_resources
- types-PyYAML
- types-setuptools
- types-psutil
# Libraries exclusively imported under `if TYPE_CHECKING:`
- typing_extensions # To be reviewed after dropping Python 3.7
# Typed libraries
- numpy
- dask
- tornado
2 changes: 1 addition & 1 deletion distributed/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from . import config # isort:skip; load distributed configuration first
from . import widgets # isort:skip; load distributed widgets second
import dask
from dask.config import config
from dask.config import config # type: ignore
from dask.utils import import_required

from ._version import get_versions
Expand Down
15 changes: 7 additions & 8 deletions distributed/_concurrent_futures_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,17 @@

"""Implements ThreadPoolExecutor."""

from __future__ import annotations

__author__ = "Brian Quinlan (brian@sweetapp.com)"

import atexit
import itertools
from concurrent.futures import _base

try:
import queue
except ImportError:
import Queue as queue

import os
import queue
import threading
import weakref
from concurrent.futures import _base

# Workers are created as daemon threads. This is done to allow the interpreter
# to exit when there are still idle threads in a ThreadPoolExecutor's thread
Expand All @@ -34,7 +31,9 @@
# workers to exit when their work queues are empty and then waits until the
# threads finish.

_threads_queues = weakref.WeakKeyDictionary()
_threads_queues: weakref.WeakKeyDictionary[
threading.Thread, queue.Queue
] = weakref.WeakKeyDictionary()
_shutdown = False


Expand Down
10 changes: 2 additions & 8 deletions distributed/_ipython_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,7 @@

import atexit
import os

try:
import queue
except ImportError:
# Python 2
import Queue as queue

import queue
import sys
from subprocess import Popen
from threading import Event, Thread
Expand Down Expand Up @@ -135,7 +129,7 @@ def remote_magic(line, cell=None):


# cache clients for re-use in remote magic
remote_magic._clients = {}
remote_magic._clients = {} # type: ignore
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
remote_magic._clients = {} # type: ignore
remote_magic._clients: Dict[str, BlockingKernelClient] = {}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, fixed. We should always use lowercase dict[...], list[...], etc. and put from __future__ import annotations at the top of the module.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correction: can't.

distributed/_ipython_utils.py:133: error: Type cannot be declared in assignment to non-self attribute
distributed/_ipython_utils.py:133: error: "Callable[[Any, Any], Any]" has no attribute "_clients"



def register_remote_magic(magic_name="remote"):
Expand Down
2 changes: 1 addition & 1 deletion distributed/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class NotThisMethod(Exception):
"""Exception raised if a method is not valid for the current scenario."""


LONG_VERSION_PY = {}
LONG_VERSION_PY: dict = {}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this file is missing a from __future__ import __annotations to make this work.

Copy link
Collaborator Author

@crusaderky crusaderky Sep 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's only necessary if you use bracket-stile dict[...] or if the annotations create circular dependencies

HANDLERS = {}


Expand Down
20 changes: 11 additions & 9 deletions distributed/active_memory_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from .utils import import_term

if TYPE_CHECKING:
from .scheduler import SchedulerState, TaskState, WorkerState
from .scheduler import Scheduler, TaskState, WorkerState


class ActiveMemoryManagerExtension:
Expand All @@ -31,7 +31,7 @@ class ActiveMemoryManagerExtension:
``distributed.scheduler.active-memory-manager``.
"""

scheduler: SchedulerState
scheduler: Scheduler
policies: set[ActiveMemoryManagerPolicy]
interval: float

Expand All @@ -43,7 +43,7 @@ class ActiveMemoryManagerExtension:

def __init__(
self,
scheduler: SchedulerState,
scheduler: Scheduler,
# The following parameters are exposed so that one may create, run, and throw
# away on the fly a specialized manager, separate from the main one.
policies: set[ActiveMemoryManagerPolicy] | None = None,
Expand Down Expand Up @@ -126,12 +126,14 @@ def run_once(self, comm=None) -> None:
# populate self.pending
self._run_policies()

drop_by_worker = defaultdict(set)
repl_by_worker = defaultdict(dict)
drop_by_worker: defaultdict[str, set[str]] = defaultdict(set)
repl_by_worker: defaultdict[str, dict[str, list[str]]] = defaultdict(dict)

for ts, (pending_repl, pending_drop) in self.pending.items():
if not ts.who_has:
continue
who_has = [ws_snd.address for ws_snd in ts.who_has - pending_drop]

assert who_has # Never drop the last replica
for ws_rec in pending_repl:
assert ws_rec not in ts.who_has
Expand All @@ -143,8 +145,8 @@ def run_once(self, comm=None) -> None:
# Fire-and-forget enact recommendations from policies
# This is temporary code, waiting for
# https://github.com/dask/distributed/pull/5046
for addr, who_has in repl_by_worker.items():
asyncio.create_task(self.scheduler.gather_on_worker(addr, who_has))
for addr, who_has_map in repl_by_worker.items():
asyncio.create_task(self.scheduler.gather_on_worker(addr, who_has_map))
for addr, keys in drop_by_worker.items():
asyncio.create_task(self.scheduler.delete_worker_data(addr, keys))
# End temporary code
Expand Down Expand Up @@ -215,7 +217,7 @@ def _find_recipient(
candidates -= pending_repl
if not candidates:
return None
return min(candidates, key=self.workers_memory.get)
return min(candidates, key=self.workers_memory.get) # type: ignore
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cringe a bit to suggest this, but self.workers_memory.__getitem__ passes muster here, since it will raise if the key doesn't exist

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch


def _find_dropper(
self,
Expand Down Expand Up @@ -244,7 +246,7 @@ def _find_dropper(
candidates -= {waiter_ts.processing_on for waiter_ts in ts.waiters}
if not candidates:
return None
return max(candidates, key=self.workers_memory.get)
return max(candidates, key=self.workers_memory.get) # type: ignore


class ActiveMemoryManagerPolicy:
Expand Down
26 changes: 15 additions & 11 deletions distributed/client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import asyncio
import atexit
import copy
Expand All @@ -22,6 +24,7 @@
from functools import partial
from numbers import Number
from queue import Queue as pyQueue
from typing import ClassVar

from tlz import first, groupby, keymap, merge, partition_all, valmap

Expand Down Expand Up @@ -49,7 +52,7 @@
from tornado import gen
from tornado.ioloop import IOLoop, PeriodicCallback

from . import versions as version_module
from . import versions as version_module # type: ignore
from .batched import BatchedSend
from .cfexecutor import ClientExecutor
from .core import (
Expand Down Expand Up @@ -95,7 +98,9 @@

logger = logging.getLogger(__name__)

_global_clients = weakref.WeakValueDictionary()
_global_clients: weakref.WeakValueDictionary[
int, Client
] = weakref.WeakValueDictionary()
_global_client_index = [0]

_current_client = ContextVar("_current_client", default=None)
Expand All @@ -105,7 +110,7 @@
NO_DEFAULT_PLACEHOLDER = "_no_default_"


def _get_global_client():
def _get_global_client() -> Client | None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to myself and others: 3.10 syntax, but backwards compatible with from __future__ import annotations, AFAIK.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct. 3.10 syntax works on all python versions as long as your version of mypy is recent.
Notable exception: cast, since it is executed at runtime, needs Python 3.10 style annotations wrapped in a string.

L = sorted(list(_global_clients), reverse=True)
for k in L:
c = _global_clients[k]
Expand All @@ -116,13 +121,13 @@ def _get_global_client():
return None


def _set_global_client(c):
def _set_global_client(c: Client | None) -> None:
if c is not None:
_global_clients[_global_client_index[0]] = c
_global_client_index[0] += 1


def _del_global_client(c):
def _del_global_client(c: Client) -> None:
for k in list(_global_clients):
try:
if _global_clients[k] is c:
Expand Down Expand Up @@ -590,7 +595,7 @@ class Client:
distributed.LocalCluster:
"""

_instances = weakref.WeakSet()
_instances: ClassVar[weakref.WeakSet[Client]] = weakref.WeakSet()

_default_event_handlers = {"print": _handle_print, "warn": _handle_warn}

Expand Down Expand Up @@ -1377,8 +1382,6 @@ async def _close(self, fast=False):

self.status = "closed"

_shutdown = _close

Comment on lines -1380 to -1381
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noting to future readers that there's a _shutdown method defined a few lines below

def close(self, timeout=no_default):
"""Close this client

Expand Down Expand Up @@ -2529,12 +2532,13 @@ def _get_computation_code() -> str:
)
if not isinstance(ignore_modules, list):
raise TypeError(
f"Ignored modules must be a list. Instead got ({type(ignore_modules)}, {ignore_modules})"
"Ignored modules must be a list. Instead got "
f"({type(ignore_modules)}, {ignore_modules})"
)

pattern: re.Pattern | None
if ignore_modules:
pattern = "|".join([f"(?:{mod})" for mod in ignore_modules])
pattern = re.compile(pattern)
pattern = re.compile("|".join([f"(?:{mod})" for mod in ignore_modules]))
else:
pattern = None

Expand Down
6 changes: 4 additions & 2 deletions distributed/comm/addressing.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import itertools

import dask
Expand All @@ -6,7 +8,7 @@
from . import registry


def parse_address(addr, strict=False):
def parse_address(addr: str, strict: bool = False) -> tuple[str, str]:
"""
Split address into its scheme and scheme-dependent location string.

Expand Down Expand Up @@ -145,7 +147,7 @@ def get_address_host(addr):
return backend.get_address_host(loc)


def get_local_address_for(addr):
def get_local_address_for(addr: str) -> str:
"""
Get a local listening address suitable for reaching *addr*.

Expand Down
39 changes: 20 additions & 19 deletions distributed/comm/core.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from __future__ import annotations

import asyncio
import inspect
import logging
import random
import sys
import weakref
from abc import ABC, abstractmethod, abstractproperty
from abc import ABC, abstractmethod
from contextlib import suppress
from typing import ClassVar

import dask
from dask.utils import parse_timedelta
Expand Down Expand Up @@ -40,7 +43,7 @@ class Comm(ABC):
depending on the underlying transport's characteristics.
"""

_instances = weakref.WeakSet()
_instances: ClassVar[weakref.WeakSet[Comm]] = weakref.WeakSet()

def __init__(self):
self._instances.add(self)
Expand Down Expand Up @@ -99,21 +102,17 @@ def abort(self):

@abstractmethod
def closed(self):
"""
Return whether the stream is closed.
"""
"""Return whether the stream is closed."""

@abstractproperty
def local_address(self):
"""
The local address. For logging and debugging purposes only.
"""
@property
@abstractmethod
def local_address(self) -> str:
"""The local address. For logging and debugging purposes only."""

@abstractproperty
def peer_address(self):
"""
The peer's address. For logging and debugging purposes only.
"""
@property
@abstractmethod
def peer_address(self) -> str:
"""The peer's address. For logging and debugging purposes only."""

@property
def extra_info(self):
Expand Down Expand Up @@ -178,13 +177,15 @@ def stop(self):
communications, but prevents accepting new ones.
"""

@abstractproperty
@property
@abstractmethod
def listen_address(self):
"""
The listening address as a URI string.
"""

@abstractproperty
@property
@abstractmethod
def contact_address(self):
"""
An address this listener can be contacted on. This can be
Expand Down Expand Up @@ -227,9 +228,9 @@ async def on_connection(self, comm: Comm, handshake_overrides=None):
raise CommClosedError(f"Comm {comm!r} closed.") from e

comm.remote_info = handshake
comm.remote_info["address"] = comm._peer_addr
comm.remote_info["address"] = comm.peer_address
comm.local_info = local_info
comm.local_info["address"] = comm._local_addr
comm.local_info["address"] = comm.local_address

comm.handshake_options = comm.handshake_configuration(
comm.local_info, comm.remote_info
Expand Down
Loading