Skip to content

Commit

Permalink
feat: provide with an ability to create default RMQ Exchange (#1485)
Browse files Browse the repository at this point in the history
* feat: provide with an ability to create default RMQ Exchange

* refactor: optiomize RMQ declarer

* fix: correct import in Tests

* fix: correct AsyncAPI RMQ Schema

* fix: correct exchange propagation

* docs: remove useless API file
  • Loading branch information
Lancetnik authored Jun 1, 2024
1 parent 0d3291d commit 45d9cf6
Show file tree
Hide file tree
Showing 16 changed files with 147 additions and 115 deletions.
4 changes: 3 additions & 1 deletion docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,9 @@ search:
- [RabbitRouter](api/faststream/rabbit/fastapi/RabbitRouter.md)
- router
- [RabbitRouter](api/faststream/rabbit/fastapi/router/RabbitRouter.md)
- helpers
- declarer
- [RabbitDeclarer](api/faststream/rabbit/helpers/declarer/RabbitDeclarer.md)
- message
- [RabbitMessage](api/faststream/rabbit/message/RabbitMessage.md)
- opentelemetry
Expand Down Expand Up @@ -821,7 +824,6 @@ search:
- [apply_pattern](api/faststream/rabbit/testing/apply_pattern.md)
- [build_message](api/faststream/rabbit/testing/build_message.md)
- utils
- [RabbitDeclarer](api/faststream/rabbit/utils/RabbitDeclarer.md)
- [build_url](api/faststream/rabbit/utils/build_url.md)
- [is_routing_exchange](api/faststream/rabbit/utils/is_routing_exchange.md)
- redis
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ search:
boost: 0.5
---

::: faststream.rabbit.utils.RabbitDeclarer
::: faststream.rabbit.helpers.declarer.RabbitDeclarer
8 changes: 5 additions & 3 deletions faststream/rabbit/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from faststream.exceptions import NOT_CONNECTED_YET
from faststream.rabbit.broker.logging import RabbitLoggingBroker
from faststream.rabbit.broker.registrator import RabbitRegistrator
from faststream.rabbit.helpers.declarer import RabbitDeclarer
from faststream.rabbit.publisher.producer import AioPikaFastProducer
from faststream.rabbit.schemas import (
RABBIT_REPLY,
Expand All @@ -28,7 +29,7 @@
)
from faststream.rabbit.security import parse_security
from faststream.rabbit.subscriber.asyncapi import AsyncAPISubscriber
from faststream.rabbit.utils import RabbitDeclarer, build_url
from faststream.rabbit.utils import build_url

if TYPE_CHECKING:
from ssl import SSLContext
Expand Down Expand Up @@ -429,15 +430,16 @@ async def _connect( # type: ignore[override]
await declarer.declare_queue(RABBIT_REPLY)

self._producer = AioPikaFastProducer(
channel=channel,
declarer=declarer,
decoder=self._decoder,
parser=self._parser,
)

if max_consumers:
c = AsyncAPISubscriber.build_log_context(
None, RabbitQueue(""), RabbitExchange("")
None,
RabbitQueue(""),
RabbitExchange(""),
)
self._log(f"Set max consumers to {max_consumers}", extra=c)
await channel.set_qos(prefetch_count=int(max_consumers))
Expand Down
Empty file.
79 changes: 79 additions & 0 deletions faststream/rabbit/helpers/declarer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from typing import TYPE_CHECKING, Dict, cast

if TYPE_CHECKING:
import aio_pika

from faststream.rabbit.schemas import RabbitExchange, RabbitQueue


class RabbitDeclarer:
"""An utility class to declare RabbitMQ queues and exchanges."""

__channel: "aio_pika.RobustChannel"
__queues: Dict["RabbitQueue", "aio_pika.RobustQueue"]
__exchanges: Dict["RabbitExchange", "aio_pika.RobustExchange"]

def __init__(self, channel: "aio_pika.RobustChannel") -> None:
self.__channel = channel
self.__queues = {}
self.__exchanges = {}

async def declare_queue(
self,
queue: "RabbitQueue",
passive: bool = False,
) -> "aio_pika.RobustQueue":
"""Declare a queue."""
if (q := self.__queues.get(queue)) is None:
self.__queues[queue] = q = cast(
"aio_pika.RobustQueue",
await self.__channel.declare_queue(
name=queue.name,
durable=queue.durable,
exclusive=queue.exclusive,
passive=passive or queue.passive,
auto_delete=queue.auto_delete,
arguments=queue.arguments,
timeout=queue.timeout,
robust=queue.robust,
),
)

return q

async def declare_exchange(
self,
exchange: "RabbitExchange",
passive: bool = False,
) -> "aio_pika.RobustExchange":
"""Declare an exchange, parent exchanges and bind them each other."""
if not exchange.name:
return self.__channel.default_exchange

if (exch := self.__exchanges.get(exchange)) is None:
self.__exchanges[exchange] = exch = cast(
"aio_pika.RobustExchange",
await self.__channel.declare_exchange(
name=exchange.name,
type=exchange.type.value,
durable=exchange.durable,
auto_delete=exchange.auto_delete,
passive=passive or exchange.passive,
arguments=exchange.arguments,
timeout=exchange.timeout,
robust=exchange.robust,
internal=False, # deprecated RMQ option
),
)

if exchange.bind_to is not None:
parent = await self.declare_exchange(exchange.bind_to)
await exch.bind(
exchange=parent,
routing_key=exchange.routing,
arguments=exchange.bind_arguments,
timeout=exchange.timeout,
robust=exchange.robust,
)

return exch
7 changes: 4 additions & 3 deletions faststream/rabbit/publisher/asyncapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ def get_name(self) -> str:
or (self.queue.routing if is_routing_exchange(self.exchange) else None)
or "_"
)
return f"{routing}:{getattr(self.exchange, 'name', '_')}:Publisher"

return f"{routing}:{getattr(self.exchange, 'name', None) or '_'}:Publisher"

def get_schema(self) -> Dict[str, Channel]:
payloads = self.get_payloads()
Expand Down Expand Up @@ -87,7 +88,7 @@ def get_schema(self) -> Dict[str, Channel]:
else None,
"exchange": (
amqp.Exchange(type="default", vhost=self.virtual_host)
if self.exchange is None
if not self.exchange.name
else amqp.Exchange(
type=self.exchange.type.value, # type: ignore
name=self.exchange.name,
Expand All @@ -109,7 +110,7 @@ def create( # type: ignore[override]
*,
routing_key: str,
queue: "RabbitQueue",
exchange: Optional["RabbitExchange"],
exchange: "RabbitExchange",
message_kwargs: "PublishKwargs",
# Publisher args
broker_middlewares: Iterable["BrokerMiddleware[IncomingMessage]"],
Expand Down
19 changes: 7 additions & 12 deletions faststream/rabbit/publisher/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@
from types import TracebackType

import aiormq
from aio_pika import IncomingMessage, RobustChannel, RobustQueue
from aio_pika import IncomingMessage, RobustQueue
from aio_pika.abc import DateType, HeadersType, TimeoutType
from anyio.streams.memory import MemoryObjectReceiveStream

from faststream.broker.types import (
AsyncCallable,
CustomCallable,
)
from faststream.rabbit.helpers.declarer import RabbitDeclarer
from faststream.rabbit.types import AioPikaSendableMessage
from faststream.rabbit.utils import RabbitDeclarer
from faststream.types import SendableMessage


Expand All @@ -45,12 +45,10 @@ class AioPikaFastProducer(ProducerProto):
def __init__(
self,
*,
channel: "RobustChannel",
declarer: "RabbitDeclarer",
parser: Optional["CustomCallable"],
decoder: Optional["CustomCallable"],
) -> None:
self._channel = channel
self.declarer = declarer

self._rpc_lock = anyio.Lock()
Expand Down Expand Up @@ -161,14 +159,6 @@ async def _publish(
app_id: Optional[str],
) -> Union["aiormq.abc.ConfirmationFrameType", "SendableMessage"]:
"""Publish a message to a RabbitMQ exchange."""
p_exchange = RabbitExchange.validate(exchange)

if p_exchange is None:
exchange_obj = self._channel.default_exchange
else:
p_exchange.passive = True
exchange_obj = await self.declarer.declare_exchange(p_exchange)

message = AioPikaParser.encode_message(
message=message,
persist=persist,
Expand All @@ -186,6 +176,11 @@ async def _publish(
app_id=app_id,
)

exchange_obj = await self.declarer.declare_exchange(
exchange=RabbitExchange.validate(exchange),
passive=True,
)

return await exchange_obj.publish(
message=message,
routing_key=routing_key,
Expand Down
4 changes: 2 additions & 2 deletions faststream/rabbit/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def __init__(
*,
routing_key: str,
queue: "RabbitQueue",
exchange: Optional["RabbitExchange"],
exchange: "RabbitExchange",
message_kwargs: "PublishKwargs",
# Publisher args
broker_middlewares: Iterable["BrokerMiddleware[IncomingMessage]"],
Expand Down Expand Up @@ -225,7 +225,7 @@ async def publish(
"routing_key": routing_key
or self.routing_key
or RabbitQueue.validate(queue or self.queue).routing,
"exchange": exchange or self.exchange,
"exchange": exchange or self.exchange.name,
"app_id": self.app_id,
"correlation_id": correlation_id or gen_cor_id(),
"message_id": message_id,
Expand Down
23 changes: 20 additions & 3 deletions faststream/rabbit/schemas/exchange.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import warnings
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, Any, Optional, Union

from typing_extensions import Annotated, Doc
from typing_extensions import Annotated, Doc, override

from faststream.broker.schemas import NameRequired
from faststream.rabbit.schemas.constants import ExchangeType
Expand Down Expand Up @@ -39,12 +39,17 @@ def __hash__(self) -> int:
)
)

@property
def routing(self) -> str:
"""Return real routing_key of object."""
return self.routing_key or self.name

def __init__(
self,
name: Annotated[
str,
Doc("RabbitMQ exchange name."),
],
] = "",
type: Annotated[
ExchangeType,
Doc(
Expand Down Expand Up @@ -125,3 +130,15 @@ def __init__(
self.bind_to = bind_to
self.bind_arguments = bind_arguments
self.routing_key = routing_key

@override
@classmethod
def validate( # type: ignore[override]
cls,
value: Union[str, "RabbitExchange", None],
**kwargs: Any,
) -> "RabbitExchange":
exch = super().validate(value, **kwargs)
if exch is None:
exch = RabbitExchange()
return exch
4 changes: 2 additions & 2 deletions faststream/rabbit/subscriber/asyncapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class AsyncAPISubscriber(LogicSubscriber):

def get_name(self) -> str:
return (
f"{self.queue.name}:{getattr(self.exchange, 'name', '_')}:{self.call_name}"
f"{self.queue.name}:{getattr(self.exchange, 'name', None) or '_'}:{self.call_name}"
)

def get_schema(self) -> Dict[str, Channel]:
Expand Down Expand Up @@ -59,7 +59,7 @@ def get_schema(self) -> Dict[str, Channel]:
else None,
"exchange": (
amqp.Exchange(type="default", vhost=self.virtual_host)
if self.exchange is None
if not self.exchange.name
else amqp.Exchange(
type=self.exchange.type.value, # type: ignore
name=self.exchange.name,
Expand Down
6 changes: 3 additions & 3 deletions faststream/rabbit/subscriber/factory.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
from typing import TYPE_CHECKING, Iterable, Optional, Union

from faststream.rabbit.schemas import RabbitExchange, RabbitQueue, ReplyConfig
from faststream.rabbit.subscriber.asyncapi import AsyncAPISubscriber

if TYPE_CHECKING:
from aio_pika import IncomingMessage
from fast_depends.dependencies import Depends

from faststream.broker.types import BrokerMiddleware
from faststream.rabbit.schemas import RabbitExchange, RabbitQueue, ReplyConfig
from faststream.types import AnyDict


def create_subscriber(
*,
queue: RabbitQueue,
exchange: Optional["RabbitExchange"],
queue: "RabbitQueue",
exchange: "RabbitExchange",
consume_args: Optional["AnyDict"],
reply_config: Optional["ReplyConfig"],
# Subscriber args
Expand Down
26 changes: 15 additions & 11 deletions faststream/rabbit/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@

from faststream.broker.message import StreamMessage
from faststream.broker.types import BrokerMiddleware, CustomCallable
from faststream.rabbit.helpers.declarer import RabbitDeclarer
from faststream.rabbit.publisher.producer import AioPikaFastProducer
from faststream.rabbit.schemas import (
RabbitExchange,
RabbitQueue,
ReplyConfig,
)
from faststream.rabbit.utils import RabbitDeclarer
from faststream.types import AnyDict, Decorator, LoggerProto


Expand All @@ -50,7 +50,7 @@ def __init__(
self,
*,
queue: "RabbitQueue",
exchange: Optional["RabbitExchange"],
exchange: "RabbitExchange",
consume_args: Optional["AnyDict"],
reply_config: Optional["ReplyConfig"],
# Subscriber args
Expand Down Expand Up @@ -141,16 +141,20 @@ async def start(self) -> None:

self._queue_obj = queue = await self.declarer.declare_queue(self.queue)

if self.exchange is not None:
if (
self.exchange is not None
and not queue.passive # queue just getted from RMQ
and self.exchange.name # check Exchange is not default
):
exchange = await self.declarer.declare_exchange(self.exchange)
if not queue.passive:
await queue.bind(
exchange,
routing_key=self.queue.routing,
arguments=self.queue.bind_arguments,
timeout=self.queue.timeout,
robust=self.queue.robust,
)

await queue.bind(
exchange,
routing_key=self.queue.routing,
arguments=self.queue.bind_arguments,
timeout=self.queue.timeout,
robust=self.queue.robust,
)

self._consumer_tag = await queue.consume(
# NOTE: aio-pika expects AbstractIncomingMessage, not IncomingMessage
Expand Down
2 changes: 1 addition & 1 deletion faststream/rabbit/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def remove_publisher_fake_subscriber(
broker._subscribers.pop(
AsyncAPISubscriber.get_routing_hash(
queue=RabbitQueue.validate(publisher.routing),
exchange=publisher.exchange,
exchange=RabbitExchange.validate(publisher.exchange),
),
None,
)
Expand Down
Loading

0 comments on commit 45d9cf6

Please sign in to comment.