diff --git a/docs/docs/SUMMARY.md b/docs/docs/SUMMARY.md index 150de9cc30..f10774cc8c 100644 --- a/docs/docs/SUMMARY.md +++ b/docs/docs/SUMMARY.md @@ -765,6 +765,9 @@ search: - [RabbitRouter](api/faststream/rabbit/fastapi/RabbitRouter.md) - router - [RabbitRouter](api/faststream/rabbit/fastapi/router/RabbitRouter.md) + - helpers + - declarer + - [RabbitDeclarer](api/faststream/rabbit/helpers/declarer/RabbitDeclarer.md) - message - [RabbitMessage](api/faststream/rabbit/message/RabbitMessage.md) - opentelemetry @@ -821,7 +824,6 @@ search: - [apply_pattern](api/faststream/rabbit/testing/apply_pattern.md) - [build_message](api/faststream/rabbit/testing/build_message.md) - utils - - [RabbitDeclarer](api/faststream/rabbit/utils/RabbitDeclarer.md) - [build_url](api/faststream/rabbit/utils/build_url.md) - [is_routing_exchange](api/faststream/rabbit/utils/is_routing_exchange.md) - redis diff --git a/docs/docs/en/api/faststream/rabbit/utils/RabbitDeclarer.md b/docs/docs/en/api/faststream/rabbit/helpers/declarer/RabbitDeclarer.md similarity index 67% rename from docs/docs/en/api/faststream/rabbit/utils/RabbitDeclarer.md rename to docs/docs/en/api/faststream/rabbit/helpers/declarer/RabbitDeclarer.md index 28a5a6b7c5..b8fc8a0ebd 100644 --- a/docs/docs/en/api/faststream/rabbit/utils/RabbitDeclarer.md +++ b/docs/docs/en/api/faststream/rabbit/helpers/declarer/RabbitDeclarer.md @@ -8,4 +8,4 @@ search: boost: 0.5 --- -::: faststream.rabbit.utils.RabbitDeclarer +::: faststream.rabbit.helpers.declarer.RabbitDeclarer diff --git a/faststream/rabbit/broker/broker.py b/faststream/rabbit/broker/broker.py index f7ec134f86..6cb357fef7 100644 --- a/faststream/rabbit/broker/broker.py +++ b/faststream/rabbit/broker/broker.py @@ -20,6 +20,7 @@ from faststream.exceptions import NOT_CONNECTED_YET from faststream.rabbit.broker.logging import RabbitLoggingBroker from faststream.rabbit.broker.registrator import RabbitRegistrator +from faststream.rabbit.helpers.declarer import RabbitDeclarer from faststream.rabbit.publisher.producer import AioPikaFastProducer from faststream.rabbit.schemas import ( RABBIT_REPLY, @@ -28,7 +29,7 @@ ) from faststream.rabbit.security import parse_security from faststream.rabbit.subscriber.asyncapi import AsyncAPISubscriber -from faststream.rabbit.utils import RabbitDeclarer, build_url +from faststream.rabbit.utils import build_url if TYPE_CHECKING: from ssl import SSLContext @@ -429,7 +430,6 @@ async def _connect( # type: ignore[override] await declarer.declare_queue(RABBIT_REPLY) self._producer = AioPikaFastProducer( - channel=channel, declarer=declarer, decoder=self._decoder, parser=self._parser, @@ -437,7 +437,9 @@ async def _connect( # type: ignore[override] if max_consumers: c = AsyncAPISubscriber.build_log_context( - None, RabbitQueue(""), RabbitExchange("") + None, + RabbitQueue(""), + RabbitExchange(""), ) self._log(f"Set max consumers to {max_consumers}", extra=c) await channel.set_qos(prefetch_count=int(max_consumers)) diff --git a/faststream/rabbit/helpers/__init__.py b/faststream/rabbit/helpers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/faststream/rabbit/helpers/declarer.py b/faststream/rabbit/helpers/declarer.py new file mode 100644 index 0000000000..57c21a3a78 --- /dev/null +++ b/faststream/rabbit/helpers/declarer.py @@ -0,0 +1,79 @@ +from typing import TYPE_CHECKING, Dict, cast + +if TYPE_CHECKING: + import aio_pika + + from faststream.rabbit.schemas import RabbitExchange, RabbitQueue + + +class RabbitDeclarer: + """An utility class to declare RabbitMQ queues and exchanges.""" + + __channel: "aio_pika.RobustChannel" + __queues: Dict["RabbitQueue", "aio_pika.RobustQueue"] + __exchanges: Dict["RabbitExchange", "aio_pika.RobustExchange"] + + def __init__(self, channel: "aio_pika.RobustChannel") -> None: + self.__channel = channel + self.__queues = {} + self.__exchanges = {} + + async def declare_queue( + self, + queue: "RabbitQueue", + passive: bool = False, + ) -> "aio_pika.RobustQueue": + """Declare a queue.""" + if (q := self.__queues.get(queue)) is None: + self.__queues[queue] = q = cast( + "aio_pika.RobustQueue", + await self.__channel.declare_queue( + name=queue.name, + durable=queue.durable, + exclusive=queue.exclusive, + passive=passive or queue.passive, + auto_delete=queue.auto_delete, + arguments=queue.arguments, + timeout=queue.timeout, + robust=queue.robust, + ), + ) + + return q + + async def declare_exchange( + self, + exchange: "RabbitExchange", + passive: bool = False, + ) -> "aio_pika.RobustExchange": + """Declare an exchange, parent exchanges and bind them each other.""" + if not exchange.name: + return self.__channel.default_exchange + + if (exch := self.__exchanges.get(exchange)) is None: + self.__exchanges[exchange] = exch = cast( + "aio_pika.RobustExchange", + await self.__channel.declare_exchange( + name=exchange.name, + type=exchange.type.value, + durable=exchange.durable, + auto_delete=exchange.auto_delete, + passive=passive or exchange.passive, + arguments=exchange.arguments, + timeout=exchange.timeout, + robust=exchange.robust, + internal=False, # deprecated RMQ option + ), + ) + + if exchange.bind_to is not None: + parent = await self.declare_exchange(exchange.bind_to) + await exch.bind( + exchange=parent, + routing_key=exchange.routing, + arguments=exchange.bind_arguments, + timeout=exchange.timeout, + robust=exchange.robust, + ) + + return exch diff --git a/faststream/rabbit/publisher/asyncapi.py b/faststream/rabbit/publisher/asyncapi.py index d5f785d7c1..7e0c580cdd 100644 --- a/faststream/rabbit/publisher/asyncapi.py +++ b/faststream/rabbit/publisher/asyncapi.py @@ -40,7 +40,8 @@ def get_name(self) -> str: or (self.queue.routing if is_routing_exchange(self.exchange) else None) or "_" ) - return f"{routing}:{getattr(self.exchange, 'name', '_')}:Publisher" + + return f"{routing}:{getattr(self.exchange, 'name', None) or '_'}:Publisher" def get_schema(self) -> Dict[str, Channel]: payloads = self.get_payloads() @@ -87,7 +88,7 @@ def get_schema(self) -> Dict[str, Channel]: else None, "exchange": ( amqp.Exchange(type="default", vhost=self.virtual_host) - if self.exchange is None + if not self.exchange.name else amqp.Exchange( type=self.exchange.type.value, # type: ignore name=self.exchange.name, @@ -109,7 +110,7 @@ def create( # type: ignore[override] *, routing_key: str, queue: "RabbitQueue", - exchange: Optional["RabbitExchange"], + exchange: "RabbitExchange", message_kwargs: "PublishKwargs", # Publisher args broker_middlewares: Iterable["BrokerMiddleware[IncomingMessage]"], diff --git a/faststream/rabbit/publisher/producer.py b/faststream/rabbit/publisher/producer.py index db21217f50..09b4ffbb3e 100644 --- a/faststream/rabbit/publisher/producer.py +++ b/faststream/rabbit/publisher/producer.py @@ -23,7 +23,7 @@ from types import TracebackType import aiormq - from aio_pika import IncomingMessage, RobustChannel, RobustQueue + from aio_pika import IncomingMessage, RobustQueue from aio_pika.abc import DateType, HeadersType, TimeoutType from anyio.streams.memory import MemoryObjectReceiveStream @@ -31,8 +31,8 @@ AsyncCallable, CustomCallable, ) + from faststream.rabbit.helpers.declarer import RabbitDeclarer from faststream.rabbit.types import AioPikaSendableMessage - from faststream.rabbit.utils import RabbitDeclarer from faststream.types import SendableMessage @@ -45,12 +45,10 @@ class AioPikaFastProducer(ProducerProto): def __init__( self, *, - channel: "RobustChannel", declarer: "RabbitDeclarer", parser: Optional["CustomCallable"], decoder: Optional["CustomCallable"], ) -> None: - self._channel = channel self.declarer = declarer self._rpc_lock = anyio.Lock() @@ -161,14 +159,6 @@ async def _publish( app_id: Optional[str], ) -> Union["aiormq.abc.ConfirmationFrameType", "SendableMessage"]: """Publish a message to a RabbitMQ exchange.""" - p_exchange = RabbitExchange.validate(exchange) - - if p_exchange is None: - exchange_obj = self._channel.default_exchange - else: - p_exchange.passive = True - exchange_obj = await self.declarer.declare_exchange(p_exchange) - message = AioPikaParser.encode_message( message=message, persist=persist, @@ -186,6 +176,11 @@ async def _publish( app_id=app_id, ) + exchange_obj = await self.declarer.declare_exchange( + exchange=RabbitExchange.validate(exchange), + passive=True, + ) + return await exchange_obj.publish( message=message, routing_key=routing_key, diff --git a/faststream/rabbit/publisher/usecase.py b/faststream/rabbit/publisher/usecase.py index 7ac5dc6389..0472bbc127 100644 --- a/faststream/rabbit/publisher/usecase.py +++ b/faststream/rabbit/publisher/usecase.py @@ -106,7 +106,7 @@ def __init__( *, routing_key: str, queue: "RabbitQueue", - exchange: Optional["RabbitExchange"], + exchange: "RabbitExchange", message_kwargs: "PublishKwargs", # Publisher args broker_middlewares: Iterable["BrokerMiddleware[IncomingMessage]"], @@ -225,7 +225,7 @@ async def publish( "routing_key": routing_key or self.routing_key or RabbitQueue.validate(queue or self.queue).routing, - "exchange": exchange or self.exchange, + "exchange": exchange or self.exchange.name, "app_id": self.app_id, "correlation_id": correlation_id or gen_cor_id(), "message_id": message_id, diff --git a/faststream/rabbit/schemas/exchange.py b/faststream/rabbit/schemas/exchange.py index be8af31c3a..a9dfae79a1 100644 --- a/faststream/rabbit/schemas/exchange.py +++ b/faststream/rabbit/schemas/exchange.py @@ -1,7 +1,7 @@ import warnings -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Any, Optional, Union -from typing_extensions import Annotated, Doc +from typing_extensions import Annotated, Doc, override from faststream.broker.schemas import NameRequired from faststream.rabbit.schemas.constants import ExchangeType @@ -39,12 +39,17 @@ def __hash__(self) -> int: ) ) + @property + def routing(self) -> str: + """Return real routing_key of object.""" + return self.routing_key or self.name + def __init__( self, name: Annotated[ str, Doc("RabbitMQ exchange name."), - ], + ] = "", type: Annotated[ ExchangeType, Doc( @@ -125,3 +130,15 @@ def __init__( self.bind_to = bind_to self.bind_arguments = bind_arguments self.routing_key = routing_key + + @override + @classmethod + def validate( # type: ignore[override] + cls, + value: Union[str, "RabbitExchange", None], + **kwargs: Any, + ) -> "RabbitExchange": + exch = super().validate(value, **kwargs) + if exch is None: + exch = RabbitExchange() + return exch diff --git a/faststream/rabbit/subscriber/asyncapi.py b/faststream/rabbit/subscriber/asyncapi.py index 158d343dd1..2b0cb4cd5b 100644 --- a/faststream/rabbit/subscriber/asyncapi.py +++ b/faststream/rabbit/subscriber/asyncapi.py @@ -19,7 +19,7 @@ class AsyncAPISubscriber(LogicSubscriber): def get_name(self) -> str: return ( - f"{self.queue.name}:{getattr(self.exchange, 'name', '_')}:{self.call_name}" + f"{self.queue.name}:{getattr(self.exchange, 'name', None) or '_'}:{self.call_name}" ) def get_schema(self) -> Dict[str, Channel]: @@ -59,7 +59,7 @@ def get_schema(self) -> Dict[str, Channel]: else None, "exchange": ( amqp.Exchange(type="default", vhost=self.virtual_host) - if self.exchange is None + if not self.exchange.name else amqp.Exchange( type=self.exchange.type.value, # type: ignore name=self.exchange.name, diff --git a/faststream/rabbit/subscriber/factory.py b/faststream/rabbit/subscriber/factory.py index 0683d2d62f..1a185cafe6 100644 --- a/faststream/rabbit/subscriber/factory.py +++ b/faststream/rabbit/subscriber/factory.py @@ -1,6 +1,5 @@ from typing import TYPE_CHECKING, Iterable, Optional, Union -from faststream.rabbit.schemas import RabbitExchange, RabbitQueue, ReplyConfig from faststream.rabbit.subscriber.asyncapi import AsyncAPISubscriber if TYPE_CHECKING: @@ -8,13 +7,14 @@ from fast_depends.dependencies import Depends from faststream.broker.types import BrokerMiddleware + from faststream.rabbit.schemas import RabbitExchange, RabbitQueue, ReplyConfig from faststream.types import AnyDict def create_subscriber( *, - queue: RabbitQueue, - exchange: Optional["RabbitExchange"], + queue: "RabbitQueue", + exchange: "RabbitExchange", consume_args: Optional["AnyDict"], reply_config: Optional["ReplyConfig"], # Subscriber args diff --git a/faststream/rabbit/subscriber/usecase.py b/faststream/rabbit/subscriber/usecase.py index e518d3ca37..67421df2da 100644 --- a/faststream/rabbit/subscriber/usecase.py +++ b/faststream/rabbit/subscriber/usecase.py @@ -23,13 +23,13 @@ from faststream.broker.message import StreamMessage from faststream.broker.types import BrokerMiddleware, CustomCallable + from faststream.rabbit.helpers.declarer import RabbitDeclarer from faststream.rabbit.publisher.producer import AioPikaFastProducer from faststream.rabbit.schemas import ( RabbitExchange, RabbitQueue, ReplyConfig, ) - from faststream.rabbit.utils import RabbitDeclarer from faststream.types import AnyDict, Decorator, LoggerProto @@ -50,7 +50,7 @@ def __init__( self, *, queue: "RabbitQueue", - exchange: Optional["RabbitExchange"], + exchange: "RabbitExchange", consume_args: Optional["AnyDict"], reply_config: Optional["ReplyConfig"], # Subscriber args @@ -141,16 +141,20 @@ async def start(self) -> None: self._queue_obj = queue = await self.declarer.declare_queue(self.queue) - if self.exchange is not None: + if ( + self.exchange is not None + and not queue.passive # queue just getted from RMQ + and self.exchange.name # check Exchange is not default + ): exchange = await self.declarer.declare_exchange(self.exchange) - if not queue.passive: - await queue.bind( - exchange, - routing_key=self.queue.routing, - arguments=self.queue.bind_arguments, - timeout=self.queue.timeout, - robust=self.queue.robust, - ) + + await queue.bind( + exchange, + routing_key=self.queue.routing, + arguments=self.queue.bind_arguments, + timeout=self.queue.timeout, + robust=self.queue.robust, + ) self._consumer_tag = await queue.consume( # NOTE: aio-pika expects AbstractIncomingMessage, not IncomingMessage diff --git a/faststream/rabbit/testing.py b/faststream/rabbit/testing.py index 36f79ef60a..3d3a274418 100644 --- a/faststream/rabbit/testing.py +++ b/faststream/rabbit/testing.py @@ -71,7 +71,7 @@ def remove_publisher_fake_subscriber( broker._subscribers.pop( AsyncAPISubscriber.get_routing_hash( queue=RabbitQueue.validate(publisher.routing), - exchange=publisher.exchange, + exchange=RabbitExchange.validate(publisher.exchange), ), None, ) diff --git a/faststream/rabbit/utils.py b/faststream/rabbit/utils.py index 4da30f7a50..1af8a30f7b 100644 --- a/faststream/rabbit/utils.py +++ b/faststream/rabbit/utils.py @@ -1,83 +1,15 @@ -from typing import TYPE_CHECKING, Any, Dict, Optional, Union, cast +from typing import TYPE_CHECKING, Any, Optional, Union from aio_pika.connection import make_url from faststream.rabbit.schemas.constants import ExchangeType if TYPE_CHECKING: - import aio_pika from aio_pika.abc import SSLOptions from pamqp.common import FieldTable from yarl import URL - from faststream.rabbit.schemas import RabbitExchange, RabbitQueue - - -class RabbitDeclarer: - """An utility class to declare RabbitMQ queues and exchanges.""" - - channel: "aio_pika.RobustChannel" - queues: Dict["RabbitQueue", "aio_pika.RobustQueue"] - exchanges: Dict["RabbitExchange", "aio_pika.RobustExchange"] - - def __init__(self, channel: "aio_pika.RobustChannel") -> None: - self.channel = channel - self.queues = {} - self.exchanges = {} - - async def declare_queue( - self, - queue: "RabbitQueue", - ) -> "aio_pika.RobustQueue": - """Declare a queue.""" - if (q := self.queues.get(queue)) is None: - self.queues[queue] = q = cast( - "aio_pika.RobustQueue", - await self.channel.declare_queue( - name=queue.name, - durable=queue.durable, - exclusive=queue.exclusive, - passive=queue.passive, - auto_delete=queue.auto_delete, - arguments=queue.arguments, - timeout=queue.timeout, - robust=queue.robust, - ), - ) - return q - - async def declare_exchange( - self, - exchange: "RabbitExchange", - ) -> "aio_pika.RobustExchange": - """Declare an exchange, parent exchanges and bind them each other.""" - if (exch := self.exchanges.get(exchange)) is None: - self.exchanges[exchange] = exch = cast( - "aio_pika.RobustExchange", - await self.channel.declare_exchange( - name=exchange.name, - type=exchange.type.value, - durable=exchange.durable, - auto_delete=exchange.auto_delete, - passive=exchange.passive, - arguments=exchange.arguments, - timeout=exchange.timeout, - robust=exchange.robust, - internal=False, # deprecated RMQ option - ), - ) - - if exchange.bind_to is not None: - parent = await self.declare_exchange(exchange.bind_to) - await exch.bind( - exchange=parent, - routing_key=exchange.routing_key, - arguments=exchange.bind_arguments, - timeout=exchange.timeout, - robust=exchange.robust, - ) - - return exch + from faststream.rabbit.schemas import RabbitExchange def build_url( diff --git a/tests/brokers/rabbit/specific/test_declare.py b/tests/brokers/rabbit/specific/test_declare.py index 2977572d16..aed6824f3e 100644 --- a/tests/brokers/rabbit/specific/test_declare.py +++ b/tests/brokers/rabbit/specific/test_declare.py @@ -1,7 +1,7 @@ import pytest from faststream.rabbit import RabbitBroker, RabbitExchange, RabbitQueue -from faststream.rabbit.utils import RabbitDeclarer +from faststream.rabbit.helpers.declarer import RabbitDeclarer @pytest.mark.asyncio() diff --git a/tests/docs/rabbit/test_declare.py b/tests/docs/rabbit/test_declare.py index 2157e2a257..e6f3891b12 100644 --- a/tests/docs/rabbit/test_declare.py +++ b/tests/docs/rabbit/test_declare.py @@ -9,5 +9,5 @@ async def test_declare(): from docs.docs_src.rabbit.declare import app, broker async with TestApp(app): - assert len(broker.declarer.exchanges) == 1 - assert len(broker.declarer.queues) == 2 # with `reply-to` + assert len(broker.declarer._RabbitDeclarer__exchanges) == 1 + assert len(broker.declarer._RabbitDeclarer__queues) == 2 # with `reply-to`