Skip to content

Commit b4c51ab

Browse files
authored
Merge pull request #19 from elarahq/retry_on_close
added retry
2 parents f9f193a + 42cad64 commit b4c51ab

File tree

6 files changed

+35
-2
lines changed

6 files changed

+35
-2
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,4 @@ target/
5959
# Other
6060
.DS_Store
6161
._.DS_Store
62+
*.swp

README.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ A rabbitmq publisher and receiver that implements a safe and reliable rabbitmq
66

77
Changelog
88
=========
9+
2.0.2
10+
-----
11+
12+
* Added retry on connection lost.
913

1014
2.0.1
1115
-----

rmq/rmqproducer/rabbitmq_producer.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import signal
44
import logging
55
from connection import RMQConnectionPool
6+
from random import randint
67

78

89
class Publisher(object):
@@ -19,6 +20,7 @@ class Publisher(object):
1920
2021
"""
2122

23+
2224
def __init__(self, amqp_url, exchange, **kwargs):
2325
"""Create a new instance of the Publisher class, passing in the
2426
parameters used to connect to RabbitMQ. It established connection and
@@ -100,6 +102,7 @@ def connect(self):
100102
if connection is None:
101103
connection = pika.SelectConnection(pika.URLParameters(self._url),
102104
self.on_connection_open,
105+
self.on_connection_error,
103106
stop_ioloop_on_close=False)
104107
self._connection = connection
105108
else:
@@ -122,6 +125,12 @@ def on_connection_open(self, unused_connection):
122125
self._LOGGER.info('Connection opened')
123126
self.add_on_connection_close_callback()
124127
self.open_channel()
128+
129+
130+
def on_connection_error(self, connection, error):
131+
self._LOGGER.warning("Publisher: Connection lost retrying in {time}".format(time=5))
132+
connection.add_timeout(5, self.reconnect)
133+
125134

126135
def add_on_connection_close_callback(self):
127136
"""This method adds an on close callback that will be invoked by pika
@@ -149,7 +158,7 @@ def on_connection_closed(self, connection, reply_code, reply_text):
149158
else:
150159
self._LOGGER.warning('Connection closed, reopening in %d seconds: (%s) %s',
151160
self.reconnect_time, reply_code, reply_text)
152-
self._connection.add_timeout(self.reconnect_time, self.reconnect)
161+
self._connection.add_timeout(5, self.reconnect)
153162

154163
def reconnect(self):
155164
"""Will be invoked by the IOLoop timer if the connection is

rmq/rmqreceiver/rabbitmq_receiver.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import pika
33
import signal
44
import logging
5+
from random import randint
56

67

78
class Receiver(object):
@@ -94,6 +95,7 @@ def connect(self):
9495
self._LOGGER.info('Connecting to %s', self._url)
9596
return pika.SelectConnection(pika.URLParameters(self._url),
9697
self.on_connection_open,
98+
self.on_connection_error,
9799
stop_ioloop_on_close=False)
98100

99101
def on_connection_open(self, unused_connection):
@@ -119,6 +121,13 @@ def add_on_connection_close_callback(self):
119121
self._LOGGER.info('Adding connection close callback')
120122
self._connection.add_on_close_callback(self.on_connection_closed)
121123

124+
125+
126+
127+
def on_connection_error(self, connection, error):
128+
self._LOGGER.warning("Connection failed.Retrying in next 5 seconds")
129+
connection.add_timeout(5, self.reconnect)
130+
122131
def on_connection_closed(self, connection, reply_code, reply_text):
123132
"""Invoked by pika when the connection to RabbitMQ is closed unexpectedly.
124133

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ def readme():
66
return f.read()
77

88
setup(name='rmq-pub-sub',
9-
version='2.0.1',
9+
version='2.0.2',
1010
description='Rabbitmq Receiver and Publisher',
1111
url='https://github.com/loconsolutions/python-rabbitmq-pubsub',
1212
author='Rahul Kumar and Bipul Karnani',

test.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from rmq import Receiver, Publisher
2+
3+
4+
def callback(self, ch, method,properties, body):
5+
print("message received")
6+
7+
8+
publisher = Publisher('amqp://guest:guest@10.1.6.46:5672/%2F/?socket_timeout=10', 'analytics_new')
9+
receiver = Receiver(callback, 'amqp://guest:guest@10.1.6.46:5672/%2F/?socket_timeout=10', 'analytics_new', binding_keys=["#"], queue='dsl_profiling_service')
10+
receiver.run()

0 commit comments

Comments
 (0)