diff --git a/kazoo/protocol/connection.py b/kazoo/protocol/connection.py index fc3586c8..70de2411 100644 --- a/kazoo/protocol/connection.py +++ b/kazoo/protocol/connection.py @@ -561,7 +561,6 @@ def _connect_loop(self, retry): def _connect_attempt(self, host, hostip, port, retry): client = self.client KazooTimeoutError = self.handler.timeout_exception - close_connection = False self._socket = None @@ -582,13 +581,14 @@ def _connect_attempt(self, host, hostip, port, retry): connect_timeout = connect_timeout / 1000.0 retry.reset() self.ping_outstanding.clear() + last_send = time.time() with self._socket_error_handling(): - while not close_connection: + while True: # Watch for something to read or send - jitter_time = random.randint(0, 40) / 100.0 + jitter_time = random.randint(1, 40) / 100.0 + deadline = last_send + read_timeout / 2.0 - jitter_time # Ensure our timeout is positive - timeout = max([read_timeout / 2.0 - jitter_time, - jitter_time]) + timeout = max([deadline - time.time(), jitter_time]) s = self.handler.select([self._socket, self._read_sock], [], [], timeout)[0] @@ -597,12 +597,23 @@ def _connect_attempt(self, host, hostip, port, retry): self.ping_outstanding.clear() raise ConnectionDropped( "outstanding heartbeat ping not received") - self._send_ping(connect_timeout) - elif s[0] == self._socket: - response = self._read_socket(read_timeout) - close_connection = response == CLOSE_RESPONSE else: - self._send_request(read_timeout, connect_timeout) + if self._socket in s: + response = self._read_socket(read_timeout) + if response == CLOSE_RESPONSE: + break + # Check if any requests need sending before proceeding + # to process more responses. Otherwise the responses + # may choke out the requests. See PR#633. + if self._read_sock in s: + self._send_request(read_timeout, connect_timeout) + # Requests act as implicit pings. + last_send = time.time() + continue + + if time.time() >= deadline: + self._send_ping(connect_timeout) + last_send = time.time() self.logger.info('Closing connection to %s:%s', host, port) client._session_callback(KeeperState.CLOSED) return STOP_CONNECTING