Skip to content

Commit

Permalink
Merge pull request #1178 from BlackOrder/fix-1142-timer
Browse files Browse the repository at this point in the history
fix 1142
  • Loading branch information
farirat authored Mar 27, 2024
2 parents dd0562b + 28ec80e commit c45c688
Showing 1 changed file with 43 additions and 12 deletions.
55 changes: 43 additions & 12 deletions jasmin/managers/dlr.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from twisted.internet import defer
from twisted.internet import reactor
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from txamqp.queue import Closed
from txredisapi import ConnectionError
from smpp.pdu.pdu_types import RegisteredDeliveryReceipt
Expand Down Expand Up @@ -71,6 +72,22 @@ def subscribe(self):
yield self.amqpBroker.chan.basic_consume(queue=queueName, no_ack=False, consumer_tag=consumerTag)
self.amqpBroker.client.queue(consumerTag).addCallback(self.setup_callbacks)

def clearRequeueTimer(self, msgid):
if msgid in self.requeue_timers:
try:
self.requeue_timers[msgid].cancel()
except AlreadyCalled or AlreadyCancelled:
pass
del self.requeue_timers[msgid]

def clearRequeueTimers(self):
for msgid, timer in list(self.requeue_timers.items()):
try:
timer.cancel()
except AlreadyCalled or AlreadyCancelled:
pass
del self.requeue_timers[msgid]

@defer.inlineCallbacks
def rejectAndRequeueMessage(self, message, delay=True):
msgid = message.content.properties['message-id']
Expand All @@ -79,21 +96,25 @@ def rejectAndRequeueMessage(self, message, delay=True):
self.log.debug("Requeuing Content[%s] with delay: %s seconds",
msgid, self.config.dlr_lookup_retry_delay)

# Requeue the message with a delay
timer = reactor.callLater(self.config.dlr_lookup_retry_delay,
self.rejectMessage,
message=message,
requeue=1)

# If any, clear timer before setting a new one
# If any, reset timer
if msgid in self.requeue_timers:
timer = self.requeue_timers[msgid]
if timer.active():
timer.cancel()
del self.requeue_timers[msgid]
try:
timer.reset(self.config.dlr_lookup_retry_delay)
except AlreadyCalled:
# noting to do here, already rejected
pass
except AlreadyCancelled:
# noting to do here, already rejected
pass
else:
# Set new timer
timer = reactor.callLater(self.config.dlr_lookup_retry_delay,
self.rejectMessage,
message=message,
requeue=1)
self.requeue_timers[msgid] = timer

# Set new timer
self.requeue_timers[msgid] = timer
defer.returnValue(timer)
else:
self.log.debug("Requeuing Content[%s] without delay", msgid)
Expand All @@ -104,6 +125,11 @@ def rejectMessage(self, message, requeue=0):
if requeue == 0 and message.content.properties['message-id'] in self.lookup_retrials:
# Remove retrial tracker
del self.lookup_retrials[message.content.properties['message-id']]

self.clearRequeueTimer(message.content.properties['message-id'])
if not self.amqpBroker.connected:
self.log.error("Cannot reject message, AMQP Broker is not connected !")
defer.returnValue(False)

yield self.amqpBroker.chan.basic_reject(delivery_tag=message.delivery_tag, requeue=requeue)

Expand All @@ -113,6 +139,11 @@ def ackMessage(self, message):
if message.content.properties['message-id'] in self.lookup_retrials:
# Remove retrial tracker
del self.lookup_retrials[message.content.properties['message-id']]

self.clearRequeueTimer(message.content.properties['message-id'])
if not self.amqpBroker.connected:
self.log.error("Cannot ack message, AMQP Broker is not connected !")
defer.returnValue(False)

yield self.amqpBroker.chan.basic_ack(message.delivery_tag)

Expand Down

0 comments on commit c45c688

Please sign in to comment.