Skip to content

Commit

Permalink
Merge pull request #34 from dlecocq/dan/connection-closed-exception
Browse files Browse the repository at this point in the history
Connection Closed Exception
  • Loading branch information
dlecocq committed Jul 25, 2014
2 parents 855fbe1 + 1eeeae2 commit 03e77e8
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 8 deletions.
13 changes: 10 additions & 3 deletions nsq/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from contextlib import contextmanager
import random
from select import select
import select
import socket
import time
import threading
Expand Down Expand Up @@ -182,8 +182,15 @@ def read(self):
# Not all connections need to be written to, so we'll only concern
# ourselves with those that require writes
writes = [c for c in connections if c.pending()]
readable, writable, exceptable = select(
connections, writes, connections, self._timeout)
try:
readable, writable, exceptable = select.select(
connections, writes, connections, self._timeout)
except exceptions.ConnectionClosedException:
logger.exception('Tried selecting on closed client')
return []
except select.error:
logger.exception('Error running select')
return []

# If we returned because the timeout interval passed, log it and return
if not (readable or writable or exceptable):
Expand Down
6 changes: 4 additions & 2 deletions nsq/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from . import util
from . import json
from . import __version__
from .exceptions import UnsupportedException
from .exceptions import UnsupportedException, ConnectionClosedException
from .sockets import TLSSocket, SnappySocket, DeflateSocket
from .response import Response, Message

Expand Down Expand Up @@ -208,7 +208,9 @@ def setblocking(self, blocking):
def fileno(self):
'''Returns the socket's fileno. This allows us to select on this'''
for sock in self.socket():
return sock.fileno()
if sock:
return sock.fileno()
raise ConnectionClosedException()

def pending(self):
'''All of the messages waiting to be sent'''
Expand Down
4 changes: 4 additions & 0 deletions nsq/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ class NSQException(StandardError):
'''Base class for all exceptions in this library'''


class ConnectionClosedException(NSQException):
'''Trying to use a closed connection as if it's alive'''


class UnsupportedException(NSQException):
'''When a requested feature cannot be used'''

Expand Down
23 changes: 20 additions & 3 deletions test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

from common import HttpClientIntegrationTest, MockedConnectionTest
from contextlib import contextmanager
import errno
import select
import socket


Expand Down Expand Up @@ -61,6 +63,21 @@ def test_conection_checker(self):
self.assertTrue(checker.is_alive())
self.assertFalse(checker.is_alive())

def test_read_closed(self):
'''Recovers from reading on a closed connection'''
conn = self.client.connections()[0]
with mock.patch.object(conn, 'alive', return_value=True):
with mock.patch.object(conn, '_socket', None):
# This test passes if no exception in raised
self.client.read()

def test_read_select_err(self):
'''Recovers from select errors'''
with mock.patch('nsq.client.select.select') as mock_select:
mock_select.side_effect = select.error(errno.EBADF)
# This test passes if no exception is raised
self.client.read()


class TestClientLookupd(HttpClientIntegrationTest):
'''Test our client class'''
Expand Down Expand Up @@ -119,21 +136,21 @@ class TestClientMultiple(MockedConnectionTest):
def readable(self, connections):
'''With all the connections readable'''
value = (connections, [], [])
with mock.patch.object(client, 'select', return_value=value):
with mock.patch('nsq.client.select.select', return_value=value):
yield

@contextmanager
def writable(self, connections):
'''With all the connections writable'''
value = ([], connections, [])
with mock.patch.object(client, 'select', return_value=value):
with mock.patch('nsq.client.select.select', return_value=value):
yield

@contextmanager
def exceptable(self, connections):
'''With all the connections exceptable'''
value = ([], [], connections)
with mock.patch.object(client, 'select', return_value=value):
with mock.patch('nsq.client.select.select', return_value=value):
yield

def test_multi_read(self):
Expand Down
6 changes: 6 additions & 0 deletions test/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,12 @@ def test_fileno(self):
self.assertEqual(
self.connection.fileno(), self.socket.fileno())

def test_fileno_closed(self):
'''Raises an exception if the connection's closed'''
with mock.patch.object(self.connection, '_socket', None):
self.assertRaises(exceptions.ConnectionClosedException,
self.connection.fileno)

def test_str_alive(self):
'''Sane str representation for an alive connection'''
with mock.patch.object(self.connection, 'alive', return_value=True):
Expand Down

0 comments on commit 03e77e8

Please sign in to comment.