Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
improved arguments management during declare_queue
  • Loading branch information
DanielePalaia committed Jan 10, 2025
commit 43efb267aeca815d2b6b8df4ed75a550d0de1899
1 change: 0 additions & 1 deletion examples/getting_started/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
ExchangeSpecification,
Message,
QueueType,
QuorumQueueSpecification,
StreamSpecification,
exchange_address,
)
Expand Down
26 changes: 14 additions & 12 deletions rabbitmq_amqp_python_client/management.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
import uuid
from typing import Any, Optional
from typing import Any, Optional, Union

from .address_helper import (
binding_path_with_exchange_queue,
Expand Down Expand Up @@ -130,19 +130,20 @@ def declare_exchange(

def declare_queue(
self,
queue_specification: (
ClassicQueueSpecification | QuorumQueueSpecification | StreamSpecification
),
) -> ClassicQueueSpecification | QuorumQueueSpecification | StreamSpecification:
queue_specification: Union[
ClassicQueueSpecification, QuorumQueueSpecification, StreamSpecification
],
) -> Union[
ClassicQueueSpecification, QuorumQueueSpecification, StreamSpecification
]:
logger.debug("declare_queue operation called")

if (
type(queue_specification) is ClassicQueueSpecification
or type(queue_specification) is QuorumQueueSpecification
if isinstance(queue_specification, ClassicQueueSpecification) or isinstance(
queue_specification, QuorumQueueSpecification
):
body = self._declare_queue(queue_specification)

elif type(queue_specification) is StreamSpecification:
elif isinstance(queue_specification, StreamSpecification):
body = self._declare_stream(queue_specification)

path = queue_address(queue_specification.name)
Expand All @@ -161,7 +162,8 @@ def declare_queue(
return queue_specification

def _declare_queue(
self, queue_specification: ClassicQueueSpecification | QuorumQueueSpecification
self,
queue_specification: Union[ClassicQueueSpecification, QuorumQueueSpecification],
) -> dict[str, Any]:

body = {}
Expand Down Expand Up @@ -191,11 +193,11 @@ def _declare_queue(
queue_specification.single_active_consumer
)

if type(queue_specification) is ClassicQueueSpecification:
if isinstance(queue_specification, ClassicQueueSpecification):
if queue_specification.maximum_priority is not None:
args["x-maximum-priority"] = queue_specification.maximum_priority

if type(queue_specification) is QuorumQueueSpecification:
if isinstance(queue_specification, QuorumQueueSpecification):
if queue_specification.deliver_limit is not None:
args["x-deliver-limit"] = queue_specification.deliver_limit

Expand Down
6 changes: 3 additions & 3 deletions rabbitmq_amqp_python_client/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ class QuorumQueueSpecification(QueueSpecification):
class StreamSpecification:
name: str
queue_type: QueueType = QueueType.stream
max_len_bytes: Optional[str] = None
max_time_retention: Optional[str] = None
max_segment_size_in_bytes: Optional[str] = None
max_len_bytes: Optional[int] = None
max_time_retention: Optional[int] = None
max_segment_size_in_bytes: Optional[int] = None
filter_size: Optional[int] = None
initial_group_size: Optional[int] = None
leader_locator: Optional[str] = None
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ exclude = .git, venv
max-line-length = 120

[mypy]
python_version = 3.10
python_version = 3.9
strict = True
ignore_missing_imports = True

Expand Down
61 changes: 45 additions & 16 deletions tests/test_management.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from rabbitmq_amqp_python_client import (
BindingSpecification,
ClassicQueueSpecification,
Connection,
ExchangeSpecification,
QueueSpecification,
QueueType,
QuorumQueueSpecification,
StreamSpecification,
)
from rabbitmq_amqp_python_client.exceptions import (
ValidationCodeException,
Expand Down Expand Up @@ -35,9 +37,7 @@ def test_declare_purge_delete_queue() -> None:
queue_name = "my_queue"
management = connection.management()

queue_info = management.declare_queue(
QueueSpecification(name=queue_name, queue_type=QueueType.quorum)
)
queue_info = management.declare_queue(QuorumQueueSpecification(name=queue_name))

assert queue_info.name == queue_name

Expand All @@ -59,9 +59,7 @@ def test_bind_exchange_to_queue() -> None:

management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))

management.declare_queue(
QueueSpecification(name=queue_name, queue_type=QueueType.quorum)
)
management.declare_queue(QuorumQueueSpecification(name=queue_name))

binding_exchange_queue_path = management.bind(
BindingSpecification(
Expand Down Expand Up @@ -98,9 +96,8 @@ def test_queue_info_with_validations() -> None:
queue_name = "test_queue_info_with_validation"
management = connection.management()

queue_specification = QueueSpecification(
queue_specification = QuorumQueueSpecification(
name=queue_name,
queue_type=QueueType.quorum,
)
management.declare_queue(queue_specification)

Expand All @@ -122,16 +119,15 @@ def test_queue_precondition_fail() -> None:
queue_name = "test-queue_precondition_fail"
management = connection.management()

queue_specification = QueueSpecification(
name=queue_name, queue_type=QueueType.quorum, is_auto_delete=False
queue_specification = QuorumQueueSpecification(
name=queue_name, is_auto_delete=False
)
management.declare_queue(queue_specification)

management.declare_queue(queue_specification)

queue_specification = QueueSpecification(
queue_specification = QuorumQueueSpecification(
name=queue_name,
queue_type=QueueType.quorum,
is_auto_delete=True,
)

Expand All @@ -152,7 +148,7 @@ def test_declare_classic_queue() -> None:
queue_name = "test-declare_classic_queue"
management = connection.management()

queue_specification = QueueSpecification(
queue_specification = QuorumQueueSpecification(
name=queue_name,
queue_type=QueueType.classic,
is_auto_delete=False,
Expand All @@ -165,14 +161,14 @@ def test_declare_classic_queue() -> None:
management.delete_queue(queue_name)


def test_declare_queue_with_args() -> None:
def test_declare_classic_queue_with_args() -> None:
connection = Connection("amqp://guest:guest@localhost:5672/")
connection.dial()

queue_name = "test-queue_with_args"
management = connection.management()

queue_specification = QueueSpecification(
queue_specification = ClassicQueueSpecification(
name=queue_name,
queue_type=QueueType.classic,
is_auto_delete=False,
Expand Down Expand Up @@ -201,3 +197,36 @@ def test_declare_queue_with_args() -> None:
)

management.delete_queue(queue_name)


def test_declare_stream_with_args() -> None:
connection = Connection("amqp://guest:guest@localhost:5672/")
connection.dial()

stream_name = "test-stream_with_args"
management = connection.management()

stream_specification = StreamSpecification(
name=stream_name,
max_len_bytes=1000000000,
max_time_retention=10000000,
max_segment_size_in_bytes=100000000,
filter_size=1000,
initial_group_size=3,
leader_locator="node1",
)

stream_info = management.declare_queue(stream_specification)

assert stream_specification.name == stream_info.name
assert stream_specification.max_len_bytes == stream_info.max_len_bytes
assert stream_specification.max_time_retention == stream_info.max_time_retention
assert (
stream_specification.max_segment_size_in_bytes
== stream_info.max_segment_size_in_bytes
)
assert stream_specification.filter_size == stream_info.filter_size
assert stream_specification.initial_group_size == stream_info.initial_group_size
assert stream_specification.leader_locator == stream_info.leader_locator

management.delete_queue(stream_name)
7 changes: 2 additions & 5 deletions tests/test_publisher.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from rabbitmq_amqp_python_client import (
Connection,
Message,
QueueSpecification,
QueueType,
QuorumQueueSpecification,
)


Expand All @@ -13,9 +12,7 @@ def test_bind_exchange_to_queue() -> None:
queue_name = "test-queue"
management = connection.management()

management.declare_queue(
QueueSpecification(name=queue_name, queue_type=QueueType.quorum)
)
management.declare_queue(QuorumQueueSpecification(name=queue_name))

raised = False

Expand Down