-
Notifications
You must be signed in to change notification settings - Fork 268
Description
Is your feature request related to a problem? Please describe.
Currently (with faststream-0.4.7) when multiple rpc publications are done simultaneously calls are done one after the other.
IIRC this is due to a AioPikaFastProducer._rpc_lock
.
Describe the solution you'd like
It would be great that calls to rpc are done simultaneously (for instance with a correlation id as documented on https://aio-pika.readthedocs.io/en/latest/rabbitmq-tutorial/6-rpc.html).
Feature code example
client.py:
import asyncio
import datetime
import uuid
from faststream.rabbit import RabbitBroker
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
async def send(msg: str) -> None:
print("sending", msg, datetime.datetime.now())
r = await broker.publish(
msg,
"test",
rpc=True,
rpc_timeout=None,
)
print("received", r, datetime.datetime.now())
async def main() -> None:
await broker.connect()
await asyncio.gather(*[send(f"{i}:{uuid.uuid4()}") for i in range(10)])
if __name__ == "__main__":
asyncio.run(main())
server.py
import asyncio
from faststream import FastStream
from faststream.rabbit import RabbitBroker
broker = RabbitBroker("amqp://guest:guest@localhost:5672/", max_consumers=10)
@broker.subscriber("test")
async def base_handler(body: str) -> str:
print("received", body)
await asyncio.sleep(1)
return body
async def main() -> None:
app = FastStream(broker)
await app.run() # blocking method
if __name__ == "__main__":
asyncio.run(main())
logs :
sending 0:92d1c227-5b4f-468a-8779-723fff743252 2024-03-18 16:12:43.587208
sending 1:3d3e1780-fe2b-40b2-9a97-f75498c6d5a2 2024-03-18 16:12:43.589087
sending 2:9f25e75e-856e-409e-98a7-89244ed71860 2024-03-18 16:12:43.589135
sending 3:f1b008b3-434f-4281-9a07-b027063f65b9 2024-03-18 16:12:43.589204
sending 4:05abb242-9880-4a84-8d67-c1cca88ee2c1 2024-03-18 16:12:43.589230
sending 5:75914f26-290a-4fa5-9606-50d6d5312344 2024-03-18 16:12:43.589250
sending 6:08f436fa-8238-4aa8-be19-9d8cab9fd637 2024-03-18 16:12:43.589269
sending 7:67db3517-c552-46e7-95e5-0f0d171790f3 2024-03-18 16:12:43.589286
sending 8:c558b771-7058-48f8-a82c-05de46ea4b18 2024-03-18 16:12:43.589304
sending 9:095ea2da-7e09-4d84-9adb-f36284d12f12 2024-03-18 16:12:43.589321
received 0:92d1c227-5b4f-468a-8779-723fff743252 2024-03-18 16:12:44.606000
received 1:3d3e1780-fe2b-40b2-9a97-f75498c6d5a2 2024-03-18 16:12:45.619650
received 2:9f25e75e-856e-409e-98a7-89244ed71860 2024-03-18 16:12:46.627189
received 3:f1b008b3-434f-4281-9a07-b027063f65b9 2024-03-18 16:12:47.634551
received 4:05abb242-9880-4a84-8d67-c1cca88ee2c1 2024-03-18 16:12:48.646457
received 5:75914f26-290a-4fa5-9606-50d6d5312344 2024-03-18 16:12:49.655983
received 6:08f436fa-8238-4aa8-be19-9d8cab9fd637 2024-03-18 16:12:50.667091
received 7:67db3517-c552-46e7-95e5-0f0d171790f3 2024-03-18 16:12:51.676912
received 8:c558b771-7058-48f8-a82c-05de46ea4b18 2024-03-18 16:12:52.685301
received 9:095ea2da-7e09-4d84-9adb-f36284d12f12 2024-03-18 16:12:53.695455
Note that the last answer arrives 10s later (instead of 1s)
Describe alternatives you've considered
A workaround is to avoid faststream on client side and use rpc with correlation id as documented on https://aio-pika.readthedocs.io/en/latest/rabbitmq-tutorial/6-rpc.html
Additional context
None