Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ init: ## Install all project dependencies with extras
@$(MAKE) check_venv
@uv sync --all-extras

.PHONY: run_docs
run_docs: ## Run documentation server
@uv run mkdocs serve --livereload

.PHONY: run_infra
run_infra: ## Run rabbitmq in docker for integration tests
@docker compose -f docker-compose.yml up -d
Expand Down
79 changes: 79 additions & 0 deletions docs/tutorial/schedule_source.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,85 @@
title: Schedule Source
---

## Basic usage

The easiest way to schedule task with this library is to add `schedule` label to task. Schedule source will automatically
parse this label and add new schedule to database on start of scheduler.

You can define your scheduled task like this:

```python
import asyncio
from taskiq import TaskiqScheduler
from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgScheduleSource


dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
broker = AsyncpgBroker(dsn)
scheduler = TaskiqScheduler(
broker=broker,
sources=[AsyncpgScheduleSource(
dsn=dsn,
broker=broker,
)],
)


@broker.task(
task_name="solve_all_problems",
schedule=[
{
"cron": "*/1 * * * *", # type: str, either cron or time should be specified.
"cron_offset": None, # type: str | None, can be omitted. For example "Europe/Berlin".
"time": None, # type: datetime | None, either cron or time should be specified.
"args": [], # type list[Any] | None, can be omitted.
"kwargs": {}, # type: dict[str, Any] | None, can be omitted.
"labels": {}, # type: dict[str, Any] | None, can be omitted.
},
],
)
async def best_task_ever() -> None:
"""Solve all problems in the world."""
await asyncio.sleep(2)
print("All problems are solved!")
```


## Adding schedule in runtime

You can also add schedules in runtime using `add_schedule` method of the schedule source:


```python
import asyncio
from taskiq import TaskiqScheduler, ScheduledTask
from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgScheduleSource


dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
broker = AsyncpgBroker(dsn)
schedule_source = AsyncpgScheduleSource(
dsn=dsn,
broker=broker,
)
scheduler = TaskiqScheduler(
broker=broker,
sources=[schedule_source],
)


@broker.task(
task_name="solve_all_problems",
)
async def best_task_ever() -> None:
"""Solve all problems in the world."""
await asyncio.sleep(2)
print("All problems are solved!")

# Call this function somewhere in your code to add new schedule
async def add_new_schedule() -> None:
await schedule_source.add_schedule(ScheduledTask(...))
```

## Using multiple schedules

Expand Down
8 changes: 5 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ lint = [
"asyncpg-stubs>=0.30.2",
]
test = [
"polyfactory>=2.22.2",
"pytest>=8.4.2",
"pytest-asyncio>=1.1.0",
"pytest-cov>=7.0.0",
Expand Down Expand Up @@ -125,10 +126,8 @@ select = ["ALL"]
ignore = [
# TODO: enable this rules
"TRY301",
"PLR0913",
"D401",
"ANN401",
"PERF203",
# "PERF203",


# boolean args
Expand Down Expand Up @@ -174,6 +173,9 @@ ignore = [
"INP001",
]

[tool.ruff.lint.pydocstyle]
convention = "google"

[tool.ruff.lint.isort]
known-local-folder = ["taskiq_pg"]
lines-after-imports = 2
Expand Down
2 changes: 1 addition & 1 deletion src/taskiq_pg/_internal/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
class BasePostgresBroker(AsyncBroker, abc.ABC):
"""Base class for Postgres brokers."""

def __init__(
def __init__( # noqa: PLR0913
self,
dsn: str | tp.Callable[[], str] = "postgresql://postgres:postgres@localhost:5432/postgres",
result_backend: AsyncResultBackend[_T] | None = None,
Expand Down
48 changes: 48 additions & 0 deletions src/taskiq_pg/_internal/schedule_source.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
from __future__ import annotations

import typing as tp
import uuid
from logging import getLogger

from pydantic import ValidationError
from taskiq import ScheduleSource
from taskiq.scheduler.scheduled_task import ScheduledTask


if tp.TYPE_CHECKING:
from taskiq.abc.broker import AsyncBroker


logger = getLogger("taskiq_pg")


class BasePostgresScheduleSource(ScheduleSource):
def __init__(
self,
Expand Down Expand Up @@ -47,3 +54,44 @@ def dsn(self) -> str | None:
if callable(self._dsn):
return self._dsn()
return self._dsn

def extract_scheduled_tasks_from_broker(self) -> list[ScheduledTask]:
"""
Extract schedules from tasks that were registered in broker.

Returns:
A list of ScheduledTask instances extracted from the task's labels.
"""
scheduled_tasks_for_creation: list[ScheduledTask] = []
for task_name, task in self._broker.get_all_tasks().items():
if "schedule" not in task.labels:
logger.debug("Task %s has no schedule, skipping", task_name)
continue
if not isinstance(task.labels["schedule"], list):
logger.warning(
"Schedule for task %s is not a list, skipping",
task_name,
)
continue
for schedule in task.labels["schedule"]:
try:
new_schedule = ScheduledTask.model_validate(
{
"task_name": task_name,
"labels": schedule.get("labels", {}),
"args": schedule.get("args", []),
"kwargs": schedule.get("kwargs", {}),
"schedule_id": str(uuid.uuid4()),
"cron": schedule.get("cron", None),
"cron_offset": schedule.get("cron_offset", None),
"time": schedule.get("time", None),
},
)
scheduled_tasks_for_creation.append(new_schedule)
except ValidationError: # noqa: PERF203
logger.exception(
"Schedule for task %s is not valid, skipping",
task_name,
)
continue
return scheduled_tasks_for_creation
4 changes: 4 additions & 0 deletions src/taskiq_pg/aiopg/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,7 @@
DELETE_ALL_SCHEDULES_QUERY = """
DELETE FROM {};
"""

DELETE_SCHEDULE_QUERY = """
DELETE FROM {} WHERE id = %s;
"""
80 changes: 41 additions & 39 deletions src/taskiq_pg/aiopg/schedule_source.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
import uuid
from logging import getLogger

from aiopg import Pool, create_pool
from pydantic import ValidationError
from taskiq import ScheduledTask

from taskiq_pg import exceptions
from taskiq_pg._internal import BasePostgresScheduleSource
from taskiq_pg.aiopg.queries import (
CREATE_SCHEDULES_TABLE_QUERY,
DELETE_ALL_SCHEDULES_QUERY,
DELETE_SCHEDULE_QUERY,
INSERT_SCHEDULE_QUERY,
SELECT_SCHEDULES_QUERY,
)
Expand Down Expand Up @@ -39,42 +38,6 @@ async def _update_schedules_on_startup(self, schedules: list[ScheduledTask]) ->
],
)

def _get_schedules_from_broker_tasks(self) -> list[ScheduledTask]:
"""Extract schedules from the broker's registered tasks."""
scheduled_tasks_for_creation: list[ScheduledTask] = []
for task_name, task in self._broker.get_all_tasks().items():
if "schedule" not in task.labels:
logger.debug("Task %s has no schedule, skipping", task_name)
continue
if not isinstance(task.labels["schedule"], list):
logger.warning(
"Schedule for task %s is not a list, skipping",
task_name,
)
continue
for schedule in task.labels["schedule"]:
try:
new_schedule = ScheduledTask.model_validate(
{
"task_name": task_name,
"labels": schedule.get("labels", {}),
"args": schedule.get("args", []),
"kwargs": schedule.get("kwargs", {}),
"schedule_id": str(uuid.uuid4()),
"cron": schedule.get("cron", None),
"cron_offset": schedule.get("cron_offset", None),
"time": schedule.get("time", None),
},
)
scheduled_tasks_for_creation.append(new_schedule)
except ValidationError:
logger.exception(
"Schedule for task %s is not valid, skipping",
task_name,
)
continue
return scheduled_tasks_for_creation

async def startup(self) -> None:
"""
Initialize the schedule source.
Expand All @@ -89,7 +52,7 @@ async def startup(self) -> None:
)
async with self._database_pool.acquire() as connection, connection.cursor() as cursor:
await cursor.execute(CREATE_SCHEDULES_TABLE_QUERY.format(self._table_name))
scheduled_tasks_for_creation = self._get_schedules_from_broker_tasks()
scheduled_tasks_for_creation = self.extract_scheduled_tasks_from_broker()
await self._update_schedules_on_startup(scheduled_tasks_for_creation)
except Exception as error:
raise exceptions.DatabaseConnectionError(str(error)) from error
Expand Down Expand Up @@ -122,3 +85,42 @@ async def get_schedules(self) -> list["ScheduledTask"]:
),
)
return schedules

async def add_schedule(self, schedule: "ScheduledTask") -> None:
"""
Add a new schedule.

Args:
schedule: schedule to add.
"""
async with self._database_pool.acquire() as connection, connection.cursor() as cursor:
await cursor.execute(
INSERT_SCHEDULE_QUERY.format(self._table_name),
[
schedule.schedule_id,
schedule.task_name,
schedule.model_dump_json(
exclude={"schedule_id", "task_name"},
),
],
)

async def delete_schedule(self, schedule_id: str) -> None:
"""
Method to delete schedule by id.

This is useful for schedule cancelation.

Args:
schedule_id: id of schedule to delete.
"""
async with self._database_pool.acquire() as connection, connection.cursor() as cursor:
await cursor.execute(
DELETE_SCHEDULE_QUERY.format(self._table_name),
[schedule_id],
)

async def post_send(self, task: ScheduledTask) -> None:
"""Delete a task after it's completed."""
if task.time is not None:
await self.delete_schedule(task.schedule_id)
4 changes: 4 additions & 0 deletions src/taskiq_pg/asyncpg/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,7 @@
DELETE_ALL_SCHEDULES_QUERY = """
DELETE FROM {};
"""

DELETE_SCHEDULE_QUERY = """
DELETE FROM {} WHERE id = $1;
"""
Loading
Loading