diff --git a/jasmin/managers/dlr.py b/jasmin/managers/dlr.py index a8561e91..6ea2ff8c 100644 --- a/jasmin/managers/dlr.py +++ b/jasmin/managers/dlr.py @@ -4,6 +4,7 @@ from twisted.internet import defer from twisted.internet import reactor +from twisted.internet.error import AlreadyCalled, AlreadyCancelled from txamqp.queue import Closed from txredisapi import ConnectionError from smpp.pdu.pdu_types import RegisteredDeliveryReceipt @@ -71,6 +72,22 @@ def subscribe(self): yield self.amqpBroker.chan.basic_consume(queue=queueName, no_ack=False, consumer_tag=consumerTag) self.amqpBroker.client.queue(consumerTag).addCallback(self.setup_callbacks) + def clearRequeueTimer(self, msgid): + if msgid in self.requeue_timers: + try: + self.requeue_timers[msgid].cancel() + except AlreadyCalled or AlreadyCancelled: + pass + del self.requeue_timers[msgid] + + def clearRequeueTimers(self): + for msgid, timer in list(self.requeue_timers.items()): + try: + timer.cancel() + except AlreadyCalled or AlreadyCancelled: + pass + del self.requeue_timers[msgid] + @defer.inlineCallbacks def rejectAndRequeueMessage(self, message, delay=True): msgid = message.content.properties['message-id'] @@ -79,21 +96,25 @@ def rejectAndRequeueMessage(self, message, delay=True): self.log.debug("Requeuing Content[%s] with delay: %s seconds", msgid, self.config.dlr_lookup_retry_delay) - # Requeue the message with a delay - timer = reactor.callLater(self.config.dlr_lookup_retry_delay, - self.rejectMessage, - message=message, - requeue=1) - - # If any, clear timer before setting a new one + # If any, reset timer if msgid in self.requeue_timers: timer = self.requeue_timers[msgid] - if timer.active(): - timer.cancel() - del self.requeue_timers[msgid] + try: + timer.reset(self.config.dlr_lookup_retry_delay) + except AlreadyCalled: + # noting to do here, already rejected + pass + except AlreadyCancelled: + # noting to do here, already rejected + pass + else: + # Set new timer + timer = reactor.callLater(self.config.dlr_lookup_retry_delay, + self.rejectMessage, + message=message, + requeue=1) + self.requeue_timers[msgid] = timer - # Set new timer - self.requeue_timers[msgid] = timer defer.returnValue(timer) else: self.log.debug("Requeuing Content[%s] without delay", msgid) @@ -104,6 +125,11 @@ def rejectMessage(self, message, requeue=0): if requeue == 0 and message.content.properties['message-id'] in self.lookup_retrials: # Remove retrial tracker del self.lookup_retrials[message.content.properties['message-id']] + + self.clearRequeueTimer(message.content.properties['message-id']) + if not self.amqpBroker.connected: + self.log.error("Cannot reject message, AMQP Broker is not connected !") + defer.returnValue(False) yield self.amqpBroker.chan.basic_reject(delivery_tag=message.delivery_tag, requeue=requeue) @@ -113,6 +139,11 @@ def ackMessage(self, message): if message.content.properties['message-id'] in self.lookup_retrials: # Remove retrial tracker del self.lookup_retrials[message.content.properties['message-id']] + + self.clearRequeueTimer(message.content.properties['message-id']) + if not self.amqpBroker.connected: + self.log.error("Cannot ack message, AMQP Broker is not connected !") + defer.returnValue(False) yield self.amqpBroker.chan.basic_ack(message.delivery_tag)