Commit 3c4ccb2

feat: multiqueue implementation
1 parent ed4a0f9 commit 3c4ccb2

20 files changed · +1302 −542 lines changed

Makefile

Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
+.DEFAULT:
+	@echo "No such command (or you pass two or many targets to ). List of possible commands: make help"
+
+.DEFAULT_GOAL := help
+
+##@ Local development
+
+.PHONY: help
+help: ## Show this help
+	@awk 'BEGIN {FS = ":.*##"; printf "\nUsage:\n make \033[36m<target> <arg=value>\033[0m\n"} /^[a-zA-Z_-]+:.*?##/ { printf " \033[36m%-15s\033[0m %s\n", $$1, $$2 } /^##@/ { printf "\n\033[1m %s\033[0m\n\n", substr($$0, 5) } ' $(MAKEFILE_LIST)
+
+.PHONY: clear_rabbit
+clear_rabbit: ## Clear RabbitMQ data volume and restart container
+	@docker stop taskiq_aio_pika_rabbitmq && docker rm taskiq_aio_pika_rabbitmq && docker volume rm taskiq-aio-pika_rabbitmq_data && docker compose up -d

README.md

Lines changed: 33 additions & 32 deletions
@@ -9,13 +9,14 @@ This library provides you with aio-pika broker for taskiq.
 Features:
 - Supports delayed messages using dead-letter queues or RabbitMQ delayed message exchange plugin.
 - Supports message priorities.
+- Supports multiple queues and custom routing.
 
 Usage example:
 
 ```python
 from taskiq_aio_pika import AioPikaBroker
 
-broker = AioPikaBroker()
+broker = AioPikaBroker(...)
 
 @broker.task
 async def test() -> None:
@@ -32,7 +33,7 @@ To send delayed message, you have to specify delay label. You can do it with `ta
 In this type of delay we are using additional queue with `expiration` parameter. After declared time message will be deleted from `delay` queue and sent to the main queue. For example:
 
 ```python
-broker = AioPikaBroker()
+broker = AioPikaBroker(...)
 
 @broker.task(delay=3)
 async def delayed_task() -> int:
@@ -86,13 +87,12 @@ async def main():
 ## Priorities
 
 You can define priorities for messages using `priority` label. Messages with higher priorities are delivered faster.
-But to use priorities you need to define `max_priority` of the main queue, by passing `max_priority` parameter in broker's init. This parameter sets maximum priority for the queue and declares it as the priority queue.
 
 Before doing so please read the [documentation](https://www.rabbitmq.com/priority.html#behaviour) about what
 downsides you get by using prioritized queues.
 
 ```python
-broker = AioPikaBroker(max_priority=10)
+broker = AioPikaBroker(...)
 
 # We can define default priority for tasks.
 @broker.task(priority=2)
@@ -111,42 +111,43 @@ async def main():
     await prio_task.kicker().with_labels(priority=None).kiq()
 ```
 
-## Configuration
+## Custom Queue and Exchange arguments
 
-AioPikaBroker parameters:
-
-* `url` - url to rabbitmq. If None, "amqp://guest:guest@localhost:5672" is used.
-* `result_backend` - custom result backend.
-* `task_id_generator` - custom task_id genertaor.
-* `exchange_name` - name of exchange that used to send messages.
-* `exchange_type` - type of the exchange. Used only if `declare_exchange` is True.
-* `queue_name` - queue that used to get incoming messages.
-* `routing_key` - that used to bind that queue to the exchange.
-* `declare_exchange` - whether you want to declare new exchange if it doesn't exist.
-* `max_priority` - maximum priority for messages.
-* `delay_queue_name` - custom delay queue name. This queue is used to deliver messages with delays.
-* `dead_letter_queue_name` - custom dead letter queue name.
-  This queue is used to receive negatively acknowledged messages from the main queue.
-* `qos` - number of messages that worker can prefetch.
-* `declare_queues` - whether you want to declare queues even on client side. May be useful for message persistence.
-* `declare_queues_kwargs` - see [Custom Queue Arguments](#custom-queue-arguments) for more details.
-
-## Custom Queue Arguments
-
-You can pass custom arguments to the underlying RabbitMQ queue declaration by using the `declare_queues_kwargs` parameter of `AioPikaBroker`. If you want to set specific queue arguments (such as RabbitMQ extensions or custom behaviors), provide them in the `arguments` dictionary inside `declare_queues_kwargs`.
+You can pass custom arguments to the underlying RabbitMQ queue and exchange declarations by using the `Queue`/`Exchange` classes from `taskiq_aio_pika`. If you have used `faststream` before, you are probably familiar with this concept.
 
 These arguments will be merged with the default arguments used by the broker
 (such as dead-lettering and priority settings). If there are any conflicts, the values you provide will take precedence over the broker's defaults. Example:
 
 ```python
+from taskiq_aio_pika import AioPikaBroker, Queue, QueueType, Exchange
+from aio_pika.abc import ExchangeType
+
 broker = AioPikaBroker(
-    declare_queues_kwargs={
-        "arguments": {
-            "x-message-ttl": 60000,  # Set message TTL to 60 seconds
-            "x-queue-type": "quorum",  # Use quorum queue type
-        }
-    }
+    exchange=Exchange(
+        name="custom_exchange",
+        type=ExchangeType.TOPIC,
+        declare=True,
+        durable=True,
+        auto_delete=False,
+    ),
+    task_queues=[
+        Queue(
+            name="custom_queue",
+            type=QueueType.CLASSIC,
+            declare=True,
+            durable=True,
+            max_priority=10,
+            routing_key="custom_queue",
+        )
+    ],
 )
 ```
 
 This will ensure that the queue is created with your custom arguments, in addition to the broker's defaults.
+
+## Multiqueue support
+
+You can define multiple queues for your tasks. Each queue can have its own routing key and other settings, and your workers can listen to multiple queues (or a specific queue) as well.
+
+You can check the [multiqueue usage example](./examples/topic_with_two_queues.py) in the examples folder for more details.
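For reference, here is a condensed sketch of the multiqueue flow the new README section describes. It follows the `examples/topic_with_two_queues.py` file added in this commit; the queue names and the `publish` helper below are illustrative, not part of the commit.

```python
from aio_pika.abc import ExchangeType

from taskiq_aio_pika import AioPikaBroker, Exchange, Queue, QueueType

# One topic exchange shared by both queues.
broker = AioPikaBroker(
    "amqp://guest:guest@localhost:5672/",
    exchange=Exchange(name="topic_exchange", type=ExchangeType.TOPIC),
)

orders_queue = Queue(name="orders", type=QueueType.CLASSIC, routing_key="orders")
reports_queue = Queue(name="reports", type=QueueType.CLASSIC, routing_key="reports")


@broker.task
async def add_one(value: int) -> int:
    return value + 1


def get_orders_broker() -> AioPikaBroker:
    # A worker process can subscribe to a single queue.
    return broker.with_queue(orders_queue)


async def publish() -> None:
    # The publisher declares both queues so it knows about them during publishing,
    # then picks the destination per task with the `queue_name` label.
    broker.with_queues(orders_queue, reports_queue)
    await broker.startup()
    await add_one.kicker().with_labels(queue_name="orders").kiq(1)
    await broker.shutdown()
```

All workers share the same exchange; routing to a particular queue happens at kick time via the `queue_name` label (the example notes a queue's `routing_key` value can be used there as well).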

docker-compose.yaml

Lines changed: 11 additions & 1 deletion
@@ -2,6 +2,7 @@ services:
   rabbitmq:
     container_name: taskiq_aio_pika_rabbitmq
     image: heidiks/rabbitmq-delayed-message-exchange:latest
+    # image: rabbitmq:3.13.7-management # rabbit with management UI for debugging
     environment:
       RABBITMQ_DEFAULT_USER: "guest"
       RABBITMQ_DEFAULT_PASS: "guest"
@@ -14,4 +15,13 @@ services:
     ports:
       - "5672:5672"
       - "15672:15672"
-      - "61613:61613"
+    volumes:
+      - rabbitmq_data:/var/lib/rabbitmq
+  redis:
+    container_name: taskiq_aio_pika_redis
+    image: redis:latest
+    ports:
+      - "6379:6379"
+
+volumes:
+  rabbitmq_data:

examples/basic.py

Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
+"""
+Basic example of using Taskiq with AioPika broker.
+
+How to run:
+1. Run worker: taskiq worker examples.basic:broker -w 1
+2. Run broker: uv run examples/basic.py
+"""
+
+import asyncio
+
+from taskiq_redis import RedisAsyncResultBackend
+
+from taskiq_aio_pika import AioPikaBroker
+
+broker = AioPikaBroker(
+    "amqp://guest:guest@localhost:5672/",
+).with_result_backend(RedisAsyncResultBackend("redis://localhost:6379/0"))
+
+
+@broker.task
+async def add_one(value: int) -> int:
+    return value + 1
+
+
+async def main() -> None:
+    await broker.startup()
+    # Send the task to the broker.
+    task = await add_one.kiq(1)
+    # Wait for the result.
+    result = await task.wait_result(timeout=2)
+    print(f"Task execution took: {result.execution_time} seconds.")
+    if not result.is_err:
+        print(f"Returned value: {result.return_value}")
+    else:
+        print("Error found while executing task.")
+    await broker.shutdown()
+
+
+if __name__ == "__main__":
+    asyncio.run(main())

examples/delayed_task.py

Lines changed: 41 additions & 0 deletions
@@ -0,0 +1,41 @@
+"""
+Example of delayed task execution using Taskiq with AioPika broker.
+
+How to run:
+1. Run worker: taskiq worker examples.delayed_task:broker -w 1
+2. Run broker: uv run examples/delayed_task.py
+"""
+
+import asyncio
+
+from taskiq_redis import RedisAsyncResultBackend
+
+from taskiq_aio_pika import AioPikaBroker
+
+broker = AioPikaBroker(
+    "amqp://guest:guest@localhost:5672/",
+).with_result_backend(RedisAsyncResultBackend("redis://localhost:6379/0"))
+
+
+@broker.task
+async def add_one(value: int) -> int:
+    return value + 1
+
+
+async def main() -> None:
+    await broker.startup()
+    # Send the task to the broker.
+    task = await add_one.kicker().with_labels(delay=2).kiq(1)
+    print("Task sent with 2 seconds delay.")
+    # Wait for the result.
+    result = await task.wait_result(timeout=3)
+    print(f"Task execution took: {result.execution_time} seconds.")
+    if not result.is_err:
+        print(f"Returned value: {result.return_value}")
+    else:
+        print("Error found while executing task.")
+    await broker.shutdown()
+
+
+if __name__ == "__main__":
+    asyncio.run(main())

examples/topic_with_two_queues.py

Lines changed: 96 additions & 0 deletions
@@ -0,0 +1,96 @@
+"""
+Example with two queues for different workers and one topic exchange.
+
+It can be useful when you want to have two workers, each consuming its own queue.
+
+How to run:
+1. Run worker for queue_1: taskiq worker examples.topic_with_two_queues:get_broker_for_queue_1 -w 1
+2. Run worker for queue_2: taskiq worker examples.topic_with_two_queues:get_broker_for_queue_2 -w 1
+3. Run broker to send a task: uv run examples/topic_with_two_queues.py --queue 1
+4. Optionally run broker to send a task to other queue: uv run examples/topic_with_two_queues.py --queue 2
+"""
+
+import argparse
+import asyncio
+import uuid
+
+from aio_pika.abc import ExchangeType
+from taskiq_redis import RedisAsyncResultBackend
+
+from taskiq_aio_pika import AioPikaBroker, Exchange, Queue, QueueType
+
+broker = AioPikaBroker(
+    "amqp://guest:guest@localhost:5672/",
+    exchange=Exchange(
+        name="topic_exchange",
+        type=ExchangeType.TOPIC,
+    ),
+    delay_queue=Queue(
+        name="taskiq.delay",
+        routing_key="queue1",
+    ),  # send delayed messages to queue1
+).with_result_backend(RedisAsyncResultBackend("redis://localhost:6379/0"))
+
+
+@broker.task
+async def add_one(value: int) -> int:
+    return value + 1
+
+
+queue_1 = Queue(
+    name="queue1",
+    type=QueueType.CLASSIC,
+    durable=False,
+)
+queue_2 = Queue(
+    name="queue2",
+    type=QueueType.CLASSIC,
+    durable=False,
+)
+
+
+def get_broker_for_queue_1() -> AioPikaBroker:
+    print("This broker will listen to queue1")
+    return broker.with_queue(queue_1)
+
+
+def get_broker_for_queue_2() -> AioPikaBroker:
+    print("This broker will listen to queue2")
+    return broker.with_queue(queue_2)
+
+
+async def main() -> None:
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--queue",
+        choices=["1", "2"],
+        required=True,
+        help="Queue to send the task to.",
+    )
+    args = parser.parse_args()
+
+    queue_name = queue_1.name if args.queue == "1" else queue_2.name
+
+    broker.with_queues(
+        queue_1,
+        queue_2,
+    )  # declare both queues to know about them during publishing
+    await broker.startup()
+
+    task = (
+        await add_one.kicker()
+        .with_labels(queue_name=queue_name)  # or it can be routing_key from queue_1
+        .with_task_id(uuid.uuid4().hex)
+        .kiq(2)
+    )
+    result = await task.wait_result(timeout=2)
+    print(f"Task execution took: {result.execution_time} seconds.")
+    if not result.is_err:
+        print(f"Returned value: {result.return_value}")
+    else:
+        print("Error found while executing task.")
+    await broker.shutdown()
+
+
+if __name__ == "__main__":
+    asyncio.run(main())

pyproject.toml

Lines changed: 12 additions & 1 deletion
@@ -30,8 +30,9 @@ repository = "https://github.com/taskiq-python/taskiq-aio-pika"
 keywords = ["taskiq", "tasks", "distributed", "async", "aio-pika"]
 requires-python = ">=3.10,<4"
 dependencies = [
-    "taskiq>=0.11.20,<1",
+    "taskiq>=0.12.0,<1",
     "aio-pika>=9.0.0",
+    "aiostream>=0.7.1",
 ]
 
 [dependency-groups]
@@ -48,6 +49,10 @@ dev = [
     "coverage>=7.11.3",
     "pytest-xdist[psutil]>=3.8.0",
     "anyio>=4.11.0",
+    {include-group = "examples"},
+]
+examples = [
+    "taskiq-redis>=1.1.2",
 ]
 
 [tool.mypy]
@@ -130,6 +135,12 @@ line-length = 88
     "SLF001",  # Private member accessed
     "S311",  # Standard pseudo-random generators are not suitable for security/cryptographic purposes
     "D101",  # Missing docstring in public class
+    "D102",  # Missing docstring in public method
+    "E501",  # Line too long
+]
+"examples/*" = [
+    "D",  # missing docstrings
+    "T201",  # print found
 ]
 
 [tool.ruff.lint.pydocstyle]

taskiq_aio_pika/__init__.py

Lines changed: 8 additions & 1 deletion
@@ -3,7 +3,14 @@
 from importlib.metadata import version
 
 from taskiq_aio_pika.broker import AioPikaBroker
+from taskiq_aio_pika.exchange import Exchange
+from taskiq_aio_pika.queue import Queue, QueueType
 
 __version__ = version("taskiq-aio-pika")
 
-__all__ = ["AioPikaBroker"]
+__all__ = [
+    "AioPikaBroker",
+    "Exchange",
+    "Queue",
+    "QueueType",
+]
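As a quick illustration of the widened public API (a hypothetical check, not part of the commit), the names listed in the new `__all__` now resolve from the package root:

```python
import taskiq_aio_pika

# Exchange, Queue and QueueType are new top-level re-exports added by this
# commit; previously only AioPikaBroker was listed in __all__, so user code
# needed the submodule paths (taskiq_aio_pika.exchange / taskiq_aio_pika.queue).
assert set(taskiq_aio_pika.__all__) == {"AioPikaBroker", "Exchange", "Queue", "QueueType"}
```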
