Skip to content

Commit

Permalink
Support automatic shutdown due to high memory
Browse files Browse the repository at this point in the history
* Add support for automatic shutdown on high memory via CLI option and app setting
* Check for already running web server
  • Loading branch information
vmirz committed Dec 27, 2023
1 parent 2d9ebd4 commit fc248d8
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 6 deletions.
2 changes: 1 addition & 1 deletion faust/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from typing import Any, Mapping, NamedTuple, Optional, Sequence, Tuple

__version__ = '1.13.3'
__version__ = '1.13.4'
__author__ = 'Robinhood Markets, Inc.'
__contact__ = 'contact@fauststream.com'
__homepage__ = 'http://faust.readthedocs.io/'
Expand Down
9 changes: 8 additions & 1 deletion faust/cli/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from faust.worker import Worker as FaustWorker
from faust.types import AppT
from faust.types._env import WEB_BIND, WEB_PORT, WEB_TRANSPORT
from faust.types._env import WEB_BIND, WEB_PORT, WEB_TRANSPORT, SHUTDOWN_MEMORY_PERCENT

from . import params
from .base import AppCommand, now_builtin_worker_options, option
Expand Down Expand Up @@ -46,6 +46,10 @@ class worker(AppCommand):
default=socket.gethostname(), type=str,
help=f'Canonical host name for the web server '
f'(default: {WEB_BIND})'),
option('--shutdown-memory-percent', '-s',
default=None, type=float,
help=f'Memory consumption percentage that triggers shutdown '
f'(default: {SHUTDOWN_MEMORY_PERCENT})'),
]

options = (cast(List, worker_options) +
Expand Down Expand Up @@ -74,6 +78,7 @@ def _init_worker_options(self,
web_bind: Optional[str],
web_host: str,
web_transport: URL,
shutdown_memory_percent: float,
**kwargs: Any) -> None:
self.app.conf.web_enabled = with_web
if web_port is not None:
Expand All @@ -84,6 +89,8 @@ def _init_worker_options(self,
self.app.conf.web_host = web_host
if web_transport is not None:
self.app.conf.web_transport = web_transport
if shutdown_memory_percent is not None:
self.app.conf.worker_shutdown_memory_utilization_percent = shutdown_memory_percent

@property
def _Worker(self) -> Type[Worker]:
Expand Down
3 changes: 3 additions & 0 deletions faust/types/_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
'WEB_BIND',
'WEB_TRANSPORT',
'WORKDIR',
'SHUTDOWN_MEMORY_PERCENT'
]

PREFICES: Sequence[str] = ['FAUST_', 'F_']
Expand Down Expand Up @@ -51,3 +52,5 @@ def _getenv(name: str, *default: Any,
WEB_PORT: int = int(_getenv('WEB_PORT', '6066'))
WEB_BIND: str = _getenv('F_WEB_BIND', '0.0.0.0')
WEB_TRANSPORT: URL = URL(_getenv('WEB_TRANSPORT', 'tcp://'))

SHUTDOWN_MEMORY_PERCENT: float = float(_getenv('SHUTDOWN_MEMORY_PERCENT', '0.0'))
14 changes: 14 additions & 0 deletions faust/types/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,16 @@ class _WorkerT: ... # noqa
inspect.Parameter.VAR_KEYWORD,
}

#: Memory utilization threshold that triggers a faust worker to
#: shut down. Applicable only when faust is deployed using a `Worker`.
#:
#: An app will abruptly terminate if it exceeds its allocated memory limit.
#: This may have unintended consequences, especially when stateful Tables are
#: involved. This setting allows for a graceful shutdown when high memory
#: utilization is detected. This setting is disabled by default. To enable it,
#: set a value greater than 0.0 and at most 1.0.
WORKER_SHUTDOWN_MEMORY_UTILIZATION_PERCENT = 0.0

AutodiscoverArg = Union[
bool,
Iterable[str],
Expand Down Expand Up @@ -406,6 +416,7 @@ class Settings(abc.ABC):
web_cors_options: Optional[Mapping[str, ResourceOptions]] = None
worker_redirect_stdouts: bool = True
worker_redirect_stdouts_level: Severity = 'WARN'
worker_shutdown_memory_utilization_percent: float = WORKER_SHUTDOWN_MEMORY_UTILIZATION_PERCENT

_id: str
_origin: Optional[str] = None
Expand Down Expand Up @@ -563,6 +574,7 @@ def __init__( # noqa: C901
web_cors_options: Mapping[str, ResourceOptions] = None,
worker_redirect_stdouts: bool = None,
worker_redirect_stdouts_level: Severity = None,
worker_shutdown_memory_utilization_percent: float = None,
Agent: SymbolArg[Type[AgentT]] = None,
ConsumerScheduler: SymbolArg[Type[SchedulingStrategyT]] = None,
Event: SymbolArg[Type[EventT]] = None,
Expand Down Expand Up @@ -734,6 +746,8 @@ def __init__( # noqa: C901
self.worker_redirect_stdouts = worker_redirect_stdouts
if worker_redirect_stdouts_level is not None:
self.worker_redirect_stdouts_level = worker_redirect_stdouts_level
if worker_shutdown_memory_utilization_percent is not None:
self.worker_shutdown_memory_utilization_percent = worker_shutdown_memory_utilization_percent

if reply_to_prefix is not None:
self.reply_to_prefix = reply_to_prefix
Expand Down
7 changes: 4 additions & 3 deletions faust/web/drivers/aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,10 @@ def _new_transport_unix(self) -> BaseSite:

async def start_server(self) -> None:
"""Start the web server."""
await self._runner.setup()
site = self._create_site()
await site.start()
if self._runner.server is None:
await self._runner.setup()
site = self._create_site()
await site.start()

async def stop_server(self) -> None:
"""Stop the web server."""
Expand Down
45 changes: 45 additions & 0 deletions faust/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import logging
import os
import sys
import psutil

from collections import defaultdict
from itertools import chain
Expand All @@ -19,6 +20,7 @@
from aiokafka.structs import TopicPartition
from mode import ServiceT, get_logger
from mode.utils.logging import Severity, formatter2
from mode.utils.locks import Event

from .types import AppT, SensorT, TP, TopicT
from .types._env import (
Expand All @@ -43,6 +45,14 @@ def setproctitle(title: str) -> None: ... # noqa

TP_TYPES = (TP, TopicPartition)

W_SHUTDOWN_DUE_TO_MEMORY = '''\
Worker process is consuming {percent} of available memory.
Application will gracefully shutdown now to avoid an abrupt termination due
an out of memory situation. If this happens frequently, consider
increasing the application's memory limit.
'''

logger = get_logger(__name__)


Expand Down Expand Up @@ -217,6 +227,14 @@ def manage_loop():
#: Set by signal to avoid printing an OK status.
_shutdown_immediately: bool = False

#: Set when the worker has initialized and started.
_on_startup_finished: Event

#: OS process running the worker
_worker_process: psutil.Process

shutdown_memory_utilization_percent: float

def __init__(self,
app: AppT,
*services: ServiceT,
Expand All @@ -234,18 +252,21 @@ def __init__(self,
redirect_stdouts: bool = None,
redirect_stdouts_level: Severity = None,
logging_config: Dict = None,
shutdown_memory_utilization_percent: float = None,
**kwargs: Any) -> None:
self.app = app
self.sensors = set(sensors or [])
self.workdir = Path(workdir or Path.cwd())
conf = app.conf
self.shutdown_memory_utilization_percent = conf.worker_shutdown_memory_utilization_percent
if redirect_stdouts is None:
redirect_stdouts = conf.worker_redirect_stdouts
if redirect_stdouts_level is None:
redirect_stdouts_level = (
conf.worker_redirect_stdouts_level or logging.INFO)
if logging_config is None:
logging_config = app.conf.logging_config

super().__init__(
*services,
debug=debug,
Expand All @@ -263,6 +284,8 @@ def __init__(self,
loop=loop,
**kwargs)
self.spinner = terminal.Spinner(file=self.stdout)
self._on_startup_finished = Event()
self._worker_process = psutil.Process(os.getpid())

async def on_start(self) -> None:
"""Signal called every time the worker starts."""
Expand Down Expand Up @@ -293,6 +316,7 @@ async def maybe_start_blockdetection(self) -> None:

async def on_startup_finished(self) -> None:
"""Signal called when worker has started."""
self._on_startup_finished.set()
if self._shutdown_immediately:
return self._on_shutdown_immediately()
# block detection started here after changelog stuff,
Expand Down Expand Up @@ -392,3 +416,24 @@ def _setup_spinner_handler(
logger.addHandler(
terminal.SpinnerHandler(self.spinner, level=logging.DEBUG))
logger.setLevel(logging.DEBUG)


@mode.Service.task
async def _stop_worker_when_high_memory_utilization(self) -> None:
    """Stop the worker gracefully when its memory utilization is too high.

    Waits until the worker has finished starting, then periodically
    samples the memory utilization of the worker process.  When the
    sample exceeds ``shutdown_memory_utilization_percent`` a message is
    logged and the worker is asked to stop, so that it can shut down
    cleanly before running out of memory.

    The threshold is interpreted as a fraction (greater than 0.0 and at
    most 1.0), as documented for the
    ``worker_shutdown_memory_utilization_percent`` setting.  A threshold
    of zero (or an unset one) disables the check; the task then only
    sleeps between iterations until the worker stops.
    """
    await self.wait(self._on_startup_finished)
    while not self.should_stop:
        threshold = self.shutdown_memory_utilization_percent
        if threshold and threshold > 0:
            # psutil reports a percentage in the 0-100 range; convert it
            # to a fraction so it is comparable with the threshold and is
            # rendered correctly by the '{:.2%}' format below.
            used_fraction = self._worker_process.memory_percent() / 100.0
            if used_fraction > threshold:
                self.log.info(
                    W_SHUTDOWN_DUE_TO_MEMORY.format(
                        percent='{:.2%}'.format(used_fraction),
                    ),
                )
                await self.stop()
                return
        # Always yield to the event loop between samples, whatever branch
        # was taken above; otherwise this loop would spin without awaiting.
        await self.sleep(30)
3 changes: 2 additions & 1 deletion requirements/default.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ venusian>=1.1,<2.0
yarl>=1.0,<2.0
croniter>=0.3.16
mypy_extensions
intervaltree
intervaltree==3.1.0
psutil==5.8.0

0 comments on commit fc248d8

Please sign in to comment.