diff --git a/src/socketio/async_pubsub_manager.py b/src/socketio/async_pubsub_manager.py index c9d00b01..3e11f1ea 100644 --- a/src/socketio/async_pubsub_manager.py +++ b/src/socketio/async_pubsub_manager.py @@ -230,11 +230,13 @@ async def _thread(self): await self._handle_close_room(data) except asyncio.CancelledError: raise # let the outer try/except handle it - except: + except Exception: self.server.logger.exception( - 'Unknown error in pubsub listening task') + 'Handler error in pubsub listening thread') + self.server.logger.error('pubsub listen() exited unexpectedly') + break # loop should never exit except in unit tests! except asyncio.CancelledError: # pragma: no cover break - except: # pragma: no cover - import traceback - traceback.print_exc() + except Exception: # pragma: no cover + self.server.logger.exception('Unexpected Error in pubsub ' + 'listening thread') diff --git a/src/socketio/pubsub_manager.py b/src/socketio/pubsub_manager.py index 62eb3369..5ca7619c 100644 --- a/src/socketio/pubsub_manager.py +++ b/src/socketio/pubsub_manager.py @@ -188,38 +188,45 @@ def _handle_close_room(self, message): namespace=message.get('namespace')) def _thread(self): - for message in self._listen(): - data = None - if isinstance(message, dict): - data = message - else: - if isinstance(message, bytes): # pragma: no cover - try: - data = pickle.loads(message) - except: - pass - if data is None: - try: - data = json.loads(message) - except: - pass - if data and 'method' in data: - self._get_logger().debug('pubsub message: {}'.format( - data['method'])) - try: - if data['method'] == 'callback': - self._handle_callback(data) - elif data.get('host_id') != self.host_id: - if data['method'] == 'emit': - self._handle_emit(data) - elif data['method'] == 'disconnect': - self._handle_disconnect(data) - elif data['method'] == 'enter_room': - self._handle_enter_room(data) - elif data['method'] == 'leave_room': - self._handle_leave_room(data) - elif data['method'] == 'close_room': - self._handle_close_room(data) - except: - self.server.logger.exception( - 'Unknown error in pubsub listening thread') + while True: + try: + for message in self._listen(): + data = None + if isinstance(message, dict): + data = message + else: + if isinstance(message, bytes): # pragma: no cover + try: + data = pickle.loads(message) + except: + pass + if data is None: + try: + data = json.loads(message) + except: + pass + if data and 'method' in data: + self._get_logger().debug('pubsub message: {}'.format( + data['method'])) + try: + if data['method'] == 'callback': + self._handle_callback(data) + elif data.get('host_id') != self.host_id: + if data['method'] == 'emit': + self._handle_emit(data) + elif data['method'] == 'disconnect': + self._handle_disconnect(data) + elif data['method'] == 'enter_room': + self._handle_enter_room(data) + elif data['method'] == 'leave_room': + self._handle_leave_room(data) + elif data['method'] == 'close_room': + self._handle_close_room(data) + except Exception: + self.server.logger.exception( + 'Handler error in pubsub listening thread') + self.server.logger.error('pubsub listen() exited unexpectedly') + break # loop should never exit except in unit tests! + except Exception: # pragma: no cover + self.server.logger.exception('Unexpected Error in pubsub ' + 'listening thread') diff --git a/tests/async/test_pubsub_manager.py b/tests/async/test_pubsub_manager.py index 48a71aab..579452aa 100644 --- a/tests/async/test_pubsub_manager.py +++ b/tests/async/test_pubsub_manager.py @@ -541,7 +541,6 @@ async def messages(): 'host_id': host_id} yield pickle.dumps({'method': 'close_room', 'value': 'baz', 'host_id': host_id}) - raise asyncio.CancelledError() # force the thread to exit self.pm._listen = messages _run(self.pm._thread())