You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
This library provides you with aio-pika broker for taskiq.
4
8
5
-
Usage:
9
+
Features:
10
+
- Supports delayed messages using dead-letter queues or RabbitMQ delayed message exchange plugin.
11
+
- Supports message priorities.
12
+
13
+
Usage example:
14
+
6
15
```python
7
16
from taskiq_aio_pika import AioPikaBroker
8
17
@@ -14,19 +23,13 @@ async def test() -> None:
14
23
15
24
```
16
25
17
-
## Non-obvious things
18
-
19
-
You can send delayed messages and set priorities to messages using labels.
20
-
21
26
## Delays
22
27
23
-
### **Default retries**
28
+
### Default delays
24
29
25
-
To send delayed message, you have to specify
26
-
delay label. You can do it with `task` decorator,
27
-
or by using kicker.
28
-
In this type of delay we are using additional queue with `expiration` parameter and after with time message will be deleted from `delay` queue and sent to the main taskiq queue.
29
-
For example:
30
+
To send delayed message, you have to specify delay label. You can do it with `task` decorator, or by using kicker.
31
+
32
+
In this type of delay we are using additional queue with `expiration` parameter. After declared time message will be deleted from `delay` queue and sent to the main queue. For example:
30
33
31
34
```python
32
35
broker = AioPikaBroker()
@@ -42,7 +45,7 @@ async def main():
42
45
await delayed_task.kiq()
43
46
44
47
# This message is going to be received after the delay in 4 seconds.
45
-
# Since we overriden the `delay` label using kicker.
48
+
# Since we overridden the `delay` label using kicker.
First of all please make sure that your RabbitMQ server has [rabbitmq-delayed-message-exchange plugin](https://github.com/rabbitmq/rabbitmq-delayed-message-exchange) installed.
59
61
60
-
And you need to configure you broker.
61
-
There is `delayed_message_exchange_plugin``AioPikaBroker` parameter and it must be `True` to turn on delayed message functionality.
62
+
Also you need to configure you broker by passing `delayed_message_exchange_plugin=True` to broker.
62
63
63
-
The delay plugin can handle tasks with different delay times well, and the delay based on dead letter queue is suitable for tasks with the same delay time.
64
-
For example:
64
+
This plugin can handle tasks with different delay times well, and the delay based on dead letter queue is suitable for tasks with the same delay time. For example:
65
65
66
66
```python
67
67
broker = AioPikaBroker(
@@ -79,22 +79,18 @@ async def main():
79
79
await delayed_task.kiq()
80
80
81
81
# This message is going to be received after the delay in 4 seconds.
82
-
# Since we overriden the `delay` label using kicker.
82
+
# Since we overridden the `delay` label using kicker.
You can define priorities for messages using `priority` label.
89
-
Messages with higher priorities are delivered faster.
90
-
But to use priorities you need to define `max_priority` of the main queue, by passing `max_priority` parameter in broker's init.
91
-
This parameter sets maximum priority for the queue and
92
-
declares it as the prority queue.
88
+
You can define priorities for messages using `priority` label. Messages with higher priorities are delivered faster.
89
+
But to use priorities you need to define `max_priority` of the main queue, by passing `max_priority` parameter in broker's init. This parameter sets maximum priority for the queue and declares it as the priority queue.
93
90
94
91
Before doing so please read the [documentation](https://www.rabbitmq.com/priority.html#behaviour) about what
*`url` - url to rabbitmq. If None, "amqp://guest:guest@localhost:5672" is used.
123
119
*`result_backend` - custom result backend.
124
120
*`task_id_generator` - custom task_id genertaor.
@@ -128,22 +124,19 @@ AioPikaBroker parameters:
128
124
*`routing_key` - that used to bind that queue to the exchange.
129
125
*`declare_exchange` - whether you want to declare new exchange if it doesn't exist.
130
126
*`max_priority` - maximum priority for messages.
131
-
*`delay_queue_name` - custom delay queue name.
132
-
This queue is used to deliver messages with delays.
127
+
*`delay_queue_name` - custom delay queue name. This queue is used to deliver messages with delays.
133
128
*`dead_letter_queue_name` - custom dead letter queue name.
134
-
This queue is used to receive negatively acknowleged messages from the main queue.
129
+
This queue is used to receive negatively acknowledged messages from the main queue.
135
130
*`qos` - number of messages that worker can prefetch.
136
-
*`declare_queues` - whether you want to declare queues even on
137
-
client side. May be useful for message persistance.
131
+
*`declare_queues` - whether you want to declare queues even on client side. May be useful for message persistence.
138
132
*`declare_queues_kwargs` - see [Custom Queue Arguments](#custom-queue-arguments) for more details.
139
133
140
134
## Custom Queue Arguments
141
135
142
136
You can pass custom arguments to the underlying RabbitMQ queue declaration by using the `declare_queues_kwargs` parameter of `AioPikaBroker`. If you want to set specific queue arguments (such as RabbitMQ extensions or custom behaviors), provide them in the `arguments` dictionary inside `declare_queues_kwargs`.
143
137
144
-
These arguments will be merged with the default arguments used by the broker (such as dead-lettering and priority settings).
145
-
146
-
**Example:**
138
+
These arguments will be merged with the default arguments used by the broker
139
+
(such as dead-lettering and priority settings). If there are any conflicts, the values you provide will take precedence over the broker's defaults. Example:
0 commit comments