-
Notifications
You must be signed in to change notification settings - Fork 2.6k
Description
Hi again!
I just updated redis-py from 5.2.1 to 5.3.0 and I noticed that while adding the Dispatchers there was deadlock introduced.
Summary
When using a client with Retries enabled and subscribing to a pattern using psubscribe, a deadlock can occur under certain conditions:
- If a
punsubscribeis attempted and a disconnection (such as a broken pipe) happens during this process, a reconnection is triggered (via thePubSub._executeerror handling). - During reconnection, the client tries to re-subscribe to the previously subscribed patterns, but this process can result in a deadlock.
Full explanation
There was a new lock introduced before each PubSub command execution on commit 40e5fc1, which is part of the 5.3.0 release:
Lines 874 to 875 in 7130e1a
| with self._lock: | |
| self._execute(connection, connection.send_command, *args, **kwargs) |
When a command execution fails, if Retries are enabled, there will be a reconnection attempt:
Lines 895 to 916 in 7130e1a
| def _reconnect(self, conn) -> None: | |
| """ | |
| The supported exceptions are already checked in the | |
| retry object so we don't need to do it here. | |
| In this error handler we are trying to reconnect to the server. | |
| """ | |
| conn.disconnect() | |
| conn.connect() | |
| def _execute(self, conn, command, *args, **kwargs): | |
| """ | |
| Connect manually upon disconnection. If the Redis server is down, | |
| this will fail and raise a ConnectionError as desired. | |
| After reconnection, the ``on_connect`` callback should have been | |
| called by the # connection to resubscribe us to any channels and | |
| patterns we were previously listening to | |
| """ | |
| return conn.retry.call_with_retry( | |
| lambda: command(*args, **kwargs), | |
| lambda _: self._reconnect(conn), | |
| ) |
However, when executing the connect we get to the final lines where the resubscriptions callbacks are triggered for pubsub:
Lines 406 to 413 in 7130e1a
| # run any user callbacks. right now the only internal callback | |
| # is for pubsub channel/pattern resubscription | |
| # first, remove any dead weakrefs | |
| self._connect_callbacks = [ref for ref in self._connect_callbacks if ref()] | |
| for ref in self._connect_callbacks: | |
| callback = ref() | |
| if callback: | |
| callback(self) |
This will hit the same lock again a cause a deadlock.
Let me know if you need any other information. 🙂