Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,18 @@
<hr/>
</div>

PostgreSQL integration for Taskiq with support for asyncpg, psqlpy and aiopg drivers.
PostgreSQL integration for Taskiq with support for asyncpg, psqlpy, psycopg and aiopg drivers.

See more example of usage in [the documentation](https://danfimov.github.io/taskiq-postgres/) or [examples directory](https://github.com/danfimov/taskiq-postgres/examples).
## Features

- **PostgreSQL Broker** - high-performance message broker using PostgreSQL LISTEN/NOTIFY;
- **Result Backend** - persistent task result storage with configurable retention;
- **Scheduler Source** - cron-like task scheduling with PostgreSQL persistence;
- **Multiple Drivers** - support for asyncpg, psycopg3, psqlpy and aiopg;
- **Flexible Configuration** - customizable table names, field types, and connection options;
- **Multiple Serializers** - support for different serialization methods (Pickle, JSON, etc.).

See usage guide in [documentation](https://danfimov.github.io/taskiq-postgres/) or explore examples in [separate directory](https://github.com/danfimov/taskiq-postgres/examples).

## Installation

Expand All @@ -22,6 +31,9 @@ pip install taskiq-postgres[asyncpg]
# with psqlpy
pip install taskiq-postgres[psqlpy]

# with psycopg3
pip install taskiq-postgres[psycopg]

# with aiopg
pip install taskiq-postgres[aiopg]
```
Expand Down Expand Up @@ -101,7 +113,7 @@ Your experience with other drivers will be pretty similar. Just change the impor
schedule=[
{
"cron": "*/1 * * * *", # type: str, either cron or time should be specified.
"cron_offset": None, # type: str | timedelta | None, can be omitted.
"cron_offset": None, # type: str | None, can be omitted. For example "Europe/Berlin".
"time": None, # type: datetime | None, either cron or time should be specified.
"args": [], # type list[Any] | None, can be omitted.
"kwargs": {}, # type: dict[str, Any] | None, can be omitted.
Expand Down
85 changes: 80 additions & 5 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ title: Overview
<hr/>
</div>

PostgreSQL integration for Taskiq with support for asyncpg, psqlpy and aiopg drivers.
PostgreSQL integration for Taskiq with support for asyncpg, psqlpy, psycopg and aiopg drivers.

## Motivation

Expand All @@ -22,7 +22,6 @@ To address this issue I created this library with a common interface for most po
- brokers;
- schedule sources.


## Installation

Depending on your preferred PostgreSQL driver, you can install this library with the corresponding extra:
Expand All @@ -39,6 +38,12 @@ Depending on your preferred PostgreSQL driver, you can install this library with
pip install taskiq-postgres[psqlpy]
```

=== "psycopg"

```bash
pip install taskiq-postgres[psycopg]
```

=== "aiopg"

```bash
Expand Down Expand Up @@ -93,6 +98,36 @@ Depending on your preferred PostgreSQL driver, you can install this library with
broker = PSQLPyBroker(dsn).with_result_backend(PSQLPyResultBackend(dsn))


@broker.task("solve_all_problems")
async def best_task_ever() -> None:
"""Solve all problems in the world."""
await asyncio.sleep(2)
print("All problems are solved!")


async def main():
await broker.startup()
task = await best_task_ever.kiq()
print(await task.wait_result())
await broker.shutdown()


if __name__ == "__main__":
asyncio.run(main())
```

=== "psycopg"

```python
# broker_example.py
import asyncio
from taskiq_pg.psycopg import PsycopgBroker, PsycopgResultBackend


dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
broker = PsycopgBroker(dsn).with_result_backend(PsycopgResultBackend(dsn))


@broker.task("solve_all_problems")
async def best_task_ever() -> None:
"""Solve all problems in the world."""
Expand Down Expand Up @@ -184,7 +219,7 @@ Your experience with other drivers will be pretty similar. Just change the impor
schedule=[
{
"cron": "*/1 * * * *", # type: str, either cron or time should be specified.
"cron_offset": None, # type: str | timedelta | None, can be omitted.
"cron_offset": None, # type: str | None, can be omitted.
"time": None, # type: datetime | None, either cron or time should be specified.
"args": [], # type list[Any] | None, can be omitted.
"kwargs": {}, # type: dict[str, Any] | None, can be omitted.
Expand Down Expand Up @@ -223,7 +258,47 @@ Your experience with other drivers will be pretty similar. Just change the impor
schedule=[
{
"cron": "*/1 * * * *", # type: str, either cron or time should be specified.
"cron_offset": None, # type: str | timedelta | None, can be omitted.
"cron_offset": None, # type: str | None, can be omitted.
"time": None, # type: datetime | None, either cron or time should be specified.
"args": [], # type list[Any] | None, can be omitted.
"kwargs": {}, # type: dict[str, Any] | None, can be omitted.
"labels": {}, # type: dict[str, Any] | None, can be omitted.
},
],
)
async def best_task_ever() -> None:
"""Solve all problems in the world."""
await asyncio.sleep(2)
print("All problems are solved!")

```

=== "psycopg"

```python
# scheduler_example.py
import asyncio
from taskiq import TaskiqScheduler
from taskiq_pg.psycopg import PsycopgBroker, PsycopgScheduleSource


dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
broker = PsycopgBroker(dsn)
scheduler = TaskiqScheduler(
broker=broker,
sources=[PsycopgScheduleSource(
dsn=dsn,
broker=broker,
)],
)


@broker.task(
task_name="solve_all_problems",
schedule=[
{
"cron": "*/1 * * * *", # type: str, either cron or time should be specified.
"cron_offset": None, # type: str | None, can be omitted.
"time": None, # type: datetime | None, either cron or time should be specified.
"args": [], # type list[Any] | None, can be omitted.
"kwargs": {}, # type: dict[str, Any] | None, can be omitted.
Expand Down Expand Up @@ -263,7 +338,7 @@ Your experience with other drivers will be pretty similar. Just change the impor
schedule=[
{
"cron": "*/1 * * * *", # type: str, either cron or time should be specified.
"cron_offset": None, # type: str | timedelta | None, can be omitted.
"cron_offset": None, # type: str | None, can be omitted.
"time": None, # type: datetime | None, either cron or time should be specified.
"args": [], # type list[Any] | None, can be omitted.
"kwargs": {}, # type: dict[str, Any] | None, can be omitted.
Expand Down
6 changes: 3 additions & 3 deletions examples/example_with_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@
How to run:

1) Run worker in one terminal:
uv run taskiq worker examples.example_with_broker:broker
uv run taskiq worker examples.example_with_broker:broker --workers 1

2) Run this script in another terminal:
uv run python -m examples.example_with_broker
"""

import asyncio

from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgResultBackend
from taskiq_pg.psycopg import PsycopgBroker, PsycopgResultBackend


dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
broker = AsyncpgBroker(dsn).with_result_backend(AsyncpgResultBackend(dsn))
broker = PsycopgBroker(dsn).with_result_backend(PsycopgResultBackend(dsn))


@broker.task("solve_all_problems")
Expand Down
8 changes: 4 additions & 4 deletions examples/example_with_schedule_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
How to run:

1) Run worker in one terminal:
uv run taskiq worker examples.example_with_schedule_source:broker
uv run taskiq worker examples.example_with_schedule_source:broker --workers 1

2) Run scheduler in another terminal:
uv run taskiq scheduler examples.example_with_schedule_source:scheduler
Expand All @@ -12,15 +12,15 @@

from taskiq import TaskiqScheduler

from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgScheduleSource
from taskiq_pg.psycopg import PsycopgBroker, PsycopgScheduleSource


dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
broker = AsyncpgBroker(dsn)
broker = PsycopgBroker(dsn)
scheduler = TaskiqScheduler(
broker=broker,
sources=[
AsyncpgScheduleSource(
PsycopgScheduleSource(
dsn=dsn,
broker=broker,
),
Expand Down
6 changes: 6 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ asyncpg = [
psqlpy = [
"psqlpy>=0.11.6",
]
psycopg = [
"psycopg[binary,pool]>=3.2.10",
]

[dependency-groups]
dev = [
Expand All @@ -60,6 +63,8 @@ dev = [
"pytest>=8.4.2",
"pytest-asyncio>=1.1.0",
"pytest-cov>=7.0.0",
# for database in tests
"sqlalchemy-utils>=0.42.0",
# pre-commit hooks
"prek>=0.2.4",
# docs
Expand Down Expand Up @@ -149,6 +154,7 @@ ignore = [
"RUF",

"PLR2004", # magic numbers in tests
"ANN", # missing type annotations in tests
]
"tests/test_linting.py" = [
"S603", # subprocess usage
Expand Down
43 changes: 22 additions & 21 deletions src/taskiq_pg/asyncpg/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,33 +27,34 @@
class AsyncpgBroker(BasePostgresBroker):
"""Broker that uses asyncpg as driver and PostgreSQL with LISTEN/NOTIFY mechanism."""

read_conn: asyncpg.Connection[asyncpg.Record] | None = None
write_pool: asyncpg.pool.Pool[asyncpg.Record] | None = None
_read_conn: asyncpg.Connection[asyncpg.Record] | None = None
_write_pool: asyncpg.pool.Pool[asyncpg.Record] | None = None

async def startup(self) -> None:
"""Initialize the broker."""
await super().startup()

self.read_conn = await asyncpg.connect(self.dsn, **self.read_kwargs)
self.write_pool = await asyncpg.create_pool(self.dsn, **self.write_kwargs)
self._read_conn = await asyncpg.connect(self.dsn, **self.read_kwargs)
self._write_pool = await asyncpg.create_pool(self.dsn, **self.write_kwargs)

if self.read_conn is None:
msg = "read_conn not initialized"
if self._read_conn is None:
msg = "_read_conn not initialized"
raise RuntimeError(msg)

async with self.write_pool.acquire() as conn:
_ = await conn.execute(CREATE_MESSAGE_TABLE_QUERY.format(self.table_name))
async with self._write_pool.acquire() as conn:
await conn.execute(CREATE_MESSAGE_TABLE_QUERY.format(self.table_name))

await self.read_conn.add_listener(self.channel_name, self._notification_handler)
await self._read_conn.add_listener(self.channel_name, self._notification_handler)
self._queue = asyncio.Queue()

async def shutdown(self) -> None:
"""Close all connections on shutdown."""
await super().shutdown()
if self.read_conn is not None:
await self.read_conn.close()
if self.write_pool is not None:
await self.write_pool.close()
if self._read_conn is not None:
await self._read_conn.remove_listener(self.channel_name, self._notification_handler)
await self._read_conn.close()
if self._write_pool is not None:
await self._write_pool.close()

def _notification_handler(
self,
Expand Down Expand Up @@ -85,11 +86,11 @@ async def kick(self, message: BrokerMessage) -> None:

:param message: Message to send.
"""
if self.write_pool is None:
if self._write_pool is None:
msg = "Please run startup before kicking."
raise ValueError(msg)

async with self.write_pool.acquire() as conn:
async with self._write_pool.acquire() as conn:
# Insert the message into the database
message_inserted_id = tp.cast(
"int",
Expand Down Expand Up @@ -117,9 +118,9 @@ async def kick(self, message: BrokerMessage) -> None:
async def _schedule_notification(self, message_id: int, delay_seconds: int) -> None:
"""Schedule a notification to be sent after a delay."""
await asyncio.sleep(delay_seconds)
if self.write_pool is None:
if self._write_pool is None:
return
async with self.write_pool.acquire() as conn:
async with self._write_pool.acquire() as conn:
# Send NOTIFY
_ = await conn.execute(f"NOTIFY {self.channel_name}, '{message_id}'")

Expand All @@ -131,7 +132,7 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:

:yields: AckableMessage instances.
"""
if self.write_pool is None:
if self._write_pool is None:
msg = "Call startup before starting listening."
raise ValueError(msg)
if self._queue is None:
Expand All @@ -142,7 +143,7 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
try:
payload = await self._queue.get()
message_id = int(payload)
async with self.write_pool.acquire() as conn:
async with self._write_pool.acquire() as conn:
claimed = await conn.fetchrow(
CLAIM_MESSAGE_QUERY.format(self.table_name),
message_id,
Expand All @@ -156,11 +157,11 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
message_data = message_str.encode()

async def ack(*, _message_id: int = message_id) -> None:
if self.write_pool is None:
if self._write_pool is None:
msg = "Call startup before starting listening."
raise ValueError(msg)

async with self.write_pool.acquire() as conn:
async with self._write_pool.acquire() as conn:
_ = await conn.execute(
DELETE_MESSAGE_QUERY.format(self.table_name),
_message_id,
Expand Down
Loading