Skip to content

Problem: dbus calls were not async #595

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packaging/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ debian-package-code:
cp ../examples/instance_message_from_aleph.json ./aleph-vm/opt/aleph-vm/examples/instance_message_from_aleph.json
cp -r ../examples/data ./aleph-vm/opt/aleph-vm/examples/data
mkdir -p ./aleph-vm/opt/aleph-vm/examples/volumes
pip3 install --target ./aleph-vm/opt/aleph-vm/ 'aleph-message==0.4.4' 'jwskate==0.8.0' 'eth-account==0.9.0' 'sentry-sdk==1.31.0' 'qmp==1.1.0' 'superfluid==0.2.1' 'sqlalchemy[asyncio]' 'aiosqlite==0.19.0' 'alembic==1.13.1' 'aiohttp_cors==0.7.0' 'pyroute2==0.7.12'
pip3 install --target ./aleph-vm/opt/aleph-vm/ 'aleph-message==0.4.4' 'jwskate==0.8.0' 'eth-account==0.9.0' 'sentry-sdk==1.31.0' 'qmp==1.1.0' 'superfluid==0.2.1' 'sqlalchemy[asyncio]' 'aiosqlite==0.19.0' 'alembic==1.13.1' 'aiohttp_cors==0.7.0' 'pyroute2==0.7.12' 'dbus-fast==1.90.1'
python3 -m compileall ./aleph-vm/opt/aleph-vm/

debian-package-resources: firecracker-bins vmlinux download-ipfs-kubo
Expand Down
2 changes: 1 addition & 1 deletion packaging/aleph-vm/DEBIAN/control
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ Version: 0.1.8
Architecture: all
Maintainer: Aleph.im
Description: Aleph.im VM execution engine
Depends: python3,python3-pip,python3-aiohttp,python3-msgpack,python3-aiodns,python3-alembic,python3-sqlalchemy,python3-setproctitle,redis,python3-aioredis,python3-psutil,sudo,acl,curl,systemd-container,squashfs-tools,debootstrap,python3-packaging,python3-cpuinfo,python3-nftables,python3-jsonschema,cloud-image-utils,ndppd,python3-yaml,python3-dotenv,python3-schedule,qemu-system-x86,qemu-utils,python3-systemd,python3-dbus,btrfs-progs,nftables
Depends: python3,python3-pip,python3-aiohttp,python3-msgpack,python3-aiodns,python3-alembic,python3-sqlalchemy,python3-setproctitle,redis,python3-aioredis,python3-psutil,sudo,acl,curl,systemd-container,squashfs-tools,debootstrap,python3-packaging,python3-cpuinfo,python3-nftables,python3-jsonschema,cloud-image-utils,ndppd,python3-yaml,python3-dotenv,python3-schedule,qemu-system-x86,qemu-utils,python3-systemd,btrfs-progs,nftables
Section: aleph-im
Priority: Extra
3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ dependencies = [
"packaging==23.2",
"jsonschema==4.19.1",
"qmp==0.0.1",
"dbus-python==1.3.2",
"systemd-python==235",
"dbus-fast==1.90.1",
"systemd-python==235",
"superfluid~=0.2.1",
"sqlalchemy[asyncio]>=2.0",
Expand Down
4 changes: 2 additions & 2 deletions src/aleph/vm/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ async def create_a_vm(

# Start VM and snapshots automatically
if execution.persistent:
self.systemd_manager.enable_and_start(execution.controller_service)
await self.systemd_manager.enable_and_start(execution.controller_service)
await execution.wait_for_init()
if execution.is_program and execution.vm:
await execution.vm.load_configuration()
Expand Down Expand Up @@ -191,7 +191,7 @@ async def stop_vm(self, vm_hash: ItemHash) -> Optional[VmExecution]:
async def stop_persistent_execution(self, execution: VmExecution):
"""Stop persistent VMs in the pool."""
assert execution.persistent, "Execution isn't persistent"
self.systemd_manager.stop_and_disable(execution.controller_service)
await self.systemd_manager.stop_and_disable(execution.controller_service)
await execution.stop()

def forget_vm(self, vm_hash: ItemHash) -> None:
Expand Down
229 changes: 189 additions & 40 deletions src/aleph/vm/systemd.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,75 +2,224 @@
async SystemD Manager implementation.
"""

import enum
import logging
from typing import Literal, Optional, Protocol, runtime_checkable

import dbus
from dbus import DBusException, SystemBus
from dbus.proxies import Interface
from dbus_fast import BusType, DBusError
from dbus_fast.aio import MessageBus, ProxyObject

logger = logging.getLogger(__name__)


class UnitFileState(str, enum.Enum):
    """Install state of a systemd unit file, as a string-valued enum.

    Members compare equal to the plain strings they wrap, so a raw state
    string can be compared directly against a member.
    """

    # Permanently enabled.
    ENABLED = "enabled"

    # Only temporarily enabled: done through /run/ symlinks rather than /etc/,
    # so the unit is no longer enabled after a reboot.
    ENABLED_RUNTIME = "enabled-runtime"

    # Linked into /etc/ permanently.
    LINKED = "linked"

    # Linked into /run/ temporarily (until the next reboot).
    LINKED_RUNTIME = "linked-runtime"

    # Masked permanently.
    MASKED = "masked"

    # Masked in /run/ temporarily (until the next reboot).
    MASKED_RUNTIME = "masked-runtime"

    # Statically enabled: always enabled, no explicit enabling needed.
    STATIC = "static"

    # Not enabled.
    DISABLED = "disabled"

    # It could not be determined whether the unit file is enabled.
    INVALID = "invalid"


# Plain-string counterpart of UnitFileState, used to annotate raw state values.
UnitFileStateLiteral = Literal[
    "enabled", "enabled-runtime",
    "linked", "linked-runtime",
    "masked", "masked-runtime",
    "static", "disabled", "invalid",
]


class Mode(str, enum.Enum):
    """Mode string passed as the second argument of the unit start/stop/restart calls.

    Members are plain strings, so they can be handed to the D-Bus proxy as-is.
    """

    REPLACE = "replace"
    FAIL = "fail"
    ISOLATE = "isolate"
    IGNORE_DEPENDENCIES = "ignore-dependencies"
    IGNORE_REQUIREMENTS = "ignore-requirements"


class ActiveState(str, enum.Enum):
    """State value reflecting a unit's current status, as a string-valued enum."""

    # The unit is active.
    ACTIVE = "active"

    # The unit is active and reloading its configuration.
    RELOADING = "reloading"

    # The unit is inactive; the previous run was successful or has not yet occurred.
    INACTIVE = "inactive"

    # The unit is inactive; the previous run was unsuccessful.
    FAILED = "failed"

    # The unit is transitioning from inactive to active.
    ACTIVATING = "activating"

    # The unit is in the process of deactivation.
    DEACTIVATING = "deactivating"


# Plain-string counterpart of ActiveState, used to annotate raw state values.
ActiveStateLiteral = Literal[
    "active",
    "reloading",
    "inactive",
    "failed",
    "activating",
    "deactivating",
]


@runtime_checkable
class SystemdProxy(Protocol):
    """Structural type for the systemd Manager D-Bus proxy (used for typing only).

    Declares the ``call_*`` coroutines this module expects to find on the proxy.
    For a description of the underlying methods, see
    https://www.freedesktop.org/software/systemd/man/latest/org.freedesktop.systemd1.html#The%20Manager%20Object
    """

    async def call_enable_unit_files(self, files: list[str], runtime: bool, force: bool):
        """Proxy for the Manager's ``EnableUnitFiles`` method."""

    async def call_get_unit_file_state(self, service) -> UnitFileStateLiteral:
        """Proxy for the Manager's ``GetUnitFileState`` method."""

    async def call_start_unit(self, name, mode):
        """Proxy for the Manager's ``StartUnit`` method."""

    async def call_stop_unit(self, name, mode):
        """Proxy for the Manager's ``StopUnit`` method."""

    async def call_restart_unit(self, name, mode):
        """Proxy for the Manager's ``RestartUnit`` method."""

    async def call_disable_unit_files(self, files: list[str], runtime: bool):
        """Proxy for the Manager's ``DisableUnitFiles`` method."""

    async def call_get_unit(self, name: str) -> str:
        """Proxy for the Manager's ``GetUnit`` method; the result is used as an object path."""


@runtime_checkable
class UnitProxy(Protocol):
    """Structural type for a systemd Unit D-Bus proxy (used for typing only).

    For a description of the underlying interface, see
    https://www.freedesktop.org/software/systemd/man/latest/org.freedesktop.systemd1.html#Service%20Unit%20Objects
    """

    async def get_active_state(self) -> ActiveStateLiteral:
        """Read the unit's ``ActiveState`` property."""


class SystemDManager:
    """Asynchronous manager for the systemd services of the Linux host.

    Talks to the ``org.freedesktop.systemd1`` service over the D-Bus system bus
    through ``dbus_fast``. A new instance is not connected: ``await connect()``
    must complete before any other method is used.
    """

    # Quoted so they are only resolved by type checkers; both are None until connect().
    bus: "Optional[MessageBus]"
    manager: "Optional[SystemdProxy]"

    def __init__(self):
        # Initialise explicitly: a bare class annotation creates no attribute, so
        # without this the "connect() not called" assertions below would surface
        # as an AttributeError instead.
        self.bus = None
        self.manager = None

    async def connect(self):
        """Connect to the system bus and bind the systemd Manager interface.

        Raises:
            AssertionError: if the introspected Manager interface does not expose
                every method declared by ``SystemdProxy``.
        """
        self.bus = MessageBus(bus_type=BusType.SYSTEM)
        await self.bus.connect()
        path = "/org/freedesktop/systemd1"
        bus_name = "org.freedesktop.systemd1"
        introspect = await self.bus.introspect(bus_name, path)
        systemd_proxy: ProxyObject = self.bus.get_proxy_object(bus_name, path, introspection=introspect)
        interface = systemd_proxy.get_interface("org.freedesktop.systemd1.Manager")
        # Check that the required methods are implemented
        assert isinstance(interface, SystemdProxy)
        self.manager = interface

    async def enable(self, service: str) -> None:
        """Enable the unit file of ``service`` (runtime=False, force=True)."""
        assert self.manager, "connect() not called"
        await self.manager.call_enable_unit_files([service], False, True)
        logger.debug(f"Enabled {service} service")

    async def start(self, service: str) -> None:
        """Start ``service`` using the "replace" mode."""
        assert self.manager, "connect() not called"
        await self.manager.call_start_unit(service, Mode.REPLACE)
        logger.debug(f"Started {service} service")

    async def stop(self, service: str) -> None:
        """Stop ``service`` using the "replace" mode."""
        assert self.manager, "connect() not called"
        await self.manager.call_stop_unit(service, Mode.REPLACE)
        logger.debug(f"Stopped {service} service")

    async def restart(self, service: str) -> None:
        """Restart ``service`` using the "replace" mode."""
        assert self.manager, "connect() not called"
        await self.manager.call_restart_unit(service, Mode.REPLACE)
        logger.debug(f"Restarted {service} service")

    async def disable(self, service: str) -> None:
        """Disable the unit file of ``service`` (runtime=False)."""
        assert self.manager, "connect() not called"
        await self.manager.call_disable_unit_files([service], False)
        logger.debug(f"Disabled {service} service")

    async def is_service_enabled(self, service: str) -> bool:
        """Return True when the unit file state of ``service`` is "enabled".

        A ``DBusError`` raised by the query is logged and reported as False.
        """
        assert self.manager, "connect() not called"
        try:
            state = await self.manager.call_get_unit_file_state(service)
        except DBusError as error:
            logger.error(error)
            return False
        return state == UnitFileState.ENABLED

    async def is_service_active(self, service: str) -> bool:
        """Return True when the ActiveState of ``service`` is "active".

        A ``DBusError`` raised while looking up or querying the unit is logged
        and reported as False.

        Raises:
            AssertionError: if the unit interface does not expose the method
                declared by ``UnitProxy``.
        """
        assert self.manager, "connect() not called"
        assert self.bus, "connect() not called"
        try:
            # GetUnit yields the object path of the unit, which is then
            # introspected to obtain a proxy for its Unit interface.
            path = await self.manager.call_get_unit(service)
            bus_name = "org.freedesktop.systemd1"
            introspect = await self.bus.introspect(bus_name, path)
            systemd_service = self.bus.get_proxy_object(bus_name, path, introspection=introspect)
            unit = systemd_service.get_interface("org.freedesktop.systemd1.Unit")
            # Check that the required methods are implemented
            assert isinstance(unit, UnitProxy)
            active_state = await unit.get_active_state()
            return active_state == ActiveState.ACTIVE
        except DBusError as error:
            logger.error(error)
            return False

    async def enable_and_start(self, service: str) -> None:
        """Enable ``service`` if it is not enabled, then start it if it is not active."""
        if not await self.is_service_enabled(service):
            await self.enable(service)
        if not await self.is_service_active(service):
            await self.start(service)

    async def stop_and_disable(self, service: str) -> None:
        """Stop ``service`` if it is active, then disable it if it is enabled."""
        if await self.is_service_active(service):
            await self.stop(service)
        if await self.is_service_enabled(service):
            await self.disable(service)