Skip to content

Commit

Permalink
feat: reduce latency to process messages (#161)
Browse files Browse the repository at this point in the history
Improve `message_bus.py` `_process_message` performance with a `pxd` file
  • Loading branch information
bdraco authored Nov 11, 2022
1 parent 9240bfd commit 113f0c9
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 14 deletions.
1 change: 1 addition & 0 deletions build.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def build(setup_kwargs):
[
"src/dbus_fast/aio/message_reader.py",
"src/dbus_fast/message.py",
"src/dbus_fast/message_bus.py",
"src/dbus_fast/signature.py",
"src/dbus_fast/unpack.py",
"src/dbus_fast/_private/marshaller.py",
Expand Down
29 changes: 29 additions & 0 deletions src/dbus_fast/message_bus.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import cython

from .message cimport Message


cdef object MessageType
cdef object DBusError
cdef object MessageFlag

cdef object MESSAGE_TYPE_CALL
cdef object MESSAGE_TYPE_SIGNAL

cdef class BaseMessageBus:

cdef public object unique_name
cdef public object _disconnected
cdef public object _user_disconnect
cdef public object _method_return_handlers
cdef public object _serial
cdef public cython.list _user_message_handlers
cdef public object _name_owners
cdef public object _bus_address
cdef public object _name_owner_match_rule
cdef public object _match_rules
cdef public object _high_level_client_initialized
cdef public object _ProxyObject
cdef public object _machine_id

cpdef _process_message(self, Message msg)
47 changes: 33 additions & 14 deletions src/dbus_fast/message_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
from .signature import Variant
from .validators import assert_bus_name_valid, assert_object_path_valid

MESSAGE_TYPE_CALL = MessageType.METHOD_CALL
MESSAGE_TYPE_SIGNAL = MessageType.SIGNAL


class BaseMessageBus:
"""An abstract class to manage a connection to a DBus message bus.
Expand Down Expand Up @@ -56,6 +59,22 @@ class BaseMessageBus:
:vartype connected: bool
"""

__slots__ = (
"unique_name",
"_disconnected",
"_user_disconnect",
"_method_return_handlers",
"_serial",
"_user_message_handlers",
"_name_owners",
"_bus_address",
"_name_owner_match_rule",
"_match_rules",
"_high_level_client_initialized",
"_ProxyObject",
"_machine_id",
)

def __init__(
self,
bus_address: Optional[str] = None,
Expand Down Expand Up @@ -620,7 +639,7 @@ def _setup_socket(self) -> None:
if "path" in options:
filename = options["path"]
elif "abstract" in options:
filename = f'\0{options["abstract"]}'
filename = b"\0" + options["abstract"].encode()
else:
raise InvalidAddressError(
"got unix transport with unknown path specifier"
Expand Down Expand Up @@ -770,9 +789,8 @@ def send_error(self, exc: Exception) -> None:

return SendReply()

def _process_message(self, msg: Message) -> None:
def _process_message(self, msg) -> None:
handled = False

for user_handler in self._user_message_handlers:
try:
result = user_handler(msg)
Expand All @@ -782,7 +800,7 @@ def _process_message(self, msg: Message) -> None:
handled = True
break
except DBusError as e:
if msg.message_type == MessageType.METHOD_CALL:
if msg.message_type is MESSAGE_TYPE_CALL:
self.send(e._as_message(msg))
handled = True
break
Expand All @@ -794,7 +812,7 @@ def _process_message(self, msg: Message) -> None:
logging.error(
f"A message handler raised an exception: {e}.\n{traceback.format_exc()}"
)
if msg.message_type == MessageType.METHOD_CALL:
if msg.message_type is MESSAGE_TYPE_CALL:
self.send(
Message.new_error(
msg,
Expand All @@ -805,7 +823,7 @@ def _process_message(self, msg: Message) -> None:
handled = True
break

if msg.message_type == MessageType.SIGNAL:
if msg.message_type is MESSAGE_TYPE_SIGNAL:
if (
msg.member == "NameOwnerChanged"
and msg.sender == "org.freedesktop.DBus"
Expand All @@ -817,8 +835,9 @@ def _process_message(self, msg: Message) -> None:
self._name_owners[name] = new_owner
elif name in self._name_owners:
del self._name_owners[name]
return

elif msg.message_type == MessageType.METHOD_CALL:
if msg.message_type is MESSAGE_TYPE_CALL:
if not handled:
handler = self._find_message_handler(msg)

Expand All @@ -835,14 +854,14 @@ def _process_message(self, msg: Message) -> None:
f'{msg.interface}.{msg.member} with signature "{msg.signature}" could not be found',
)
)
return

else:
# An ERROR or a METHOD_RETURN
if msg.reply_serial in self._method_return_handlers:
if not handled:
return_handler = self._method_return_handlers[msg.reply_serial]
return_handler(msg, None)
del self._method_return_handlers[msg.reply_serial]
# An ERROR or a METHOD_RETURN
if msg.reply_serial in self._method_return_handlers:
if not handled:
return_handler = self._method_return_handlers[msg.reply_serial]
return_handler(msg, None)
del self._method_return_handlers[msg.reply_serial]

def _make_method_handler(
self, interface: ServiceInterface, method: _Method
Expand Down

0 comments on commit 113f0c9

Please sign in to comment.