Skip to content

Commit

Permalink
redeclare queues that disappear in the middle of an enqueue op (#556)
Browse files Browse the repository at this point in the history
Co-authored-by: Bogdan Popa <bogdan@defn.io>
  • Loading branch information
JetDrag and Bogdanp authored Apr 29, 2024
1 parent 5bb9a22 commit 4d0cc07
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 5 deletions.
15 changes: 11 additions & 4 deletions dramatiq/brokers/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,8 @@ def declare_queue(self, queue_name, *, ensure=False):
Parameters:
queue_name(str): The name of the new queue.
ensure(bool): When True, the queue is created immediately on
the server.
ensure(bool): When True, the queue is created on the server,
if necessary.
Raises:
ConnectionClosed: When ensure=True if the underlying channel
Expand Down Expand Up @@ -308,8 +308,8 @@ def enqueue(self, message, *, delay=None):
ConnectionClosed: If the underlying channel or connection
has been closed.
"""
queue_name = message.queue_name
self.declare_queue(queue_name, ensure=True)
canonical_queue_name = message.queue_name
queue_name = canonical_queue_name

if delay is not None:
queue_name = dq_name(queue_name)
Expand All @@ -324,6 +324,7 @@ def enqueue(self, message, *, delay=None):
attempts = 1
while True:
try:
self.declare_queue(canonical_queue_name, ensure=True)
self.logger.debug("Enqueueing message %r on queue %r.", message.message_id, queue_name)
self.emit_before("enqueue", message, delay)
self.channel.basic_publish(
Expand All @@ -344,6 +345,12 @@ def enqueue(self, message, *, delay=None):
# next caller/attempt may initiate new ones of each.
del self.connection

# If the queue disappears, remove it from the known set
# so that it can be redeclared on retry or the next time
# a message is enqueued.
if getattr(e, "reply_code", None) == 404:
self.queues.remove(q_name(queue_name))

attempts += 1
if attempts > MAX_ENQUEUE_ATTEMPTS:
raise ConnectionClosed(e) from None
Expand Down
46 changes: 45 additions & 1 deletion tests/test_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pytest

import dramatiq
from dramatiq import Message, QueueJoinTimeout, Worker
from dramatiq import Message, Middleware, QueueJoinTimeout, Worker
from dramatiq.brokers.rabbitmq import RabbitmqBroker, URLRabbitmqBroker, _IgnoreScaryLogs
from dramatiq.common import current_millis

Expand Down Expand Up @@ -464,6 +464,50 @@ def do_work():
worker.stop()


def test_rabbitmq_broker_retries_declaring_queues_when_declared_queue_disappears(rabbitmq_broker):
executed = False

# Given that I have an actor on a flaky queue
flaky_queue_name = "flaky_queue"
rabbitmq_broker.channel.queue_delete(flaky_queue_name)

@dramatiq.actor(queue_name=flaky_queue_name)
def do_work():
nonlocal executed
executed = True

# When I start a server
worker = Worker(rabbitmq_broker, worker_threads=1)
worker.start()

declared_ev = Event()

class DeclaredMiddleware(Middleware):
def after_declare_queue(self, broker, queue_name):
if queue_name == flaky_queue_name:
declared_ev.set()

# I expect that queue to be declared
rabbitmq_broker.add_middleware(DeclaredMiddleware())
assert declared_ev.wait(timeout=5)

# If I delete the queue
rabbitmq_broker.channel.queue_delete(do_work.queue_name)
with pytest.raises(pika.exceptions.ChannelClosedByBroker):
rabbitmq_broker.channel.queue_declare(do_work.queue_name, passive=True)

# And I send that actor a message
do_work.send()
try:
rabbitmq_broker.join(do_work.queue_name, timeout=20000)
worker.join()
finally:
worker.stop()

# Then the queue should be declared and the message executed
assert executed


def test_rabbitmq_messages_that_failed_to_decode_are_rejected(rabbitmq_broker, rabbitmq_worker):
# Given that I have an Actor
@dramatiq.actor(max_retries=0)
Expand Down

0 comments on commit 4d0cc07

Please sign in to comment.