From fc248d80e0b575fe906e5d537f6ed08f0aaf5ef4 Mon Sep 17 00:00:00 2001
From: Viktor Mirzoyan
Date: Wed, 27 Dec 2023 15:16:57 -0500
Subject: [PATCH] Support automatic shutdown due to high memory

* Add support for automatic shutdown on high memory via CLI option and app setting
* Check for already running web server
---
Review notes (ignored by `git am`):
- The shutdown threshold is a fraction (0.0 < x <= 1.0); psutil's
  memory_percent() reports 0-100, so it is scaled before comparing.
- The monitor task now sleeps on every iteration so it can never spin
  the event loop, whether the feature is enabled or disabled.
- Worker(shutdown_memory_utilization_percent=...) is now honoured and
  falls back to the app setting when omitted.

 faust/__init__.py            |  2 +-
 faust/cli/worker.py          |  9 +++++++-
 faust/types/_env.py          |  3 +++
 faust/types/settings.py      | 14 +++++++++++
 faust/web/drivers/aiohttp.py |  7 +++---
 faust/worker.py              | 50 +++++++++++++++++++++++++++++++++++++++++++++++++
 requirements/default.txt     |  3 ++-
 7 files changed, 82 insertions(+), 6 deletions(-)

diff --git a/faust/__init__.py b/faust/__init__.py
index 278fc9217..bd670a409 100644
--- a/faust/__init__.py
+++ b/faust/__init__.py
@@ -24,7 +24,7 @@
 
 from typing import Any, Mapping, NamedTuple, Optional, Sequence, Tuple
 
-__version__ = '1.13.3'
+__version__ = '1.13.4'
 __author__ = 'Robinhood Markets, Inc.'
 __contact__ = 'contact@fauststream.com'
 __homepage__ = 'http://faust.readthedocs.io/'
diff --git a/faust/cli/worker.py b/faust/cli/worker.py
index db86a0d1d..8dac6eb3a 100644
--- a/faust/cli/worker.py
+++ b/faust/cli/worker.py
@@ -12,7 +12,7 @@
 
 from faust.worker import Worker as FaustWorker
 from faust.types import AppT
-from faust.types._env import WEB_BIND, WEB_PORT, WEB_TRANSPORT
+from faust.types._env import WEB_BIND, WEB_PORT, WEB_TRANSPORT, SHUTDOWN_MEMORY_PERCENT
 
 from . import params
 from .base import AppCommand, now_builtin_worker_options, option
@@ -46,6 +46,10 @@ class worker(AppCommand):
                default=socket.gethostname(), type=str,
                help=f'Canonical host name for the web server '
                     f'(default: {WEB_BIND})'),
+        option('--shutdown-memory-percent', '-s',
+               default=None, type=float,
+               help=f'Memory consumption percentage that triggers shutdown '
+                    f'(default: {SHUTDOWN_MEMORY_PERCENT})'),
     ]
 
     options = (cast(List, worker_options) +
@@ -74,6 +78,7 @@ def _init_worker_options(self,
                              web_bind: Optional[str],
                              web_host: str,
                              web_transport: URL,
+                             shutdown_memory_percent: Optional[float],
                              **kwargs: Any) -> None:
         self.app.conf.web_enabled = with_web
         if web_port is not None:
@@ -84,6 +89,8 @@ def _init_worker_options(self,
             self.app.conf.web_host = web_host
         if web_transport is not None:
             self.app.conf.web_transport = web_transport
+        if shutdown_memory_percent is not None:
+            self.app.conf.worker_shutdown_memory_utilization_percent = shutdown_memory_percent
 
     @property
     def _Worker(self) -> Type[Worker]:
diff --git a/faust/types/_env.py b/faust/types/_env.py
index 39d3e1ef4..543ed01f9 100644
--- a/faust/types/_env.py
+++ b/faust/types/_env.py
@@ -13,6 +13,7 @@
     'WEB_BIND',
     'WEB_TRANSPORT',
     'WORKDIR',
+    'SHUTDOWN_MEMORY_PERCENT',
 ]
 
 PREFICES: Sequence[str] = ['FAUST_', 'F_']
@@ -51,3 +52,5 @@ def _getenv(name: str, *default: Any,
 WEB_PORT: int = int(_getenv('WEB_PORT', '6066'))
 WEB_BIND: str = _getenv('F_WEB_BIND', '0.0.0.0')
 WEB_TRANSPORT: URL = URL(_getenv('WEB_TRANSPORT', 'tcp://'))
+
+SHUTDOWN_MEMORY_PERCENT: float = float(_getenv('SHUTDOWN_MEMORY_PERCENT', '0.0'))
diff --git a/faust/types/settings.py b/faust/types/settings.py
index d175cb7cf..28867d17f 100644
--- a/faust/types/settings.py
+++ b/faust/types/settings.py
@@ -354,6 +354,16 @@ class _WorkerT: ... # noqa
     inspect.Parameter.VAR_KEYWORD,
 }
 
+#: Memory utilization threshold that triggers a faust worker to
+#: shutdown. Applicable only when faust is deployed using a `Worker`.
+#:
+#: An app will abruptly terminate if it exceeds its allocated memory limit.
+#: This may have unintended consequences, especially when stateful Tables are
+#: involved. This setting allows for graceful shutdown when high memory utilization
+#: is detected. This setting is disabled by default. To enable, set a fraction
+#: between >0.0 and <=1.0 (for example 0.9 to shut down above 90%).
+WORKER_SHUTDOWN_MEMORY_UTILIZATION_PERCENT = 0.0
+
 AutodiscoverArg = Union[
     bool,
     Iterable[str],
@@ -406,6 +416,7 @@ class Settings(abc.ABC):
     web_cors_options: Optional[Mapping[str, ResourceOptions]] = None
     worker_redirect_stdouts: bool = True
     worker_redirect_stdouts_level: Severity = 'WARN'
+    worker_shutdown_memory_utilization_percent: float = WORKER_SHUTDOWN_MEMORY_UTILIZATION_PERCENT
 
     _id: str
     _origin: Optional[str] = None
@@ -563,6 +574,7 @@ def __init__(  # noqa: C901
             web_cors_options: Mapping[str, ResourceOptions] = None,
             worker_redirect_stdouts: bool = None,
             worker_redirect_stdouts_level: Severity = None,
+            worker_shutdown_memory_utilization_percent: float = None,
             Agent: SymbolArg[Type[AgentT]] = None,
             ConsumerScheduler: SymbolArg[Type[SchedulingStrategyT]] = None,
             Event: SymbolArg[Type[EventT]] = None,
@@ -734,6 +746,8 @@ def __init__(  # noqa: C901
             self.worker_redirect_stdouts = worker_redirect_stdouts
         if worker_redirect_stdouts_level is not None:
             self.worker_redirect_stdouts_level = worker_redirect_stdouts_level
+        if worker_shutdown_memory_utilization_percent is not None:
+            self.worker_shutdown_memory_utilization_percent = worker_shutdown_memory_utilization_percent
 
         if reply_to_prefix is not None:
             self.reply_to_prefix = reply_to_prefix
diff --git a/faust/web/drivers/aiohttp.py b/faust/web/drivers/aiohttp.py
index f0485bc05..bfbbb92bf 100644
--- a/faust/web/drivers/aiohttp.py
+++ b/faust/web/drivers/aiohttp.py
@@ -299,9 +299,10 @@ def _new_transport_unix(self) -> BaseSite:
 
     async def start_server(self) -> None:
         """Start the web server."""
-        await self._runner.setup()
-        site = self._create_site()
-        await site.start()
+        if self._runner.server is None:
+            await self._runner.setup()
+            site = self._create_site()
+            await site.start()
 
     async def stop_server(self) -> None:
         """Stop the web server."""
diff --git a/faust/worker.py b/faust/worker.py
index 5097f8e78..b52eb65f6 100644
--- a/faust/worker.py
+++ b/faust/worker.py
@@ -9,6 +9,7 @@
 import logging
 import os
 import sys
+import psutil
 
 from collections import defaultdict
 from itertools import chain
@@ -19,6 +20,7 @@
 from aiokafka.structs import TopicPartition
 from mode import ServiceT, get_logger
 from mode.utils.logging import Severity, formatter2
+from mode.utils.locks import Event
 
 from .types import AppT, SensorT, TP, TopicT
 from .types._env import (
@@ -43,6 +45,14 @@ def setproctitle(title: str) -> None: ... # noqa
 
 TP_TYPES = (TP, TopicPartition)
 
+W_SHUTDOWN_DUE_TO_MEMORY = '''\
+Worker process is consuming {percent} of available memory.
+
+Application will gracefully shutdown now to avoid an abrupt termination due
+an out of memory situation. If this happens frequently, consider
+increasing the application's memory limit.
+'''
+
 logger = get_logger(__name__)
 
 
@@ -217,6 +227,14 @@ def manage_loop():
     #: Set by signal to avoid printing an OK status.
     _shutdown_immediately: bool = False
 
+    #: Set when worker has initialized and started
+    _on_startup_finished: Event
+
+    #: OS process running the worker
+    _worker_process: psutil.Process
+
+    shutdown_memory_utilization_percent: float
+
     def __init__(self,
                  app: AppT,
                  *services: ServiceT,
@@ -234,11 +252,17 @@ def __init__(self,
                  redirect_stdouts: bool = None,
                  redirect_stdouts_level: Severity = None,
                  logging_config: Dict = None,
+                 shutdown_memory_utilization_percent: float = None,
                  **kwargs: Any) -> None:
         self.app = app
         self.sensors = set(sensors or [])
         self.workdir = Path(workdir or Path.cwd())
         conf = app.conf
+        if shutdown_memory_utilization_percent is None:
+            shutdown_memory_utilization_percent = (
+                conf.worker_shutdown_memory_utilization_percent)
+        self.shutdown_memory_utilization_percent = (
+            shutdown_memory_utilization_percent)
         if redirect_stdouts is None:
             redirect_stdouts = conf.worker_redirect_stdouts
         if redirect_stdouts_level is None:
@@ -246,6 +270,7 @@ def __init__(self,
                 conf.worker_redirect_stdouts_level or logging.INFO)
         if logging_config is None:
             logging_config = app.conf.logging_config
+
         super().__init__(
             *services,
             debug=debug,
@@ -263,6 +288,8 @@ def __init__(self,
             loop=loop,
             **kwargs)
         self.spinner = terminal.Spinner(file=self.stdout)
+        self._on_startup_finished = Event()
+        self._worker_process = psutil.Process(os.getpid())
 
     async def on_start(self) -> None:
         """Signal called every time the worker starts."""
@@ -293,6 +320,7 @@ async def maybe_start_blockdetection(self) -> None:
 
     async def on_startup_finished(self) -> None:
         """Signal called when worker has started."""
+        self._on_startup_finished.set()
         if self._shutdown_immediately:
             return self._on_shutdown_immediately()
         # block detection started here after changelog stuff,
@@ -392,3 +420,25 @@ def _setup_spinner_handler(
             logger.addHandler(
                 terminal.SpinnerHandler(self.spinner, level=logging.DEBUG))
             logger.setLevel(logging.DEBUG)
+
+    @mode.Service.task
+    async def _stop_worker_when_high_memory_utilization(self) -> None:
+        """Monitor worker's memory utilization.
+
+        If memory utilization exceeds the configured threshold then
+        gracefully stop the worker before we run out of memory.
+        """
+        await self.wait(self._on_startup_finished)
+        while not self.should_stop:
+            threshold = self.shutdown_memory_utilization_percent
+            if threshold and threshold > 0:
+                # psutil reports 0-100; the setting is a 0.0-1.0 fraction.
+                used = self._worker_process.memory_percent() / 100.0
+                if used > threshold:
+                    self.log.info(
+                        W_SHUTDOWN_DUE_TO_MEMORY.format(
+                            percent='{:.2%}'.format(used)))
+                    await self.stop()
+                    return
+            # Always yield to the event loop between checks.
+            await self.sleep(30)
diff --git a/requirements/default.txt b/requirements/default.txt
index 67ec01b97..88c4c4773 100644
--- a/requirements/default.txt
+++ b/requirements/default.txt
@@ -9,4 +9,5 @@ venusian>=1.1,<2.0
 yarl>=1.0,<2.0
 croniter>=0.3.16
 mypy_extensions
-intervaltree
\ No newline at end of file
+intervaltree==3.1.0
+psutil==5.8.0
\ No newline at end of file