From 8b660fb19a071fbe44cf9ff5a212957ac5520ac2 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Wed, 28 Dec 2022 10:22:00 +0100 Subject: [PATCH] Fix possible endless wait in stop() after AUTH_FAILED error On AUTH_FAILED, the zk-loop thread calls client._session_callback, which resets the queue. However, another thread can add a CloseInstance event to this queue, and if _session_callback() is called after CloseInstance was added to the queue, then stop() will never return (and zk-loop will spin endlessly). Here is what it looks like with additional logging: 39: [ Thread-3 (zk_loop) ] INFO: client.py:568: _session_callback: Zookeeper session closed, state: AUTH_FAILED 39: [ MainThread ] Level 5: client.py:721: stop: Sending CloseInstance 39: [ Thread-3 (zk_loop) ] Level 5: client.py:403: _reset: Reseting the client 39: [ Thread-3 (zk_loop) ] Level 5: connection.py:625: _connect_attempt: Connecting 39: [ Thread-3 (zk_loop) ] Level 5: connection.py:625: _connect_attempt: Connecting You can find details in this gist [1]. 
[1]: https://gist.github.com/azat/bc7aaea1c32a4f1ea75ad646d26280e9 --- kazoo/protocol/connection.py | 2 +- kazoo/tests/test_client.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/kazoo/protocol/connection.py b/kazoo/protocol/connection.py index 6ed5cdea..9b5ce2fb 100644 --- a/kazoo/protocol/connection.py +++ b/kazoo/protocol/connection.py @@ -619,7 +619,7 @@ def _connect_attempt(self, host, hostip, port, retry): self.ping_outstanding.clear() last_send = time.time() with self._socket_error_handling(): - while True: + while not self.client._stopped.is_set(): # Watch for something to read or send jitter_time = random.randint(1, 40) / 100.0 deadline = last_send + read_timeout / 2.0 - jitter_time diff --git a/kazoo/tests/test_client.py b/kazoo/tests/test_client.py index 73876b8b..d504bdb8 100644 --- a/kazoo/tests/test_client.py +++ b/kazoo/tests/test_client.py @@ -257,6 +257,8 @@ def test_async_auth_failure(self): with pytest.raises(AuthFailedError): client.add_auth("unknown-scheme", digest_auth) + client.stop() + def test_add_auth_on_reconnect(self): client = self._get_client() client.start()