Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeyshockov committed Sep 13, 2024
1 parent dc06e9c commit 44a033d
Show file tree
Hide file tree
Showing 23 changed files with 275 additions and 193 deletions.
1 change: 0 additions & 1 deletion examples/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from justscheduleit import Scheduler, after, every, recurrent

logging.basicConfig()

logging.getLogger("justscheduleit").setLevel(logging.DEBUG)

scheduler = Scheduler()
Expand Down
43 changes: 0 additions & 43 deletions examples/async_cancellation.py

This file was deleted.

32 changes: 32 additions & 0 deletions examples/cancellation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/usr/bin/env python

import logging
from asyncio import CancelledError
from datetime import timedelta

import anyio

from justscheduleit import Scheduler, every

logging.basicConfig()
logging.getLogger("justscheduleit").setLevel(logging.DEBUG)

scheduler = Scheduler()


@scheduler.task(every(timedelta(seconds=5), delay=None))
async def long_async_task():
print(f"{long_async_task.__name__} is triggered!")
try:
await anyio.sleep(15)
except CancelledError:
# Won't be cancelled in the normal flow (graceful shutdown)
print(f"Forced shutdown!")
raise
print(f"{long_async_task.__name__} has finished!")


if __name__ == "__main__":
import justscheduleit

justscheduleit.run(scheduler)
3 changes: 1 addition & 2 deletions examples/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from justscheduleit.hosting import Host

logging.basicConfig()

logging.getLogger("justscheduleit").setLevel(logging.DEBUG)


Expand Down Expand Up @@ -41,4 +40,4 @@ async def scheduled_background_task():
if __name__ == "__main__":
from justscheduleit import hosting

hosting.run(host)
exit(hosting.run(host))
1 change: 0 additions & 1 deletion examples/cron_cond.py → examples/cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from justscheduleit.cond.cron import cron

logging.basicConfig()

logging.getLogger("justscheduleit").setLevel(logging.DEBUG)

scheduler = Scheduler()
Expand Down
4 changes: 2 additions & 2 deletions examples/failing_task.py → examples/failing_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
import logging
from datetime import timedelta

import justscheduleit
from justscheduleit import Scheduler, every

logging.basicConfig()

logging.getLogger("justscheduleit").setLevel(logging.DEBUG)

scheduler = Scheduler()
Expand All @@ -26,4 +24,6 @@ def a_sync_task():


if __name__ == "__main__":
import justscheduleit

justscheduleit.run(scheduler)
5 changes: 3 additions & 2 deletions examples/fastdepends.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from justscheduleit import Scheduler, every

logging.basicConfig()

logging.getLogger("justscheduleit").setLevel(logging.DEBUG)

scheduler = Scheduler()
Expand All @@ -30,4 +29,6 @@ def print_task(


if __name__ == "__main__":
scheduler.run()
import justscheduleit

justscheduleit.run(scheduler)
1 change: 0 additions & 1 deletion examples/finite_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from justscheduleit.cond import take_first

logging.basicConfig()

logging.getLogger("justscheduleit").setLevel(logging.DEBUG)

scheduler = Scheduler()
Expand Down
3 changes: 1 addition & 2 deletions examples/host/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from justscheduleit.hosting import Host, ServiceLifetime

logging.basicConfig()

logging.getLogger("justscheduleit").setLevel(logging.DEBUG)

host = Host()
Expand Down Expand Up @@ -89,4 +88,4 @@ async def an_async_service_tpl(n: int):
if __name__ == "__main__":
from justscheduleit import hosting

hosting.run(host)
exit(hosting.run(host))
26 changes: 26 additions & 0 deletions examples/host/failing_main_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/usr/bin/env python

import time

from examples.host.failing_service import host
from justscheduleit.hosting import ServiceLifetime


@host.service(main=True)
def a_main_sync_service(service_lifetime: ServiceLifetime):
print(f"{a_main_sync_service.__name__} started")
service_lifetime.set_started()
crash_countdown = 5
while not service_lifetime.shutting_down.is_set():
print(f"{a_main_sync_service.__name__} running")
time.sleep(1)
crash_countdown -= 1
if crash_countdown == 0:
raise RuntimeError("This is a test error from _sync_")
print(f"{a_main_sync_service.__name__} stopped")


if __name__ == "__main__":
from justscheduleit import hosting

exit(hosting.run(host))
5 changes: 2 additions & 3 deletions examples/host/failing_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from justscheduleit.hosting import Host, ServiceLifetime

logging.basicConfig()

logging.getLogger("justscheduleit").setLevel(logging.DEBUG)

host = Host()
Expand All @@ -33,7 +32,7 @@ async def an_async_func():
try:
while True:
print(f"{an_async_func.__name__} running")
await anyio.sleep(5)
await anyio.sleep(1)
# raise RuntimeError("This is a test error from _async_")
finally:
print(f"{an_async_func.__name__} done")
Expand All @@ -42,4 +41,4 @@ async def an_async_func():
if __name__ == "__main__":
from justscheduleit import hosting

hosting.run(host)
exit(hosting.run(host))
3 changes: 1 addition & 2 deletions examples/host/finite_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from justscheduleit.hosting import Host, ServiceLifetime

logging.basicConfig()

logging.getLogger("justscheduleit").setLevel(logging.DEBUG)

host = Host()
Expand All @@ -25,4 +24,4 @@ def a_sync_service(service_lifetime: ServiceLifetime):
if __name__ == "__main__":
from justscheduleit import hosting

hosting.run(host)
exit(hosting.run(host))
27 changes: 11 additions & 16 deletions examples/host/http_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,24 @@
from datetime import timedelta

import anyio
from fastapi import FastAPI # noqa
from fastapi import FastAPI
from starlette.responses import JSONResponse

from justscheduleit import Scheduler, every
from justscheduleit.hosting import Host
from justscheduleit.http import UvicornService

logging.basicConfig()

logging.getLogger("justscheduleit").setLevel(logging.DEBUG)

host = Host()

http_api = FastAPI()
host.add_service(UvicornService.for_app(http_api), name="http_api")

scheduler = Scheduler()
host.add_service(scheduler, name="scheduler")


@host.service()
async def background_queue_consumer():
Expand All @@ -28,33 +34,22 @@ async def background_queue_consumer():
print(f"{background_queue_consumer.__name__} done")


scheduler = Scheduler()
host.add_service(scheduler, name="scheduler")


@scheduler.task(every(timedelta(seconds=3), delay=(1, 5)))
async def heavy_background_task():
print("Some work here")


# Or Starlette
http_api = FastAPI()
host.add_service(UvicornService.for_app(http_api), name="http_api")


@http_api.get("/predict")
async def predict():
return {"result": "some prediction"}


@http_api.get("/health")
async def health_check():
services = host.lifetime.service_lifetimes
statuses = {name: service.status for name, service in services.items()}
return {"status": "UP", "services": statuses}
async def health_check() -> JSONResponse:
return JSONResponse(host.lifetime.status)


if __name__ == "__main__":
from justscheduleit import hosting

hosting.run(host)
exit(hosting.run(host))
File renamed without changes.
7 changes: 3 additions & 4 deletions examples/serve_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,16 @@
from examples.app import scheduler


async def main():
async def main(duration: int):
from justscheduleit.scheduler import aserve

# Scheduler in the same thread, same event loop
async with aserve(scheduler):
for _ in range(10):
for _ in range(duration):
await anyio.sleep(1)
print("Main thread is running...")
print("Main thread is done, exiting the app")


if __name__ == "__main__":
# noinspection PyTypeChecker
anyio.run(main)
anyio.run(main, 10)
1 change: 0 additions & 1 deletion examples/serve_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ def main():
except KeyboardInterrupt:
# Scheduler service is the only one in the host, so after the scheduler is done, the host will stop too
scheduler_host.shutdown()
# scheduler.lifetime.shutdown()

print("Main thread is done, exiting the app")

Expand Down
4 changes: 2 additions & 2 deletions justscheduleit/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def td_str(td: timedelta) -> str:
DelayFactory = Union[Callable[[], timedelta], tuple[int, int], tuple[float, float], int, float, timedelta, None]


@dc.dataclass(frozen=True) # Enable slots when Python 3.10+
@dc.dataclass(frozen=True, slots=True)
class RandomDelay: # https://en.wikipedia.org/wiki/Jitter#Types
bounds: tuple[int, int] | tuple[float, float]

Expand All @@ -92,7 +92,7 @@ def __call__(self) -> timedelta:
return timedelta(seconds=delay)


@dc.dataclass(frozen=True) # Enable slots when Python 3.10+
@dc.dataclass(frozen=True, slots=True)
class FixedDelay:
value: timedelta

Expand Down
6 changes: 3 additions & 3 deletions justscheduleit/cond/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async def _take(scheduler_lifetime: SchedulerLifetime):
return _take


@dc.dataclass(frozen=True) # Enable slots when Python 3.10+
@dc.dataclass(frozen=True, slots=True)
class Every:
"""
Triggers every `period`, with an (optional) additional `delay` (jitter).
Expand Down Expand Up @@ -133,7 +133,7 @@ def every(period: timedelta | str, /, *, delay: DelayFactory = DEFAULT_JITTER, s
return Every(ensure_td(period), ensure_delay_factory(delay), stop_on_error)


@dc.dataclass(frozen=True) # Enable slots when Python 3.10+
@dc.dataclass(frozen=True, slots=True)
class Recurrent:
"""
Triggers every `default_period` (unless overwritten), with an (optional) additional `delay` (jitter).
Expand Down Expand Up @@ -188,7 +188,7 @@ def recurrent(
return Recurrent(ensure_td(default_interval), ensure_delay_factory(delay), stop_on_error)


@dc.dataclass(frozen=True) # Enable slots when Python 3.10+
@dc.dataclass(frozen=True, slots=True)
class After(Generic[T]):
"""
Triggers every time `task` is completed, with an (optional) additional `delay` (jitter).
Expand Down
2 changes: 1 addition & 1 deletion justscheduleit/cond/cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
__all__ = ["cron"]


@dc.dataclass(frozen=True) # Enable slots when Python 3.10+
@dc.dataclass(frozen=True, slots=True)
class Cron:
"""
Triggers according to the cron schedule, with an (optional) additional `delay` (jitter).
Expand Down
Loading

0 comments on commit 44a033d

Please sign in to comment.