
Commit

adaptive_core
crusaderky committed Sep 23, 2021
1 parent 2b2de7e commit f64ddb3
Showing 4 changed files with 67 additions and 45 deletions.
1 change: 1 addition & 0 deletions distributed/deploy/adaptive.py
@@ -205,4 +205,5 @@ async def scale_up(self, n):
 
     @property
     def loop(self):
+        """Override Adaptive.loop"""
         return self.cluster.loop
107 changes: 64 additions & 43 deletions distributed/deploy/adaptive_core.py
@@ -1,7 +1,11 @@
-import collections
+from __future__ import annotations
+
 import logging
 import math
+from collections import defaultdict, deque
+from collections.abc import Iterable
 from datetime import timedelta
+from typing import TYPE_CHECKING, cast
 
 import tlz as toolz
 from tornado.ioloop import IOLoop, PeriodicCallback
@@ -10,6 +14,10 @@
 
 from ..metrics import time
 
+if TYPE_CHECKING:
+    from ..scheduler import WorkerState
+
+
 logger = logging.getLogger(__name__)
 
 
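The TYPE_CHECKING block added here makes WorkerState available to the annotations below while keeping the scheduler import out of the runtime import path. A minimal sketch of the same pattern, with a hypothetical heavy_module standing in for the scheduler:

    from __future__ import annotations

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        # evaluated only by the type checker, never at runtime
        from heavy_module import Thing


    def describe(thing: Thing) -> str:
        # with postponed evaluation, "Thing" stays a string at runtime
        return repr(thing)
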
@@ -60,7 +68,7 @@ class AdaptiveCore:
     ----------
     minimum: int
         The minimum number of allowed workers
-    maximum: int
+    maximum: int | inf
         The maximum number of allowed workers
     wait_count: int
         The number of scale-down requests we should receive before actually
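These parameters are the ones users ultimately pass through the cluster-level adapt() call; a minimal usage sketch (assuming a LocalCluster, and that the keyword arguments are forwarded unchanged to the adaptive object):

    from distributed import LocalCluster

    cluster = LocalCluster(n_workers=0)
    # keep between 0 and 10 workers, re-evaluating every second,
    # and only scale down after 3 consecutive recommendations
    cluster.adapt(minimum=0, maximum=10, wait_count=3, interval="1s")
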
@@ -69,17 +77,32 @@
         The amount of time, like ``"1s"`` between checks
     """
 
+    minimum: int
+    maximum: int | float
+    wait_count: int
+    interval: int | float
+    periodic_callback: PeriodicCallback | None
+    plan: set[WorkerState]
+    requested: set[WorkerState]
+    observed: set[WorkerState]
+    close_counts: defaultdict[WorkerState, int]
+    _adapting: bool
+    log: deque[tuple[float, dict]]
+
     def __init__(
         self,
         minimum: int = 0,
-        maximum: int = math.inf,
+        maximum: int | float = math.inf,
         wait_count: int = 3,
-        interval: str = "1s",
+        interval: str | int | float | timedelta | None = "1s",
    ):
+        if not isinstance(maximum, int) and not math.isinf(maximum):
+            raise TypeError(f"maximum must be integer or inf; got {maximum}")
+
         self.minimum = minimum
         self.maximum = maximum
         self.wait_count = wait_count
-        self.interval = parse_timedelta(interval, "seconds") if interval else interval
+        self.interval = parse_timedelta(interval, "seconds")
         self.periodic_callback = None
 
         def f():
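The new maximum check rejects finite non-integers while still allowing math.inf. A standalone sketch of the same check (hypothetical helper, mirroring the lines above):

    from __future__ import annotations

    import math


    def check_maximum(maximum: int | float) -> None:
        # Accept any integer, or the special value infinity
        if not isinstance(maximum, int) and not math.isinf(maximum):
            raise TypeError(f"maximum must be integer or inf; got {maximum}")


    check_maximum(10)        # accepted
    check_maximum(math.inf)  # accepted
    try:
        check_maximum(2.5)   # rejected: finite but not an integer
    except TypeError as exc:
        print(exc)
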
@@ -99,22 +122,16 @@ async def _adapt():
                     await core.adapt()
 
             self.periodic_callback = PeriodicCallback(_adapt, self.interval * 1000)
-            try:
-                self.loop.add_callback(f)
-            except AttributeError:
-                IOLoop.current().add_callback(f)
+            self.loop.add_callback(f)
 
-        try:
-            self.plan = set()
-            self.requested = set()
-            self.observed = set()
-        except Exception:
-            pass
+        self.plan = set()
+        self.requested = set()
+        self.observed = set()
 
         # internal state
-        self.close_counts = collections.defaultdict(int)
+        self.close_counts = defaultdict(int)
         self._adapting = False
-        self.log = collections.deque(maxlen=10000)
+        self.log = deque(maxlen=10000)
 
     def stop(self):
         logger.info("Adaptive stop")
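For reference, the interval argument is normalised to a number of seconds before being handed to the PeriodicCallback. A rough sketch of what dask's parse_timedelta utility accepts (assuming its usual behaviour of returning seconds as a number):

    from datetime import timedelta

    from dask.utils import parse_timedelta

    print(parse_timedelta("1s"))                  # 1
    print(parse_timedelta("500ms"))               # 0.5
    print(parse_timedelta(timedelta(minutes=2)))  # 120 seconds
    print(parse_timedelta(5, default="seconds"))  # 5
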
@@ -138,7 +155,7 @@ async def safe_target(self) -> int:
         """Used internally, like target, but respects minimum/maximum"""
         n = await self.target()
         if n > self.maximum:
-            n = self.maximum
+            n = cast(int, self.maximum)
 
         if n < self.minimum:
             n = self.minimum
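The cast() here is purely for the type checker: the branch can only be taken when maximum is finite (n > inf is never true), and the constructor guarantees a finite maximum is an int. At runtime cast() returns its argument unchanged, as this tiny sketch shows:

    import math
    from typing import cast

    x = cast(int, math.inf)  # a type-checker hint only
    print(x, type(x))        # inf <class 'float'> -- nothing was converted
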
@@ -163,34 +180,34 @@ async def recommendations(self, target: int) -> dict:
             self.close_counts.clear()
             return {"status": "same"}
 
-        elif target > len(plan):
+        if target > len(plan):
             self.close_counts.clear()
             return {"status": "up", "n": target}
 
-        elif target < len(plan):
-            not_yet_arrived = requested - observed
-            to_close = set()
-            if not_yet_arrived:
-                to_close.update(toolz.take(len(plan) - target, not_yet_arrived))
-
-            if target < len(plan) - len(to_close):
-                L = await self.workers_to_close(target=target)
-                to_close.update(L)
-
-            firmly_close = set()
-            for w in to_close:
-                self.close_counts[w] += 1
-                if self.close_counts[w] >= self.wait_count:
-                    firmly_close.add(w)
-
-            for k in list(self.close_counts):  # clear out unseen keys
-                if k in firmly_close or k not in to_close:
-                    del self.close_counts[k]
-
-            if firmly_close:
-                return {"status": "down", "workers": list(firmly_close)}
-            else:
-                return {"status": "same"}
+        # target < len(plan)
+        not_yet_arrived = requested - observed
+        to_close = set()
+        if not_yet_arrived:
+            to_close.update(toolz.take(len(plan) - target, not_yet_arrived))
+
+        if target < len(plan) - len(to_close):
+            L = await self.workers_to_close(target=target)
+            to_close.update(L)
+
+        firmly_close = set()
+        for w in to_close:
+            self.close_counts[w] += 1
+            if self.close_counts[w] >= self.wait_count:
+                firmly_close.add(w)
+
+        for k in list(self.close_counts):  # clear out unseen keys
+            if k in firmly_close or k not in to_close:
+                del self.close_counts[k]
+
+        if firmly_close:
+            return {"status": "down", "workers": list(firmly_close)}
+        else:
+            return {"status": "same"}
 
     async def adapt(self) -> None:
         """
@@ -231,3 +248,7 @@ async def adapt(self) -> None:
 
     def __del__(self):
         self.stop()
+
+    @property
+    def loop(self):
+        return IOLoop.current()
2 changes: 1 addition & 1 deletion docs/source/develop.rst
@@ -171,7 +171,7 @@ you need to test the command line interface.
 
 Linting
 -------
-distributed uses several code linters (flake8, black, isort, pyupgrade), which are
+distributed uses several code linters (flake8, black, isort, pyupgrade, mypy), which are
 enforced by CI. Developers should run them locally before they submit a PR, through the
 single command ``pre-commit run --all-files``. This makes sure that linter versions and
 options are aligned for all developers.
2 changes: 1 addition & 1 deletion setup.py
@@ -51,7 +51,7 @@
     ),
 ]
 for e in cyext_modules:
-    e.cython_directives = {
+    e.cython_directives = {  # type: ignore
         "annotation_typing": True,
         "binding": False,
         "embedsignature": True,
