From 9a41fb7cf2b59b5da9b5781bffdcf6184643b7f9 Mon Sep 17 00:00:00 2001
From: crusaderky
Date: Thu, 23 Sep 2021 13:35:06 +0100
Subject: [PATCH] adaptive_core

---
 distributed/deploy/adaptive_core.py | 98 +++++++++++++++++------
 docs/source/develop.rst             |  2 +-
 setup.py                            |  2 +-
 3 files changed, 61 insertions(+), 41 deletions(-)

diff --git a/distributed/deploy/adaptive_core.py b/distributed/deploy/adaptive_core.py
index c417cbe5d86..0bbd35834f3 100644
--- a/distributed/deploy/adaptive_core.py
+++ b/distributed/deploy/adaptive_core.py
@@ -1,7 +1,11 @@
-import collections
+from __future__ import annotations
+
 import logging
 import math
+from collections import defaultdict, deque
 from collections.abc import Iterable
+from datetime import timedelta
+from typing import TYPE_CHECKING, cast

 import tlz as toolz
 from tornado.ioloop import IOLoop, PeriodicCallback
@@ -10,6 +14,10 @@

 from ..metrics import time

+if TYPE_CHECKING:
+    from ..scheduler import WorkerState
+
+
 logger = logging.getLogger(__name__)


@@ -60,7 +68,7 @@ class AdaptiveCore:
     ----------
     minimum: int
         The minimum number of allowed workers
-    maximum: int
+    maximum: int | math.inf
         The maximum number of allowed workers
     wait_count: int
         The number of scale-down requests we should receive before actually
@@ -69,17 +77,32 @@ class AdaptiveCore:
         The amount of time, like ``"1s"`` between checks
     """

+    minimum: int
+    maximum: int | float
+    wait_count: int
+    interval: int | float
+    periodic_callback: PeriodicCallback | None
+    plan: set[WorkerState]
+    requested: set[WorkerState]
+    observed: set[WorkerState]
+    close_counts: defaultdict[WorkerState, int]
+    _adapting: bool
+    log: deque[tuple[float, dict]]
+
     def __init__(
         self,
         minimum: int = 0,
-        maximum: int = math.inf,
+        maximum: int | float = math.inf,
         wait_count: int = 3,
-        interval: str = "1s",
+        interval: str | int | float | timedelta | None = "1s",
     ):
+        if not isinstance(maximum, int) and not math.isinf(maximum):
+            raise TypeError(f"maximum must be integer or inf; got {maximum}")
+
         self.minimum = minimum
         self.maximum = maximum
         self.wait_count = wait_count
-        self.interval = parse_timedelta(interval, "seconds") if interval else interval
+        self.interval = parse_timedelta(interval, "seconds")
         self.periodic_callback = None

         def f():
@@ -104,17 +127,14 @@ async def _adapt():
         except AttributeError:
             IOLoop.current().add_callback(f)

-        try:
-            self.plan = set()
-            self.requested = set()
-            self.observed = set()
-        except Exception:
-            pass
+        self.plan = set()
+        self.requested = set()
+        self.observed = set()

         # internal state
-        self.close_counts = collections.defaultdict(int)
+        self.close_counts = defaultdict(int)
         self._adapting = False
-        self.log = collections.deque(maxlen=10000)
+        self.log = deque(maxlen=10000)

     def stop(self):
         logger.info("Adaptive stop")
@@ -138,7 +158,7 @@ async def safe_target(self) -> int:
         """Used internally, like target, but respects minimum/maximum"""
         n = await self.target()
         if n > self.maximum:
-            n = self.maximum
+            n = cast(int, self.maximum)

         if n < self.minimum:
             n = self.minimum
@@ -163,34 +183,34 @@ async def recommendations(self, target: int) -> dict:
             self.close_counts.clear()
             return {"status": "same"}

-        elif target > len(plan):
+        if target > len(plan):
             self.close_counts.clear()
             return {"status": "up", "n": target}

-        elif target < len(plan):
-            not_yet_arrived = requested - observed
-            to_close = set()
-            if not_yet_arrived:
-                to_close.update(toolz.take(len(plan) - target, not_yet_arrived))
-
-            if target < len(plan) - len(to_close):
-                L = await self.workers_to_close(target=target)
-                to_close.update(L)
-
-            firmly_close = set()
-            for w in to_close:
-                self.close_counts[w] += 1
-                if self.close_counts[w] >= self.wait_count:
-                    firmly_close.add(w)
-
-            for k in list(self.close_counts):  # clear out unseen keys
-                if k in firmly_close or k not in to_close:
-                    del self.close_counts[k]
-
-            if firmly_close:
-                return {"status": "down", "workers": list(firmly_close)}
-            else:
-                return {"status": "same"}
+        # target < len(plan)
+        not_yet_arrived = requested - observed
+        to_close = set()
+        if not_yet_arrived:
+            to_close.update(toolz.take(len(plan) - target, not_yet_arrived))
+
+        if target < len(plan) - len(to_close):
+            L = await self.workers_to_close(target=target)
+            to_close.update(L)
+
+        firmly_close = set()
+        for w in to_close:
+            self.close_counts[w] += 1
+            if self.close_counts[w] >= self.wait_count:
+                firmly_close.add(w)
+
+        for k in list(self.close_counts):  # clear out unseen keys
+            if k in firmly_close or k not in to_close:
+                del self.close_counts[k]
+
+        if firmly_close:
+            return {"status": "down", "workers": list(firmly_close)}
+        else:
+            return {"status": "same"}

     async def adapt(self) -> None:
         """
diff --git a/docs/source/develop.rst b/docs/source/develop.rst
index f0f534386bb..6ea8c750329 100644
--- a/docs/source/develop.rst
+++ b/docs/source/develop.rst
@@ -171,7 +171,7 @@ you need to test the command line interface.

 Linting
 -------
-distributed uses several code linters (flake8, black, isort, pyupgrade), which are
+distributed uses several code linters (flake8, black, isort, pyupgrade, mypy), which are
 enforced by CI. Developers should run them locally before they submit a PR, through the
 single command ``pre-commit run --all-files``. This makes sure that linter versions and
 options are aligned for all developers.
diff --git a/setup.py b/setup.py
index db110f04398..9431b8a809d 100755
--- a/setup.py
+++ b/setup.py
@@ -51,7 +51,7 @@
         ),
     ]
     for e in cyext_modules:
-        e.cython_directives = {
+        e.cython_directives = {  # type: ignore
             "annotation_typing": True,
             "binding": False,
             "embedsignature": True,
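
Note (illustrative, not part of the patch): the scale-down hysteresis that
``recommendations`` implements above can be exercised on its own. The sketch
below is a minimal example, assuming a hypothetical ``DemoAdaptive`` subclass
and plain strings standing in for ``WorkerState`` objects; it shows that a
worker is only closed for real after ``wait_count`` consecutive scale-down
recommendations::

    import asyncio

    from distributed.deploy.adaptive_core import AdaptiveCore


    class DemoAdaptive(AdaptiveCore):
        def __init__(self):
            # interval=None: no PeriodicCallback is installed, so we can
            # drive the checks by hand
            super().__init__(minimum=0, maximum=2, wait_count=3, interval=None)
            # Pretend two workers were requested and have connected
            self.plan = {"w1", "w2"}
            self.requested = {"w1", "w2"}
            self.observed = {"w1", "w2"}

        async def target(self) -> int:
            return 1  # always recommend scaling down to a single worker

        async def workers_to_close(self, target: int) -> list:
            return ["w2"]  # hypothetical choice; normally the scheduler picks

        async def scale_up(self, n: int) -> None:
            pass

        async def scale_down(self, workers) -> None:
            pass


    async def main():
        a = DemoAdaptive()
        for i in range(3):
            print(i + 1, await a.recommendations(await a.safe_target()))
        # 1 {'status': 'same'}                      close_counts["w2"] == 1
        # 2 {'status': 'same'}                      close_counts["w2"] == 2
        # 3 {'status': 'down', 'workers': ['w2']}   wait_count reached


    asyncio.run(main())

The ``close_counts`` bookkeeping is what keeps an adaptive cluster from
retiring workers on a single noisy measurement: the "down" signal must repeat
for ``wait_count`` consecutive checks before any worker is removed, and any
check whose target is at or above the current plan clears the counters.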