diff --git a/src/dbus_fast/aio/message_reader.pxd b/src/dbus_fast/aio/message_reader.pxd index e76627b2..c570fabc 100644 --- a/src/dbus_fast/aio/message_reader.pxd +++ b/src/dbus_fast/aio/message_reader.pxd @@ -3,3 +3,11 @@ import cython from .._private.unmarshaller cimport Unmarshaller + + +cpdef _message_reader( + Unmarshaller unmarshaller, + object process, + object finalize, + bint negotiate_unix_fd +) diff --git a/src/dbus_fast/aio/message_reader.py b/src/dbus_fast/aio/message_reader.py index 38460beb..93222f9c 100644 --- a/src/dbus_fast/aio/message_reader.py +++ b/src/dbus_fast/aio/message_reader.py @@ -1,11 +1,39 @@ import logging import socket +from functools import partial from typing import Callable, Optional from .._private.unmarshaller import Unmarshaller from ..message import Message +def _message_reader( + unmarshaller: Unmarshaller, + process: Callable[[Message], None], + finalize: Callable[[Optional[Exception]], None], + negotiate_unix_fd: bool, +) -> None: + """Reads messages from the unmarshaller and passes them to the process function.""" + try: + while True: + message = unmarshaller._unmarshall() + if message is None: + return + try: + process(message) + except Exception as e: + logging.error("Unexpected error processing message: %s", exc_info=True) + # If we are not negotiating unix fds, we can stop reading as soon as we have + # the buffer is empty as asyncio will call us again when there is more data. + if ( + not negotiate_unix_fd + and not unmarshaller._has_another_message_in_buffer() + ): + return + except Exception as e: + finalize(e) + + def build_message_reader( sock: Optional[socket.socket], process: Callable[[Message], None], @@ -14,28 +42,4 @@ def build_message_reader( ) -> Callable[[], None]: """Build a callable that reads messages from the unmarshaller and passes them to the process function.""" unmarshaller = Unmarshaller(None, sock, negotiate_unix_fd) - - def _message_reader() -> None: - """Reads messages from the unmarshaller and passes them to the process function.""" - try: - while True: - message = unmarshaller._unmarshall() - if message is None: - return - try: - process(message) - except Exception as e: - logging.error( - "Unexpected error processing message: %s", exc_info=True - ) - # If we are not negotiating unix fds, we can stop reading as soon as we have - # the buffer is empty as asyncio will call us again when there is more data. - if ( - not negotiate_unix_fd - and not unmarshaller._has_another_message_in_buffer() - ): - return - except Exception as e: - finalize(e) - - return _message_reader + return partial(_message_reader, unmarshaller, process, finalize, negotiate_unix_fd)