Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 98 additions & 5 deletions fastapi_utils/tasks.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,107 @@
import asyncio
import logging
import asyncio, logging
from datetime import datetime, timedelta
from asyncio import ensure_future
from functools import wraps
from traceback import format_exception
from typing import Any, Callable, Coroutine, Optional, Union

from croniter import croniter
from starlette.concurrency import run_in_threadpool

NoArgsNoReturnFuncT = Callable[[], None]
NoArgsNoReturnAsyncFuncT = Callable[[], Coroutine[Any, Any, None]]
NoArgsNoReturnDecorator = Callable[[Union[NoArgsNoReturnFuncT, NoArgsNoReturnAsyncFuncT]], NoArgsNoReturnAsyncFuncT]

async def _seconds_to_next_run(
seconds: float = 3600.0, # Default 1h
cron: Union[str, None] = None,
logger: Optional[logging.Logger] = None,
raise_exceptions: bool = False
) -> Coroutine[Any, Any, float]:
if cron:
try:
next_dt = croniter(cron, datetime.now()).get_next(datetime)
if logger is not None:
logger.info(f'{logger.name} next run: {next_dt.replace(microsecond=0).isoformat()}')
return (next_dt - datetime.now()).total_seconds()
except Exception as exc:
if logger is not None:
formatted_exception = "".join(format_exception(type(exc), exc, exc.__traceback__))
logger.error(formatted_exception)
if raise_exceptions:
raise exc
elif seconds and logger is not None:
next_dt = datetime.now() + timedelta(seconds=seconds)
logging.info(f'{logger.name} next run: {next_dt.replace(microsecond=0).isoformat()}')

return seconds

def repeat_at(
*,
cron: str,
wait_first: bool = False,
logger: Optional[logging.Logger] = None,
raise_exceptions: bool = False,
max_repetitions: Optional[int] = None,
) -> NoArgsNoReturnDecorator:
"""
This function returns a decorator that modifies a function so it is periodically re-executed after its first call.

The function it decorates should accept no arguments and return nothing. If necessary, this can be accomplished
by using `functools.partial` or otherwise wrapping the target function prior to decoration.

Parameters
----------
cron: str
Cron-style string for periodic execution, eg. '0 0 * * *' every midnight
wait_first: bool (default False)
If True, the function will wait for a single period before the first call
logger: Optional[logging.Logger] (default None)
The logger to use to log any exceptions raised by calls to the decorated function.
If not provided, exceptions will not be logged by this function (though they may be handled by the event loop).
raise_exceptions: bool (default False)
If True, errors raised by the decorated function will be raised to the event loop's exception handler.
Note that if an error is raised, the repeated execution will stop.
Otherwise, exceptions are just logged and the execution continues to repeat.
See https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.set_exception_handler for more info.
max_repetitions: Optional[int] (default None)
The maximum number of times to call the repeated function. If `None`, the function is repeated forever.
"""

def decorator(func: Union[NoArgsNoReturnAsyncFuncT, NoArgsNoReturnFuncT]) -> NoArgsNoReturnAsyncFuncT:
"""
Converts the decorated function into a repeated, periodically-called version of itself.
"""
is_coroutine = asyncio.iscoroutinefunction(func)

@wraps(func)
async def wrapped() -> None:
repetitions = 0
async def loop() -> None:
nonlocal repetitions
if wait_first:
seconds_to_next = await _seconds_to_next_run(cron=cron, logger=logger, raise_exceptions=raise_exceptions)
await asyncio.sleep(seconds_to_next)
while max_repetitions is None or repetitions < max_repetitions:
try:
if is_coroutine:
await func() # type: ignore
else:
await run_in_threadpool(func)
repetitions += 1
except Exception as exc:
if logger is not None:
formatted_exception = "".join(format_exception(type(exc), exc, exc.__traceback__))
logger.error(formatted_exception)
if raise_exceptions:
raise exc
seconds_to_next = await _seconds_to_next_run(cron=cron, logger=logger, raise_exceptions=raise_exceptions)
await asyncio.sleep(seconds_to_next)

ensure_future(loop())

return wrapped

return decorator

def repeat_every(
*,
Expand Down Expand Up @@ -57,7 +148,8 @@ async def wrapped() -> None:
async def loop() -> None:
nonlocal repetitions
if wait_first:
await asyncio.sleep(seconds)
seconds_to_next = await _seconds_to_next_run(seconds=seconds, logger=logger, raise_exceptions=raise_exceptions)
await asyncio.sleep(seconds_to_next)
while max_repetitions is None or repetitions < max_repetitions:
try:
if is_coroutine:
Expand All @@ -71,7 +163,8 @@ async def loop() -> None:
logger.error(formatted_exception)
if raise_exceptions:
raise exc
await asyncio.sleep(seconds)
seconds_to_next = await _seconds_to_next_run(seconds=seconds, logger=logger, raise_exceptions=raise_exceptions)
await asyncio.sleep(seconds_to_next)

ensure_future(loop())

Expand Down
Loading