Skip to content

Commit

Permalink
feat: small speed up to the aio message reader (#273)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco authored Dec 4, 2023
1 parent 2103239 commit 8ee18a1
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 25 deletions.
8 changes: 8 additions & 0 deletions src/dbus_fast/aio/message_reader.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,11 @@
import cython

from .._private.unmarshaller cimport Unmarshaller


cpdef _message_reader(
Unmarshaller unmarshaller,
object process,
object finalize,
bint negotiate_unix_fd
)
54 changes: 29 additions & 25 deletions src/dbus_fast/aio/message_reader.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,39 @@
import logging
import socket
from functools import partial
from typing import Callable, Optional

from .._private.unmarshaller import Unmarshaller
from ..message import Message


def _message_reader(
unmarshaller: Unmarshaller,
process: Callable[[Message], None],
finalize: Callable[[Optional[Exception]], None],
negotiate_unix_fd: bool,
) -> None:
"""Reads messages from the unmarshaller and passes them to the process function."""
try:
while True:
message = unmarshaller._unmarshall()
if message is None:
return
try:
process(message)
except Exception as e:
logging.error("Unexpected error processing message: %s", exc_info=True)
# If we are not negotiating unix fds, we can stop reading as soon as we have
# the buffer is empty as asyncio will call us again when there is more data.
if (
not negotiate_unix_fd
and not unmarshaller._has_another_message_in_buffer()
):
return
except Exception as e:
finalize(e)


def build_message_reader(
sock: Optional[socket.socket],
process: Callable[[Message], None],
Expand All @@ -14,28 +42,4 @@ def build_message_reader(
) -> Callable[[], None]:
"""Build a callable that reads messages from the unmarshaller and passes them to the process function."""
unmarshaller = Unmarshaller(None, sock, negotiate_unix_fd)

def _message_reader() -> None:
"""Reads messages from the unmarshaller and passes them to the process function."""
try:
while True:
message = unmarshaller._unmarshall()
if message is None:
return
try:
process(message)
except Exception as e:
logging.error(
"Unexpected error processing message: %s", exc_info=True
)
# If we are not negotiating unix fds, we can stop reading as soon as we have
# the buffer is empty as asyncio will call us again when there is more data.
if (
not negotiate_unix_fd
and not unmarshaller._has_another_message_in_buffer()
):
return
except Exception as e:
finalize(e)

return _message_reader
return partial(_message_reader, unmarshaller, process, finalize, negotiate_unix_fd)

0 comments on commit 8ee18a1

Please sign in to comment.