Open
Description
by #61, and changed bus_type
to BusType.SESSION
, but canot recv any message
import asyncio
import signal
import os
from dbus_next.aio import MessageBus
from dbus_next.message import Message
from dbus_next.constants import BusType, MessageType
from dbus_next.signature import Variant
def dbus_path_to_name(path):
name = os.path.basename(path)
name = name.replace('_40', '@')
name = name.replace('_2e', '.')
name = name.replace('_5f', '_')
name = name.replace('_2d', '-')
name = name.replace('_5c', '\\')
return name
def message_handler(msg: Message ):
name: str = dbus_path_to_name(msg.path)
properties: dict[str, Variant] = msg.body[1]
if 'ActiveState' not in properties:
return False
print(f"'unit: {name}, "
f"ActiveState: {properties['ActiveState'].value}, "
f"SubState:{properties['SubState'].value} ")
return True
async def main():
stop_event = asyncio.Event()
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, stop_event.set)
loop.add_signal_handler(signal.SIGTERM, stop_event.set)
bus = await MessageBus(bus_type=BusType.SESSION).connect()
reply = await bus.call(Message(
destination='org.freedesktop.DBus',
path='/org/freedesktop/DBus',
interface='org.freedesktop.DBus',
member='AddMatch',
signature='s',
body=["path_namespace='/org/freedesktop/systemd1/unit',type='signal',interface='org.freedesktop.DBus.Properties'"],
serial=bus.next_serial()
))
assert reply.message_type == MessageType.METHOD_RETURN
bus.add_message_handler(message_handler)
await stop_event.wait()
asyncio.run(main())
Metadata
Metadata
Assignees
Labels
No labels