Description
I'm using multiple Flask-SocketIO workers with a Redis Sentinel message queue configured through KombuManager as client_manager (successfully set up following the instructions from miguelgrinberg/Flask-SocketIO#1979).
There is an issue with the current _listen() implementation in kombu_manager.py, shown below: a Sentinel failover is not properly detected when a Redis master instance goes down and a replica instance is promoted to be the new master.
    def _listen(self):
        reader_queue = self._queue()
        retry_sleep = 1
        while True:
            try:
                with self._connection() as connection:
                    with connection.SimpleQueue(reader_queue) as queue:
                        while True:
                            message = queue.get(block=True)
                            message.ack()
                            yield message.payload
                            retry_sleep = 1
            except (OSError, kombu.exceptions.KombuError):
                self._get_logger().error(
                    'Cannot receive from rabbitmq... '
                    'retrying in {} secs'.format(retry_sleep))
                time.sleep(retry_sleep)
                retry_sleep = min(retry_sleep * 2, 60)
In such a scenario, the redis package raises redis.exceptions.ConnectionError: Connection closed by server, as shown in the exception stack trace originating at message = queue.get(block=True). Since the except block only handles OSError and kombu.exceptions.KombuError, this error is not caught, so it propagates out of the generator and the listening loop terminates instead of reconnecting.
In my local environment, I was able to resolve the issue by simply adding redis.exceptions.ConnectionError to the except block, but that might not be ideal because it also requires importing the redis-py package in kombu_manager.py. With this change, a Redis Sentinel setup works as a message queue for Flask-SocketIO and handles failover as expected: once the exception is caught, the loop reconnects through the Sentinel URL and picks up the new master. I tested this with a URL of the form sentinel://:*@sentinel1:port/db;sentinel://:*@sentinel2:port/db, so it seems to work well with a configuration consisting of multiple Sentinel nodes and multiple Redis instances in a master-replica setup.
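To illustrate the idea without a hard dependency on redis-py, here is a minimal sketch, not the actual python-socketio code. The helpers build_retryable_errors and listen_with_retry are hypothetical names introduced only for this example; the sketch assumes the tuple of retryable exceptions can be assembled at import time and that redis-py may or may not be installed.

```python
import time


def build_retryable_errors():
    """Collect exception classes that should trigger a reconnect.

    Hypothetical helper: kombu and redis are imported lazily, so a
    missing package simply leaves its exception out of the tuple.
    """
    errors = [OSError]
    try:
        import kombu.exceptions
        errors.append(kombu.exceptions.KombuError)
    except ImportError:
        pass
    try:
        import redis.exceptions
        errors.append(redis.exceptions.ConnectionError)
    except ImportError:
        pass
    return tuple(errors)


def listen_with_retry(open_queue, retryable, sleep=time.sleep, max_sleep=60):
    """Yield payloads forever, reopening the queue after a retryable error.

    open_queue is a hypothetical callable returning an iterable of
    payloads; it stands in for the connection/SimpleQueue handling
    in _listen().
    """
    retry_sleep = 1
    while True:
        try:
            for payload in open_queue():
                yield payload
                retry_sleep = 1  # reset the backoff after a successful read
        except retryable:
            sleep(retry_sleep)
            retry_sleep = min(retry_sleep * 2, max_sleep)
```

With a tuple built this way, the except clause in _listen() could catch the redis connection error when redis-py is present and behave exactly as before when it is not.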
This change should be good enough for my use case, but I wanted to share my findings here so that python-socketio might get an official fix for Redis Sentinel failover support in KombuManager.