diff --git a/docs/api/client.rst b/docs/api/client.rst index 8efe9555..51335e6d 100644 --- a/docs/api/client.rst +++ b/docs/api/client.rst @@ -30,6 +30,27 @@ Public API A :class:`~kazoo.protocol.states.KazooState` attribute indicating the current higher-level connection state. + .. note:: + + Up to version 2.6.1, requests could only be submitted + in the CONNECTED state. Requests submitted while + SUSPENDED would immediately raise a + :exc:`~kazoo.exceptions.SessionExpiredError`. This + was problematic, as sessions are usually recovered on + reconnect. + + Kazoo now simply queues requests submitted in the + SUSPENDED state, expecting a recovery. This matches + the behavior of the Java and C clients. + + Requests submitted in a LOST state still fail + immediately with the corresponding exception. + + See: + + * https://github.com/python-zk/kazoo/issues/374 and + * https://github.com/python-zk/kazoo/pull/570 + .. autoclass:: TransactionRequest :members: :member-order: bysource diff --git a/kazoo/client.py b/kazoo/client.py index a129fc51..a5bdae42 100644 --- a/kazoo/client.py +++ b/kazoo/client.py @@ -584,11 +584,11 @@ def _safe_close(self): "and wouldn't close after %s seconds" % timeout) def _call(self, request, async_object): - """Ensure there's an active connection and put the request in - the queue if there is. + """Ensure the client is in CONNECTED or SUSPENDED state and put the + request in the queue if it is. Returns False if the call short circuits due to AUTH_FAILED, - CLOSED, EXPIRED_SESSION or CONNECTING state. + CLOSED, or EXPIRED_SESSION state. """ @@ -599,8 +599,7 @@ def _call(self, request, async_object): async_object.set_exception(ConnectionClosedError( "Connection has been closed")) return False - elif self._state in (KeeperState.EXPIRED_SESSION, - KeeperState.CONNECTING): + elif self._state == KeeperState.EXPIRED_SESSION: async_object.set_exception(SessionExpiredError()) return False diff --git a/kazoo/tests/test_client.py b/kazoo/tests/test_client.py index fb6d2d93..32cc0d1e 100644 --- a/kazoo/tests/test_client.py +++ b/kazoo/tests/test_client.py @@ -8,7 +8,7 @@ import mock from mock import patch from nose import SkipTest -from nose.tools import eq_ +from nose.tools import eq_, ok_, assert_not_equal from nose.tools import raises from kazoo.testing import KazooTestCase @@ -492,9 +492,6 @@ def test_create_on_broken_connection(self): self.assertRaises(AuthFailedError, client.create, '/closedpath', b'bar') - client._state = KeeperState.CONNECTING - self.assertRaises(SessionExpiredError, client.create, - '/closedpath', b'bar') client.stop() client.close() @@ -982,6 +979,107 @@ def test_update_host_list(self): finally: self.cluster[0].run() + # utility for test_request_queuing* + def _make_request_queuing_client(self): + from kazoo.client import KazooClient + server = self.cluster[0] + handler = self._makeOne() + # create a client with only one server in its list, and + # infinite retries + client = KazooClient( + hosts=server.address + self.client.chroot, + handler=handler, + connection_retry=dict( + max_tries=-1, + delay=0.1, + backoff=1, + max_jitter=0.0, + sleep_func=handler.sleep_func + ) + ) + + return client, server + + # utility for test_request_queuing* + def _request_queuing_common(self, client, server, path, expire_session): + ev_suspended = client.handler.event_object() + ev_connected = client.handler.event_object() + + def listener(state): + if state == KazooState.SUSPENDED: + ev_suspended.set() + elif state == KazooState.CONNECTED: + ev_connected.set() + client.add_listener(listener) + + # wait for the client to connect + client.start() + + try: + # force the client to suspend + server.stop() + + ev_suspended.wait(5) + ok_(ev_suspended.is_set()) + ev_connected.clear() + + # submit a request, expecting it to be queued + result = client.create_async(path) + assert_not_equal(len(client._queue), 0) + eq_(result.ready(), False) + eq_(client.state, KazooState.SUSPENDED) + + # optionally cause a SessionExpiredError to occur by + # mangling the first byte of the session password. + if expire_session: + b0 = b'\x00' + if client._session_passwd[0] == 0: + b0 = b'\xff' + client._session_passwd = b0 + client._session_passwd[1:] + finally: + server.run() + + # wait for the client to reconnect (either with a recovered + # session, or with a new one if expire_session was set) + ev_connected.wait(5) + ok_(ev_connected.is_set()) + + return result + + def test_request_queuing_session_recovered(self): + path = "/" + uuid.uuid4().hex + client, server = self._make_request_queuing_client() + + try: + result = self._request_queuing_common( + client=client, + server=server, + path=path, + expire_session=False + ) + + eq_(result.get(), path) + assert_not_equal(client.exists(path), None) + finally: + client.stop() + + def test_request_queuing_session_expired(self): + path = "/" + uuid.uuid4().hex + client, server = self._make_request_queuing_client() + + try: + result = self._request_queuing_common( + client=client, + server=server, + path=path, + expire_session=True + ) + + eq_(len(client._queue), 0) + self.assertRaises(SessionExpiredError, result.get) + finally: + client.stop() + dummy_dict = { 'aversion': 1, 'ctime': 0, 'cversion': 1,