Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions rb/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,8 +386,11 @@ def join(self, timeout=None):
to be hit.
"""
remaining = timeout
failed = False

while self._cb_poll and (remaining is None or remaining > 0):
while (self._cb_poll and
(remaining is None or remaining > 0) and
not failed):
now = time.time()
rv = self._cb_poll.poll(remaining)
if remaining is not None:
Expand All @@ -411,16 +414,26 @@ def join(self, timeout=None):
elif event in ('read', 'close'):
try:
command_buffer.wait_for_responses(self)
finally:
self._release_command_buffer(command_buffer)
except Exception:
failed = True
self._release_command_buffer(command_buffer)

# If anything failed we want to cancel and release all command
# buffers
if failed:
self.cancel()
raise ConnectionError('Connection failure on join')

if self._cb_poll and timeout is not None:
raise TimeoutError('Did not receive all data in time.')

def cancel(self):
"""Cancels all outstanding requests."""
for command_buffer in self._cb_poll:
self._release_command_buffer(command_buffer)
try:
self._release_command_buffer(command_buffer)
except Exception:
pass


class FanoutClient(MappingClient):
Expand Down