Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transfer RabbitMQ publisher confirms tutorial to aio-pika. #550

Merged
merged 7 commits into from
Jun 7, 2023
Prev Previous commit
Next Next commit
Create task to send batched messages and handle confirmations with he…
…lper.
  • Loading branch information
MaPePeR committed Jun 7, 2023
commit 01b2b37eeda6f40170fac2689334e9f5840411fa
13 changes: 13 additions & 0 deletions docs/source/rabbitmq-tutorial/7-publisher-confirms.rst
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,19 @@ One drawback is that we do not know exactly what went wrong in case of failure,
to log something meaningful or to re-publish the messages.
And this solution is still synchronous, so it blocks the publishing of messages.

.. note::

To initiate message sending asynchronously, a task is created with :code:`asyncio.create_task`, so the execution of our function
is handled by the event-loop.
The :code:`await asyncio.sleep(0)` is required to make the event loop switch to our coroutine.
Any :code:`await` would have sufficed, though.
Using :code:`async for` with an :code:`async` generator also requires the generator to yield control flow with :code:`await` for message
sending to be initiated.

Without the task and the :code:`await` the message sending would only be initiated with the :code:`asyncio.gather` call.
For some applications this behaivior might be acceptable.


Strategy #3: Handling Publisher Confirms Asynchronously
+++++++++++++++++++++++++++++++++++++++++++++++++++++++

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@ async def main() -> None:
# Sending the messages
for msg in get_messages_to_publish():
outstanding_messages.append(
channel.default_exchange.publish(
Message(msg),
routing_key=queue.name,
timeout=5.0,
asyncio.create_task(
channel.default_exchange.publish(
Message(msg),
routing_key=queue.name,
timeout=5.0,
)
)
)
# Yield control flow to event loop, so message sending is initiated:
await asyncio.sleep(0)

if len(outstanding_messages) == batchsize:
await asyncio.gather(*outstanding_messages)
outstanding_messages.clear()
Expand Down