Transfer RabbitMQ publisher confirms tutorial to aio-pika. #550
Conversation
I just noticed that it would probably have been better to use the …

@MaPePeR thank you for these changes, please fix the linter issues in Actions.
docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_individually.py
```python
def handle_confirm(confirmation):
    try:
        result = confirmation.result()
```
I think it would be better to check the instance type for `ConfirmationFrameType`, and add separate branches for `Basic.Ack`, `Basic.Nack`, and `Basic.Reject`.
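The suggested branching might look roughly like this. Note the frame classes here are stand-ins, since the real ones live in `aiormq.spec` (`Basic.Ack` / `Basic.Nack` / `Basic.Reject`); this is only a sketch of the control flow, not the actual aio-pika API:

```python
# Stand-in frame classes; the real ones are aiormq.spec.Basic.Ack etc.
class Ack: pass
class Nack: pass
class Reject: pass


def handle_confirm(confirmation_frame):
    # Branch on the concrete confirmation frame type
    if isinstance(confirmation_frame, Ack):
        return "acked"
    if isinstance(confirmation_frame, Nack):
        return "nacked"
    if isinstance(confirmation_frame, Reject):
        return "rejected"
    raise TypeError(f"unexpected frame: {confirmation_frame!r}")
```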
Admittedly I didn't put a lot of time into it, but I couldn't figure out a good way to actually import `Basic.Ack` without going through `aiormq`, which was unexpected to me (maybe my expectations are wrong, though). I expected something like `aio_pika.messages.Basic.Ack` or just `aio_pika.Basic.Ack` (what works is `aio_pika.message.aiormq.spec.Basic.Ack`, but that is unwieldy). There is no example or documentation that uses `aiormq` yet.
I just remembered why it's done that way: the point is that it just creates `asyncio.Future` instances, and you need to explicitly return them without creating a coroutine. This gives a good balance against heavy task creation if the user wants to pass them to `asyncio.wait` or `asyncio.gather`.

So `asyncio.wait` is a really good way in this case to separate responses with ack from the others. And we shouldn't forget about returned messages, which will also be considered an error; if the broker passed the message body back, it will be in the exception instance.
I changed the paragraph to state that one needs to use the delivery tag to determine which message a confirmation belongs to. But I just noticed: I don't think there is a way in `aio-pika` to obtain the delivery tag of a published message. The PHP library, for example, writes DeliveryInfo back into the provided message object, but `aio-pika` does not. If it did, one could do something like this to carry the message and delivery tag to the handler:

```python
message_var.set(Message(msg))  # This is a reference, so `publish()` can modify it before it is picked up in handle_confirm
task = tg.create_task(
    channel.default_exchange.publish(
        message_var.get(),
        routing_key=queue.name,
        timeout=5.0,
    )
)
task.add_done_callback(handle_confirm)
```

But without that I don't really see a good way to actually handle these confirmations asynchronously. But maybe I'm missing something here. I'm quite new to this, after all.
```python
def handle_confirm(confirmation):
    try:
        _ = confirmation.result()
        # code when message is ack-ed
    except DeliveryError:
        # code when message is nack-ed
        pass
    except TimeoutError:
        # code for message timeout
        pass
    else:
        # code when message is confirmed
        pass
```
```python
async with asyncio.TaskGroup() as tg:
    # Sending the messages
    for msg in get_messages_to_publish():
        tg.create_task(
            channel.default_exchange.publish(
                Message(msg),
                routing_key=queue.name,
                timeout=5.0,
            )
        ).add_done_callback(handle_confirm)

print(" [x] Sent and confirmed multiple messages asynchronously. ")
```
Suggested change (replacing the `TaskGroup` version above):

```python
# List for async tasks
tasks = []

# Sending the messages
for msg in get_messages_to_publish():
    task = asyncio.create_task(
        channel.default_exchange.publish(
            Message(msg),
            routing_key=queue.name,
            timeout=5,
        )
    )
    tasks.append(task)

results = await asyncio.gather(*tasks, return_exceptions=True)
success = 0
error = 0
for result in results:
    if isinstance(result, (DeliveryError, asyncio.TimeoutError)):
        error += 1
    else:
        success += 1
print(" [x] Sent and confirmed multiple messages asynchronously. ")
print("Success:", success)
print("Errored:", error)
```
I don't think this solves the problem. This is more like a variation of the batch confirmation, except it sends all messages as a single batch. It also has no way to determine which messages failed or succeeded: it only ever detects failures after all messages have been published, so it cannot react to delivery errors or confirmations asynchronously. And it only works for a fixed number of messages, not for sending an unbounded stream of messages.

So I don't think this code suggestion matches the ideas presented as the asynchronous "Strategy 3" in the original RabbitMQ tutorial.

I suppose with the new `get_underlay_channel()` method one could use `channel.get_underlay_channel().delivery_tag` to get the latest delivery tag:
```python
awaiting_confirmations = {}
delivery_tag_var = ContextVar('delivery_tag')
# ...
for msg in get_messages_to_publish():
    task = asyncio.create_task(
        channel.default_exchange.publish(
            Message(msg),
            routing_key=queue.name,
            timeout=5,
        )
    )
    delivery_tag = channel.get_underlay_channel().delivery_tag  # Get last used delivery tag from the underlying channel
    delivery_tag_var.set(delivery_tag)  # Required so we can also know the delivery_tag in case of a Timeout. For DeliveryError we can get it from the frame contained in the exception
    awaiting_confirmations[delivery_tag] = msg  # Store the message body to maybe resend it
    task.add_done_callback(handle_confirms)
```

What do you think?
I think I just noticed a fatal flaw in my examples (and possibly in …). But thinking about that more, I don't think this is the case at all: in Python a coroutine (very much in contrast to JavaScript's promises) does not start executing until it is awaited.

In my current batch example that does not happen, meaning that the messages are not even started to be sent to the broker until a batch is completed with `asyncio.gather`. I haven't confirmed that suspicion yet, though. If this is correct, that makes me wonder if … It does not have to be …
@MaPePeR Even if you try to make … That's the way the world works, not perfectly, of course.
This is not true. Compare these two examples between JavaScript and Python:

```javascript
async function asyncFunction() {
    console.log("2. Starting async function synchronously");
    await new Promise(resolve => setTimeout(resolve, 1)); // something like asyncio.sleep
    console.log("4. End of async function");
}
console.log("1. Scheduling async function")
a = asyncFunction();
console.log("3. Do something else, then await");
await a
console.log("5. Done")
```

outputs:

```
1. Scheduling async function
2. Starting async function synchronously
3. Do something else, then await
4. End of async function
5. Done
```

compare to the Python version:

```python
async def asyncFunction():
    print("3. Starting async function only after it has been awaited")
    await asyncio.sleep(1)
    print("4. End of async function")

print("1. Scheduling async function")
a = asyncFunction()
print("2. Do something else, then await")
await a
print("5. Done")
```

outputs:

```
1. Scheduling async function
2. Do something else, then await
3. Starting async function only after it has been awaited
4. End of async function
5. Done
```
This means that, in Python, if you have a synchronous part at the start of your `async def` function, it will not run until the coroutine is awaited.

The JavaScript documentation says (paraphrasing) that the body of an `async` function runs synchronously up to and including the first `await` expression; only the continuation after it is deferred. This is even true for the first `await` itself.

Compare to the Python documentation (again paraphrasing): simply calling a coroutine function does not run it; it only returns a coroutine object, which is executed once it is awaited or scheduled as a task.
That is exactly what I wrote: the context switch happens at the first `await`.

```javascript
function sleep(time) {
    return new Promise((resolve) => setTimeout(resolve, time));
}

async function delayed() {
    await sleep(1);
    console.log("Hello");
}

function main() {
    var result = delayed();
    while (1) {};
}

console.log("Running");
main();
// No "Hello" is ever written
```
If you put a … And that is the location of the synchronous code that actually creates and hands the asynchronous IO work over to the OS, so it can be done asynchronously.
@MaPePeR here is a similar example in Python, no differences IMHO:

```python
import asyncio

def sleep(time):
    print("start sleeping")
    future = asyncio.Future()
    asyncio.get_running_loop().call_later(time, future.set_result, None)
    return future

async def delayed():
    await sleep(1)
    print("Hello")

async def main():
    result = delayed()
    while True:
        pass

print("Running")
asyncio.run(main())
```
The difference occurs if you put synchronous code before the first `await`. I already provided example code with output above: see the swapped order of 2. and 3.
@MaPePeR the real reason is that JS has no coroutines, just Promises (aka Futures in Python). Coroutines are like generators: they do not start before the first `await`.
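This calling-convention difference can be shown without any broker at all: the coroutine body only runs once it is awaited, and a wrapping task only runs once the event loop gets a chance to schedule it (a small self-contained sketch):

```python
import asyncio

order = []


async def work():
    order.append("body runs")


async def main():
    coro = work()
    order.append("after call")         # calling work() did not run its body
    task = asyncio.create_task(work())
    order.append("after create_task")  # still not run: no suspension point yet
    await coro                         # body of the bare coroutine runs here
    await task                         # loop runs the task while main is suspended


asyncio.run(main())
print(order)
```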
In the batch example I assumed that when calling the `publish()` coroutine function, the message would immediately start to be sent in the background.

This is what happens if you use `async`/`await` in JavaScript. What happens instead is that the message is queued locally and only starts to be sent upon calling `asyncio.gather`. For some, maybe a lot of, use cases that might not be a dealbreaker. But asynchronous I/O it is not. Slapping `async` in front of a function does not by itself make it asynchronous.

In JavaScript you get this behavior for free. In Python you have to do something like this to fix it from a library perspective:

```python
def sendMessage(msg) -> Awaitable:
    # do message preprocessing
    awaitable = some.internal.lib.sendMessageAsync(preprocessed_msg)
    # Calling an async function from a non-async function is fine. We can't
    # await it here, but we leave the awaiting to the caller.
    return asyncWaitForSend(awaitable)

async def asyncWaitForSend(awaitable):
    send_result = await awaitable
    # do message post processing
    return processed_send_result
```

And this is only asynchronous if …
There was a discussion about the "sync write"/"async read" asymmetry in a Google group. I think Martin Teichmann has a good explanation there.
@MaPePeR as I wrote earlier, first of all you have no guarantee that the data is sent, because `write` just puts it into a buffer, and if that buffer is full, or the TCP window is full because the remote party doesn't respond, the data won't be sent anywhere. Secondly, the AMQP 0.9.x protocol is designed so that one message consists of at least three frames, and before all of them are sent, no other frames can be sent on that channel; otherwise this leads to the channel being closed with an error. For this reason, you must take a lock around sending a message on a particular channel.
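The per-channel framing constraint can be sketched with a plain `asyncio.Lock`. The class and frame names here are made up for illustration (real frame handling lives in `aiormq`); the point is that the lock keeps one message's three frames together even across suspension points:

```python
import asyncio


class ChannelWriter:
    """Sketch: the frames of one message must not interleave on a channel."""

    def __init__(self):
        self._lock = asyncio.Lock()
        self.frames = []

    async def publish(self, tag):
        # Hold the lock for the whole method/header/body sequence
        async with self._lock:
            for part in ("method", "header", "body"):
                self.frames.append((tag, part))
                await asyncio.sleep(0)  # a context switch inside the critical section


async def demo():
    writer = ChannelWriter()
    # Two concurrent publishes; without the lock their frames could interleave
    await asyncio.gather(writer.publish(1), writer.publish(2))
    return writer.frames


frames = asyncio.run(demo())
print(frames)
```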
Yes, there are no guarantees that the data is sent, but there is a pledge to attempt to send the data as soon as possible. The point is that with the current behavior it doesn't even try.

It doesn't really matter that there are exceptions which result in the data not being sent immediately but queued instead. The big benefit of asyncio is that data is sent immediately whenever the buffer and TCP window aren't full. I don't see how the multiple frames are a problem: there is only ever one coroutine/task active anyway, and as long as it does not `await` between the frames, nothing can interleave. But fixing this is probably not really feasible, as it's a problem that goes through all the layers. From …

Maybe my mindset is just wrong. If I see an asynchronous function call, I think it will start doing its work and the `await` is only there to collect the result:

```python
confirmations = []
async for i in async_range(1000000):
    # Don't await individually, so we can send a lot faster
    confirmations.append(exchange.publish(...))
await asyncio.sleep(5)
# Surely all messages are published by now, just gather the confirmations...
await asyncio.gather(*confirmations)  # Nevermind, I will start sending messages now
```

The fact that the above code will only ever start to send messages when `asyncio.gather` is called surprised me. I think another pain point for using it asynchronously is that the …
@MaPePeR this example is a simple way to get what you really want, I guess, but I really don't know why it would be useful in real life.

```python
import asyncio

from aio_pika import Message, connect


def get_messages_to_publish():
    for i in range(1000):
        yield f"Hello World {i}!".encode()


async def main() -> None:
    # Perform connection
    async with await connect("amqp://guest:guest@localhost/") as connection:
        # Creating a channel
        channel = await connection.channel()

        # Declaring queue
        queue = await channel.declare_queue("hello")

        # List for async tasks
        tasks = []

        async def publisher(body):
            print("Start sending:", body)
            result = await channel.default_exchange.publish(
                Message(body),
                routing_key=queue.name,
                timeout=5,
            )
            print("Body sent:", body)
            return result

        # Sending the messages
        for msg in get_messages_to_publish():
            task = asyncio.create_task(publisher(msg))
            tasks.append(task)
            # enforce a context switch so the tasks will go on
            await asyncio.sleep(0)

        results = await asyncio.gather(*tasks, return_exceptions=True)
        print("Last delivery tag is:", results[-1].delivery_tag)
        # Last delivery tag is: 1000


if __name__ == "__main__":
    asyncio.run(main())
```

Usually in real practice you either need to send a huge batch of messages, in which case you don't need publisher confirms at all and should use transactions instead; or there are many coroutines running in the application, sending messages independently, in which case this is useful and each of them will verify the status of its particular delivery.

Everything you described above is fine in some general and bookish case, and "should probably" work, but the world is not perfect, and if you try to implement your idea on a message with a body of a couple of hundred megabytes, everything will break, believe me, I've already checked.

btw: If you look at the commits at the link, you'll see that your idea is not new, and I gave it up back in 2017.
I tried to write the tutorial in a way that stays true to the intent of the original RabbitMQ tutorial I was copying. That's the reason. I changed the examples now and thought that would be the final version, but I just noticed that when I execute the "asynchronous" (Strategy 3) version with 100,000 messages it starts to time out messages after ~5200. Probably because the event loop prefers creating new tasks over actually sending or receiving messages. 😕
docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_asynchronously.py
@MaPePeR nice work. Thank you.
Because I was experimenting with it anyway, I gave it a shot to transfer the RabbitMQ Publisher Confirms tutorial to `aio-pika`.

Someone who actually knows what they are doing should take a deep look at this, though (I don't). Especially my example code. (For some reason the batch example is faster than the asynchronous example in my limited testing?)

- Is `await asyncio.wait_for(asyncio.gather(...))` the correct construct? `wait_for` cancels the tasks when a timeout occurs. Is it OK to cancel the confirmation tasks when a timeout occurs, or should they be `shield`ed?
- I used a `TaskGroup` in the asynchronous example to make sure the tasks are awaited somewhere. Is this the correct approach?
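On the `shield` question: wrapping the awaited future in `asyncio.shield` lets `wait_for` time out without cancelling the underlying task. A broker-free sketch of the difference (a stub coroutine stands in for awaiting a confirmation):

```python
import asyncio


async def confirm_stub():
    # Stands in for awaiting a publisher confirm
    await asyncio.sleep(0.2)
    return "ack"


async def main():
    task = asyncio.create_task(confirm_stub())
    try:
        # The timeout fires, but only the shield wrapper is cancelled
        await asyncio.wait_for(asyncio.shield(task), timeout=0.05)
    except asyncio.TimeoutError:
        pass
    assert not task.cancelled()  # the confirmation task survived the timeout
    return await task            # and can still be awaited later


result = asyncio.run(main())
print(result)
```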