Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fixing queue arguments management
  • Loading branch information
DanielePalaia committed Jan 9, 2025
commit 078b70d938f7aca0aa41f0fdffe435cb9ec8aedf
2 changes: 1 addition & 1 deletion examples/getting_started/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def main() -> None:
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))

management.declare_queue(
QueueSpecification(name=queue_name, queue_type=QueueType.quorum, arguments={})
QueueSpecification(name=queue_name, queue_type=QueueType.quorum)
)

print("binding queue to exchange")
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "rabbitmq-amqp-python-client"
version = "0.0.1"
version = "0.1.0"
description = "Python RabbitMQ client for AMQP 1.0 protocol"
authors = ["RabbitMQ team"]
license = "Apache-2.0 license"
Expand Down
14 changes: 9 additions & 5 deletions rabbitmq_amqp_python_client/entities.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import Optional
from typing import Any, Optional

from .common import ExchangeType, QueueType

Expand All @@ -17,20 +17,24 @@ class ExchangeSpecification:
@dataclass
class QueueSpecification:
name: str
arguments: dict[str, str]
queue_type: QueueType = QueueType.quorum
dead_letter_routing_key: str = ""
dead_letter_routing_key: Optional[str] = None
is_exclusive: Optional[bool] = None
max_len: Optional[int] = None
max_len_bytes: Optional[int] = None
dead_letter_exchange: str = ""
message_ttl: Optional[int] = None
expires: Optional[int] = None
dead_letter_exchange: Optional[str] = ""
is_auto_delete: bool = False
is_durable: bool = True
overflow: Optional[str] = None
single_active_consumer: Optional[bool] = None


@dataclass
class QueueInfo:
name: str
arguments: dict[str, str]
arguments: dict[str, Any]
queue_type: QueueType = QueueType.quorum
is_exclusive: Optional[bool] = None
is_auto_delete: bool = False
Expand Down
2 changes: 1 addition & 1 deletion rabbitmq_amqp_python_client/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class ValidationCodeException(Exception):
class ValidationCodeException(BaseException):
# Constructor or Initializer
def __init__(self, msg: str):
self.msg = msg
Expand Down
33 changes: 26 additions & 7 deletions rabbitmq_amqp_python_client/management.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def declare_exchange(
body["durable"] = exchange_specification.is_durable
body["type"] = exchange_specification.exchange_type.value # type: ignore
body["internal"] = exchange_specification.is_internal
body["arguments"] = {} # type: ignore
body["arguments"] = exchange_specification.arguments # type: ignore

path = exchange_address(exchange_specification.name)

Expand All @@ -129,14 +129,33 @@ def declare_queue(
) -> QueueSpecification:
logger.debug("declare_queue operation called")
body = {}
args: dict[str, Any] = {}

body["auto_delete"] = queue_specification.is_auto_delete
body["durable"] = queue_specification.is_durable
body["arguments"] = { # type: ignore
"x-queue-type": queue_specification.queue_type.value,
"x-dead-letter-exchange": queue_specification.dead_letter_exchange,
"x-dead-letter-routing-key": queue_specification.dead_letter_routing_key,
"max-length-bytes": queue_specification.max_len_bytes,
}
args["x-queue-type"] = queue_specification.queue_type.value
if queue_specification.dead_letter_exchange is not None:
args["x-dead-letter-exchange"] = queue_specification.dead_letter_exchange
if queue_specification.dead_letter_routing_key is not None:
args["x-dead-letter-routing-key"] = (
queue_specification.dead_letter_routing_key
)
if queue_specification.overflow is not None:
args["x-overflow"] = queue_specification.overflow
if queue_specification.max_len is not None:
args["x-max-length"] = queue_specification.max_len
if queue_specification.max_len_bytes is not None:
args["x-max-length-bytes"] = queue_specification.max_len_bytes
if queue_specification.message_ttl is not None:
args["x-message-ttl"] = queue_specification.message_ttl
if queue_specification.expires is not None:
args["x-expires"] = queue_specification.expires
if queue_specification.single_active_consumer is not None:
args["x-single-active-consumer"] = (
queue_specification.single_active_consumer
)

body["arguments"] = args # type: ignore

path = queue_address(queue_specification.name)

Expand Down
106 changes: 101 additions & 5 deletions tests/test_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
QueueSpecification,
QueueType,
)
from rabbitmq_amqp_python_client.exceptions import (
ValidationCodeException,
)


def test_declare_delete_exchange() -> None:
Expand Down Expand Up @@ -33,7 +36,7 @@ def test_declare_purge_delete_queue() -> None:
management = connection.management()

queue_info = management.declare_queue(
QueueSpecification(name=queue_name, queue_type=QueueType.quorum, arguments={})
QueueSpecification(name=queue_name, queue_type=QueueType.quorum)
)

assert queue_info.name == queue_name
Expand All @@ -57,7 +60,7 @@ def test_bind_exchange_to_queue() -> None:
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))

management.declare_queue(
QueueSpecification(name=queue_name, queue_type=QueueType.quorum, arguments={})
QueueSpecification(name=queue_name, queue_type=QueueType.quorum)
)

binding_exchange_queue_path = management.bind(
Expand Down Expand Up @@ -88,20 +91,113 @@ def test_bind_exchange_to_queue() -> None:
management.unbind(binding_exchange_queue_path)


def test_queue_info() -> None:
def test_queue_info_with_validations() -> None:
connection = Connection("amqp://guest:guest@localhost:5672/")
connection.dial()

queue_name = "test-bind-exchange-to-queue-queue"
queue_name = "test_queue_info_with_validation"
management = connection.management()

queue_specification = QueueSpecification(
name=queue_name, queue_type=QueueType.quorum, arguments={}
name=queue_name,
queue_type=QueueType.quorum,
)
management.declare_queue(queue_specification)

queue_info = management.queue_info(queue_name=queue_name)

management.delete_queue(queue_name)

assert queue_info.name == queue_name
assert queue_info.queue_type == queue_specification.queue_type
assert queue_info.is_durable == queue_specification.is_durable
assert queue_info.message_count == 0


def test_queue_precondition_fail() -> None:
connection = Connection("amqp://guest:guest@localhost:5672/")
connection.dial()
test_failure = True

queue_name = "test-queue_precondition_fail"
management = connection.management()

queue_specification = QueueSpecification(
name=queue_name, queue_type=QueueType.quorum, is_auto_delete=False
)
management.declare_queue(queue_specification)

management.declare_queue(queue_specification)

queue_specification = QueueSpecification(
name=queue_name,
queue_type=QueueType.quorum,
is_auto_delete=True,
)

management.delete_queue(queue_name)

try:
management.declare_queue(queue_specification)
except ValidationCodeException:
test_failure = False

assert test_failure is False


def test_declare_classic_queue() -> None:
connection = Connection("amqp://guest:guest@localhost:5672/")
connection.dial()

queue_name = "test-declare_classic_queue"
management = connection.management()

queue_specification = QueueSpecification(
name=queue_name,
queue_type=QueueType.classic,
is_auto_delete=False,
)
queue_info = management.declare_queue(queue_specification)

assert queue_info.name == queue_specification.name
assert queue_info.queue_type == queue_specification.queue_type

management.delete_queue(queue_name)


def test_declare_queue_with_args() -> None:
connection = Connection("amqp://guest:guest@localhost:5672/")
connection.dial()

queue_name = "test-queue_with_args"
management = connection.management()

queue_specification = QueueSpecification(
name=queue_name,
queue_type=QueueType.classic,
is_auto_delete=False,
dead_letter_exchange="my_exchange",
dead_letter_routing_key="my_key",
max_len=50000000,
max_len_bytes=1000000000,
expires=2000,
single_active_consumer=True,
)

queue_info = management.declare_queue(queue_specification)

assert queue_specification.name == queue_info.name
assert queue_specification.is_auto_delete == queue_info.is_auto_delete
assert queue_specification.dead_letter_exchange == queue_info.dead_letter_exchange
assert (
queue_specification.dead_letter_routing_key
== queue_info.dead_letter_routing_key
)
assert queue_specification.max_len == queue_info.max_len
assert queue_specification.max_len_bytes == queue_info.max_len_bytes
assert queue_specification.expires == queue_info.expires
assert (
queue_specification.single_active_consumer == queue_info.single_active_consumer
)

management.delete_queue(queue_name)
2 changes: 1 addition & 1 deletion tests/test_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def test_bind_exchange_to_queue() -> None:
management = connection.management()

management.declare_queue(
QueueSpecification(name=queue_name, queue_type=QueueType.quorum, arguments={})
QueueSpecification(name=queue_name, queue_type=QueueType.quorum)
)

raised = False
Expand Down