diff --git a/tests/system/framework/__init__.py b/tests/system/framework/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/system/framework/config.py b/tests/system/framework/config.py new file mode 100644 index 000000000..69ab951e0 --- /dev/null +++ b/tests/system/framework/config.py @@ -0,0 +1,53 @@ +from __future__ import annotations + +from typing import Type + +from pytest_mh import MultihostConfig, MultihostDomain, MultihostHost, MultihostRole + +__all__ = [ + "ShadowMultihostConfig", + "ShadowMultihostDomain", +] + + +class ShadowMultihostConfig(MultihostConfig): + @property + def id_to_domain_class(self) -> dict[str, Type[MultihostDomain]]: + """ + All domains are mapped to :class:`ShadowMultihostDomain`. + + :rtype: Class name. + """ + return {"*": ShadowMultihostDomain} + + +class ShadowMultihostDomain(MultihostDomain[ShadowMultihostConfig]): + @property + def role_to_host_class(self) -> dict[str, Type[MultihostHost]]: + """ + Map roles to classes: + + * shadow to ShadowHost + + :rtype: Class name. + """ + from .hosts.shadow import ShadowHost + + return { + "shadow": ShadowHost, + } + + @property + def role_to_role_class(self) -> dict[str, Type[MultihostRole]]: + """ + Map roles to classes: + + * shadow to Shadow + + :rtype: Class name. + """ + from .roles.shadow import Shadow + + return { + "shadow": Shadow, + } diff --git a/tests/system/framework/fixtures.py b/tests/system/framework/fixtures.py new file mode 100644 index 000000000..d9775db0d --- /dev/null +++ b/tests/system/framework/fixtures.py @@ -0,0 +1,45 @@ +"""Pytest fixtures.""" + +from __future__ import annotations + +import os + +import pytest + + +@pytest.fixture(scope="session") +def datadir(request: pytest.FixtureRequest) -> str: + """ + Data directory shared for all tests. + + :return: Path to the data directory ``(root-pytest-dir)/data``. + :rtype: str + """ + return os.path.join(request.node.path, "data") + + +@pytest.fixture(scope="module") +def moduledatadir(datadir: str, request: pytest.FixtureRequest) -> str: + """ + Data directory shared for all tests within a single module. + + :return: Path to the data directory ``(root-pytest-dir)/data/$module_name``. + :rtype: str + """ + name = request.module.__name__ + return os.path.join(datadir, name) + + +@pytest.fixture(scope="function") +def testdatadir(moduledatadir: str, request: pytest.FixtureRequest) -> str: + """ + Data directory for current test. + + :return: Path to the data directory ``(root-pytest-dir)/data/$module_name/$test_name``. + :rtype: str + """ + if not isinstance(request.node, pytest.Function): + raise TypeError(f"Excepted pytest.Function, got {type(request.node)}") + + name = request.node.originalname + return os.path.join(moduledatadir, name) diff --git a/tests/system/framework/hosts/__init__.py b/tests/system/framework/hosts/__init__.py new file mode 100644 index 000000000..feb44d4e2 --- /dev/null +++ b/tests/system/framework/hosts/__init__.py @@ -0,0 +1,3 @@ +"""shadow multihost hosts.""" + +from __future__ import annotations diff --git a/tests/system/framework/hosts/base.py b/tests/system/framework/hosts/base.py new file mode 100644 index 000000000..a5ee5d914 --- /dev/null +++ b/tests/system/framework/hosts/base.py @@ -0,0 +1,107 @@ +"""Base classes and objects for shadow specific multihost hosts.""" + +from __future__ import annotations + +import csv + +from pytest_mh import MultihostBackupHost, MultihostHost +from pytest_mh.utils.fs import LinuxFileSystem + +from ..config import ShadowMultihostDomain + +__all__ = [ + "BaseHost", + "BaseLinuxHost", +] + + +class BaseHost(MultihostBackupHost[ShadowMultihostDomain]): + """ + Base class for all shadow hosts. + """ + + def __init__(self, *args, **kwargs) -> None: + # restore is handled in topology controllers + super().__init__(*args, **kwargs) + + @property + def features(self) -> dict[str, bool]: + """ + Features supported by the host. + """ + return {} + + +class BaseLinuxHost(MultihostHost[ShadowMultihostDomain]): + """ + Base Linux host. + + Adds linux specific reentrant utilities. + """ + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + + self.fs: LinuxFileSystem = LinuxFileSystem(self) + self._os_release: dict = {} + self._distro_name: str = "unknown" + self._distro_major: int = 0 + self._distro_minor: int = 0 + + def _distro_information(self): + """ + Pulls distro information from a host from /ets/os-release + """ + self.logger.info(f"Detecting distro information on {self.hostname}") + os_release = self.fs.read("/etc/os-release") + self._os_release = dict(csv.reader([x for x in os_release.splitlines() if x], delimiter="=")) + if "NAME" in self._os_release: + self._distro_name = self._os_release["NAME"] + if "VERSION_ID" not in self._os_release: + return + if "." in self._os_release["VERSION_ID"]: + self._distro_major = int(self._os_release["VERSION_ID"].split(".", maxsplit=1)[0]) + self._distro_minor = int(self._os_release["VERSION_ID"].split(".", maxsplit=1)[1]) + else: + self._distro_major = int(self._os_release["VERSION_ID"]) + + @property + def distro_name(self) -> str: + """ + Host distribution + + :return: Distribution name or "unknown" + :rtype: str + """ + # NAME item from os-release + if not self._os_release: + self._distro_information() + return self._distro_name + + @property + def distro_major(self) -> int: + """ + Host distribution major version + + :return: Major version + :rtype: int + """ + # First part of VERSION_ID from os-release + # Returns zero when could not detect + if not self._os_release: + self._distro_information() + return self._distro_major + + @property + def distro_minor(self) -> int: + """ + Host distribution minor version + + :return: Minor version + :rtype: int + """ + # Second part of VERSION_ID from os-release + # Returns zero when no minor version is present + if not self._os_release: + self._distro_information() + return self._distro_minor diff --git a/tests/system/framework/hosts/shadow.py b/tests/system/framework/hosts/shadow.py new file mode 100644 index 000000000..1fb656b63 --- /dev/null +++ b/tests/system/framework/hosts/shadow.py @@ -0,0 +1,175 @@ +"""shadow multihost host.""" + +from __future__ import annotations + +from pathlib import PurePosixPath +from typing import Any + +from pytest_mh.conn import ProcessLogLevel + +from .base import BaseHost, BaseLinuxHost + +__all__ = [ + "ShadowHost", +] + + +class ShadowHost(BaseHost, BaseLinuxHost): + """ + shadow host object. + + This is the host where the tests are run. + + .. note:: + + Full backup and restore of shadow state is supported. + """ + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + + self._features: dict[str, bool] | None = None + """Features dictionary.""" + + self._backup_path: PurePosixPath | None = None + """Path to backup files.""" + + self._verify_files: [dict[str, str]] = [ + {"origin": "/etc/passwd", "backup": "passwd"}, + {"origin": "/etc/shadow", "backup": "shadow"}, + {"origin": "/etc/group", "backup": "group"}, + {"origin": "/etc/gshadow", "backup": "gshadow"}, + ] + """Files to verify for mismatch.""" + + def pytest_setup(self) -> None: + super().pytest_setup() + + def start(self) -> None: + """ + Not supported. + + :raises NotImplementedError: _description_ + """ + raise NotImplementedError("Starting shadow service is not implemented.") + + def stop(self) -> None: + """ + Not supported. + + :raises NotImplementedError: _description_ + """ + raise NotImplementedError("Stopping shadow service is not implemented.") + + def backup(self) -> Any: + """ + Backup all shadow data. + + :return: Backup data. + :rtype: Any + """ + self.logger.info("Creating backup of shadow host") + + result = self.conn.run( + """ + set -ex + + function backup { + if [ -d "$1" ] || [ -f "$1" ]; then + cp --force --archive "$1" "$2" + fi + } + + path=`mktemp -d` + backup /etc/login.defs "$path/login.defs" + backup /etc/default/useradd "$path/useradd" + backup /etc/passwd "$path/passwd" + backup /etc/shadow "$path/shadow" + backup /etc/group "$path/group" + backup /etc/gshadow "$path/gshadow" + backup /etc/subuid "$path/subuid" + backup /etc/subgid "$path/subgid" + backup /home "$path/home" + backup /var/log/secure "$path/secure" + + echo $path + """, + log_level=ProcessLogLevel.Error, + ) + + self._backup_path = PurePosixPath(result.stdout_lines[-1].strip()) + + return PurePosixPath(result.stdout_lines[-1].strip()) + + def restore(self, backup_data: Any | None) -> None: + """ + Restore all shadow data. + + :return: Backup data. + :rtype: Any + """ + if backup_data is None: + return + + if not isinstance(backup_data, PurePosixPath): + raise TypeError(f"Expected PurePosixPath, got {type(backup_data)}") + + backup_path = str(backup_data) + + self.logger.info(f"Restoring shadow data from {backup_path}") + self.conn.run( + f""" + set -ex + + function restore {{ + rm --force --recursive "$2" + if [ -d "$1" ] || [ -f "$1" ]; then + cp --force --archive "$1" "$2" + fi + }} + + rm --force --recursive /var/log/secure + restore "{backup_path}/login.defs" /etc/login.defs + restore "{backup_path}/useradd" /etc/default/useradd + restore "{backup_path}/passwd" /etc/passwd + restore "{backup_path}/shadow" /etc/shadow + restore "{backup_path}/group" /etc/group + restore "{backup_path}/gshadow" /etc/gshadow + restore "{backup_path}/subuid" /etc/subuid + restore "{backup_path}/subgid" /etc/subgid + restore "{backup_path}/home" /home + restore "{backup_path}/secure" /var/log/secure + """, + log_level=ProcessLogLevel.Error, + ) + + def detect_file_mismatches(self) -> None: + """ + Shadow binaries modify a number of files, but usually do not modify all of them. This is why we add an + additional check at the end of the test to verify that the files that should not have been modified are still + intact. + """ + self.logger.info(f"Detecting mismatches in shadow files {self._backup_path}") + + for x in self._verify_files: + result = self.conn.run( + f""" + set -ex + + cmp {x['origin']} {self._backup_path}/{x['backup']} + """, + log_level=ProcessLogLevel.Error, + raise_on_error=False, + ) + if result.rc != 0: + self.logger.error(f"File mismatch in '{x['origin']}' and '{self._backup_path}/{x['backup']}'") + result.throw() + + def discard_file(self, origin: str) -> None: + """ + Discard modified files from the files that should be verified. + """ + for x in self._verify_files: + if x["origin"] == origin: + self._verify_files.remove(x) + break diff --git a/tests/system/framework/markers.py b/tests/system/framework/markers.py new file mode 100644 index 000000000..89caa05d5 --- /dev/null +++ b/tests/system/framework/markers.py @@ -0,0 +1,100 @@ +"""Pytest fixtures.""" + +from __future__ import annotations + +from functools import partial + +import pytest +from pytest_mh import MultihostItemData, Topology + +from .misc import to_list_of_strings +from .roles.base import BaseRole +from .topology import KnownTopology, KnownTopologyGroup + + +def pytest_configure(config: pytest.Config): + """ + Pytest hook: register multihost plugin. + """ + + # register additional markers + config.addinivalue_line( + "markers", + "builtwith(feature): Run test only if shadow was built with given feature", + ) + + +def builtwith(item: pytest.Function, requirements: dict[str, str], **kwargs: BaseRole): + def value_error(msg: str) -> ValueError: + return ValueError(f"{item.nodeid}::{item.originalname}: @pytest.mark.builtwith: {msg}") + + errors: list[str] = [] + for role, features in requirements.items(): + if role not in kwargs: + raise value_error(f"unknown fixture '{role}'") + + if not isinstance(kwargs[role], BaseRole): + raise value_error(f"fixture '{role}' is not instance of BaseRole") + + obj = kwargs[role] + for feature in to_list_of_strings(features): + if feature not in obj.features: + raise value_error(f"unknown feature '{feature}' in '{role}'") + + if not obj.features[feature]: + errors.append(f'{role} does not support "{feature}"') + + if len(errors) == 1: + return (False, errors[0]) + elif len(errors) > 1: + return (False, str(errors)) + + # All requirements were passed + return True + + +@pytest.hookimpl(tryfirst=True) +def pytest_runtest_setup(item: pytest.Item) -> None: + if not isinstance(item, pytest.Function): + raise TypeError(f"Unexpected item type: {type(item)}") + + topology: list[Topology] = [] + mh_item_data: MultihostItemData | None = MultihostItemData.GetData(item) + for mark in item.iter_markers("builtwith"): + requirements: dict[str, str] = {} + + if len(mark.args) == 1 and not mark.kwargs: + # @pytest.mark.builtwith("feature_x") + # -> check if "feature_x" is supported by shadow + requirements["shadow"] = mark.args[0] + topology = [] + elif not mark.args and mark.kwargs: + # @pytest.mark.builtwith(shadow="feature_x", another_host="feature_x") -> + # -> check if "feature_x" is supported by both shadow and another_host + requirements = dict(mark.kwargs) + topology = [] + elif ( + len(mark.args) == 1 + and isinstance(mark.args[0], (Topology, KnownTopology, KnownTopologyGroup)) + and mark.kwargs + ): + # @pytest.mark.builtwith(KnownTopology.Shadow, shadow="feature_x") -> + # -> check if "feature_x" is supported by shadow only if the test runs on shadow topology + requirements = dict(mark.kwargs) + if isinstance(mark.args[0], Topology): + topology = [mark.args[0]] + elif isinstance(mark.args[0], KnownTopology): + topology = [mark.args[0].value.topology] + elif isinstance(mark.args[0], KnownTopologyGroup): + topology = [x.value.topology for x in mark.args[0].value] + else: + raise ValueError(f"{item.nodeid}::{item.originalname}: invalid arguments for @pytest.mark.builtwith") + + if mh_item_data is None: + raise ValueError(f"{item.nodeid}::{item.originalname}: multihost item data is not set") + + if mh_item_data.topology_mark is None: + raise ValueError(f"{item.nodeid}::{item.originalname}: multihost topology mark is not set") + + if not topology or mh_item_data.topology_mark.topology in topology: + item.add_marker(pytest.mark.require(partial(builtwith, item=item, requirements=requirements))) diff --git a/tests/system/framework/misc/__init__.py b/tests/system/framework/misc/__init__.py new file mode 100644 index 000000000..92e0b76b9 --- /dev/null +++ b/tests/system/framework/misc/__init__.py @@ -0,0 +1,42 @@ +"""Miscellaneous functions.""" + +from __future__ import annotations + +from typing import Any + + +def to_list(value: Any | list[Any] | None) -> list[Any]: + """ + Convert value into a list. + + - if value is ``None`` then return an empty list + - if value is already a list then return it unchanged + - if value is not a list then return ``[value]`` + + :param value: Value that should be converted to a list. + :type value: Any | list[Any] | None + :return: List with the value as an element. + :rtype: list[Any] + """ + if value is None: + return [] + + if isinstance(value, list): + return value + + return [value] + + +def to_list_of_strings(value: Any | list[Any] | None) -> list[str]: + """ + Convert given list or single value to list of strings. + + The ``value`` is first converted to a list and then ``str(item)`` is run on + each of its item. + + :param value: Value to convert. + :type value: Any | list[Any] | None + :return: List of strings. + :rtype: list[str] + """ + return [str(x) for x in to_list(value)] diff --git a/tests/system/framework/misc/errors.py b/tests/system/framework/misc/errors.py new file mode 100644 index 000000000..e4955af58 --- /dev/null +++ b/tests/system/framework/misc/errors.py @@ -0,0 +1,42 @@ +from __future__ import annotations + + +class ExpectScriptError(Exception): + """ + Expect script error. + + Seeing this exception means that there is an unhandled path or other error + in the expect script that was executed. The script needs to be fixed. + """ + + def __init__(self, code: int, msg: str | None = None) -> None: + """ + :param code: Expect script error code. + :type code: int + :param msg: Error message, defaults to None (translate error code to message) + :type msg: str | None, optional + """ + self.code: int = code + if msg is None: + msg = self.code_to_message(code) + + super().__init__(msg) + + def code_to_message(self, code: int) -> str: + """ + Translate expect script error codes used in this framework to message. + + :param code: Expect script error code. + :type code: int + :return: Error message. + :rtype: str + """ + match code: + case 201: + return "Timeout, unexpected output" + case 202: + return "Unexpected end of file" + case 203: + return "Unexpected code path" + + return "Unknown error code" diff --git a/tests/system/framework/roles/__init__.py b/tests/system/framework/roles/__init__.py new file mode 100644 index 000000000..9a45a54bc --- /dev/null +++ b/tests/system/framework/roles/__init__.py @@ -0,0 +1,3 @@ +"""shadow multihost roles.""" + +from __future__ import annotations diff --git a/tests/system/framework/roles/base.py b/tests/system/framework/roles/base.py new file mode 100644 index 000000000..616ba4eb3 --- /dev/null +++ b/tests/system/framework/roles/base.py @@ -0,0 +1,172 @@ +"""Base classes and objects for shadow specific multihost roles.""" + +from __future__ import annotations + +from typing import Any, Generic, TypeGuard, TypeVar + +from pytest_mh import MultihostRole +from pytest_mh.cli import CLIBuilder +from pytest_mh.conn import Bash, Shell +from pytest_mh.conn.ssh import SSHClient +from pytest_mh.utils.coredumpd import Coredumpd +from pytest_mh.utils.firewall import Firewalld +from pytest_mh.utils.fs import LinuxFileSystem +from pytest_mh.utils.journald import JournaldUtils +from pytest_mh.utils.tc import LinuxTrafficControl + +from ..hosts.base import BaseHost +from ..utils.tools import LinuxToolsUtils + +HostType = TypeVar("HostType", bound=BaseHost) +RoleType = TypeVar("RoleType", bound=MultihostRole) + + +__all__ = [ + "HostType", + "RoleType", + "DeleteAttribute", + "BaseObject", + "BaseRole", + "BaseLinuxRole", +] + + +class DeleteAttribute(object): + """ + This class is used to distinguish between setting an attribute to an empty + value and deleting it completely. + """ + + pass + + +class BaseObject(Generic[HostType, RoleType]): + """ + Base class for object management classes (like users or groups). + + It provides shortcuts to low level functionality to easily enable execution + of remote commands. It also defines multiple helper methods that are shared + across roles. + """ + + def __init__(self, role: RoleType) -> None: + self.role: RoleType = role + """Multihost role object.""" + + self.host: HostType = role.host + """Multihost host object.""" + + self.cli: CLIBuilder = self.host.cli + """Command line builder to easy build command line for execution.""" + + +class BaseRole(MultihostRole[HostType]): + """ + Base role class. Roles are the main interface to the remote hosts that can + be directly accessed in test cases as fixtures. + + All changes to the remote host that were done through the role object API + are automatically reverted when a test is finished. + """ + + Delete: DeleteAttribute = DeleteAttribute() + """ + Use this to indicate that you want to delete an attribute instead of setting + it to an empty value. + """ + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + + def is_delete_attribute(self, value: Any) -> TypeGuard[DeleteAttribute]: + """ + Return ``True`` if the value is :attr:`DeleteAttribute` + + :param value: Value to test. + :type value: Any + :return: Return ``True`` if the value is :attr:`DeleteAttribute` + :rtype: TypeGuard[DeleteAttribute] + """ + return isinstance(value, DeleteAttribute) + + @property + def features(self) -> dict[str, bool]: + """ + Features supported by the role. + """ + return self.host.features + + def ssh(self, user: str, password: str, *, shell: Shell | None = None) -> SSHClient: + """ + Open SSH connection to the host as given user. + + :param user: Username. + :type user: str + :param password: User password. + :type password: str + :param shell: Shell that will run the commands, defaults to ``None`` (= ``Bash``) + :type shell: Shell | None, optional + :return: SSH client connection. + :rtype: SSHClient + """ + if shell is None: + shell = Bash() + + host = self.host.hostname + port = 22 + + if isinstance(self.host.conn, SSHClient): + host = getattr(self.host.conn, "host", host) + port = getattr(self.host.conn, "port", 22) + + return SSHClient( + host=host, + port=port, + user=user, + password=password, + shell=shell, + logger=self.logger, + ) + + +class BaseLinuxRole(BaseRole[HostType]): + """ + Base linux role. + """ + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + + self.fs: LinuxFileSystem = LinuxFileSystem(self.host) + """ + File system manipulation. + """ + + self.firewall: Firewalld = Firewalld(self.host).postpone_setup() + """ + Configure firewall using firewalld. + """ + + self.tc: LinuxTrafficControl = LinuxTrafficControl(self.host).postpone_setup() + """ + Traffic control manipulation. + """ + + self.tools: LinuxToolsUtils = LinuxToolsUtils(self.host) + """ + Standard tools interface. + """ + + self.journald: JournaldUtils = JournaldUtils(self.host) + """ + Journald utilities. + """ + + coredumpd_config = self.host.config.get("coredumpd", {}) + coredumpd_mode = coredumpd_config.get("mode", "ignore") + coredumpd_filter = coredumpd_config.get("filter", None) + + self.coredumpd: Coredumpd = Coredumpd(self.host, self.fs, mode=coredumpd_mode, filter=coredumpd_filter) + """ + Coredumpd utilities. + """ diff --git a/tests/system/framework/roles/shadow.py b/tests/system/framework/roles/shadow.py new file mode 100644 index 000000000..51d7493ad --- /dev/null +++ b/tests/system/framework/roles/shadow.py @@ -0,0 +1,129 @@ +"""shadow multihost role.""" + +from __future__ import annotations + +import shlex +from typing import Dict + +from pytest_mh.conn import ProcessLogLevel, ProcessResult + +from ..hosts.shadow import ShadowHost +from .base import BaseLinuxRole + +__all__ = [ + "Shadow", +] + + +class Shadow(BaseLinuxRole[ShadowHost]): + """ + shadow role. + + Provides unified Python API for managing and testing shadow. + """ + + def __init__(self, *args, **kwargs) -> None: + """ + Set up the environment. + """ + super().__init__(*args, **kwargs) + + def teardown(self) -> None: + """ + Detect file mismatches before cleaning up the environment. + """ + self.host.detect_file_mismatches() + """ + Clean up the environment. + """ + super().teardown() + + def _parse_args(self, *args) -> Dict[str, str]: + args_list = shlex.split(*args[0]) + name = args_list[-1] + + return {"name": name} + + def useradd(self, *args) -> ProcessResult: + """ + Create user. + """ + args_dict = self._parse_args(args) + self.logger.info(f'Creating user "{args_dict["name"]}" on {self.host.hostname}') + cmd = self.host.conn.run("useradd " + args[0], log_level=ProcessLogLevel.Error) + + self.host.discard_file("/etc/passwd") + self.host.discard_file("/etc/shadow") + self.host.discard_file("/etc/group") + self.host.discard_file("/etc/gshadow") + + return cmd + + def usermod(self, *args) -> ProcessResult: + """ + Modify user. + """ + args_dict = self._parse_args(args) + self.logger.info(f'Modifying user "{args_dict["name"]}" on {self.host.hostname}') + cmd = self.host.conn.run("usermod " + args[0], log_level=ProcessLogLevel.Error) + + self.host.discard_file("/etc/passwd") + self.host.discard_file("/etc/shadow") + self.host.discard_file("/etc/group") + self.host.discard_file("/etc/gshadow") + + return cmd + + def userdel(self, *args) -> ProcessResult: + """ + Delete user. + """ + args_dict = self._parse_args(args) + self.logger.info(f'Deleting user "{args_dict["name"]}" on {self.host.hostname}') + cmd = self.host.conn.run("userdel " + args[0], log_level=ProcessLogLevel.Error) + + self.host.discard_file("/etc/passwd") + self.host.discard_file("/etc/shadow") + self.host.discard_file("/etc/group") + self.host.discard_file("/etc/gshadow") + + return cmd + + def groupadd(self, *args) -> ProcessResult: + """ + Create group. + """ + args_dict = self._parse_args(args) + self.logger.info(f'Creating group "{args_dict["name"]}" on {self.host.hostname}') + cmd = self.host.conn.run("groupadd " + args[0], log_level=ProcessLogLevel.Error) + + self.host.discard_file("/etc/group") + self.host.discard_file("/etc/gshadow") + + return cmd + + def groupmod(self, *args) -> ProcessResult: + """ + Modify group. + """ + args_dict = self._parse_args(args) + self.logger.info(f'Modifying group "{args_dict["name"]}" on {self.host.hostname}') + cmd = self.host.conn.run("groupmod " + args[0], log_level=ProcessLogLevel.Error) + + self.host.discard_file("/etc/group") + self.host.discard_file("/etc/gshadow") + + return cmd + + def groupdel(self, *args) -> ProcessResult: + """ + Delete group. + """ + args_dict = self._parse_args(args) + self.logger.info(f'Deleting group "{args_dict["name"]}" on {self.host.hostname}') + cmd = self.host.conn.run("groupdel " + args[0], log_level=ProcessLogLevel.Error) + + self.host.discard_file("/etc/group") + self.host.discard_file("/etc/gshadow") + + return cmd diff --git a/tests/system/framework/topology.py b/tests/system/framework/topology.py new file mode 100644 index 000000000..88f01d013 --- /dev/null +++ b/tests/system/framework/topology.py @@ -0,0 +1,55 @@ +"""Predefined well-known topologies.""" + +from __future__ import annotations + +from enum import unique +from typing import final + +from pytest_mh import KnownTopologyBase, KnownTopologyGroupBase, Topology, TopologyDomain, TopologyMark + +__all__ = [ + "KnownTopology", + "KnownTopologyGroup", +] + + +@final +@unique +class KnownTopology(KnownTopologyBase): + """ + Well-known topologies that can be given to ``pytest.mark.topology`` + directly. It is expected to use these values in favor of providing + custom marker values. + + .. code-block:: python + :caption: Example usage + + @pytest.mark.topology(KnownTopology.Shadow) + def test_ldap(shadow: Shadow): + assert True + """ + + Shadow = TopologyMark( + name="shadow", + topology=Topology(TopologyDomain("shadow", shadow=1)), + fixtures=dict(shadow="shadow.shadow[0]"), + ) + + +class KnownTopologyGroup(KnownTopologyGroupBase): + """ + Groups of well-known topologies that can be given to ``pytest.mark.topology`` + directly. It is expected to use these values in favor of providing + custom marker values. + + The test is parametrized and runs multiple times, once per each topology. + + .. code-block:: python + :caption: Example usage (runs on Shadow topology) + + @pytest.mark.topology(KnownTopologyGroup.AnyProvider) + def test_ldap(shadow: Shadow): + assert True + """ + + AnyProvider = [KnownTopology.Shadow] diff --git a/tests/system/framework/utils/__init__.py b/tests/system/framework/utils/__init__.py new file mode 100644 index 000000000..1ab732490 --- /dev/null +++ b/tests/system/framework/utils/__init__.py @@ -0,0 +1,3 @@ +"""shadow multihost utils used by roles.""" + +from __future__ import annotations diff --git a/tests/system/framework/utils/tools.py b/tests/system/framework/utils/tools.py new file mode 100644 index 000000000..64a298c20 --- /dev/null +++ b/tests/system/framework/utils/tools.py @@ -0,0 +1,475 @@ +"""Run various standard Linux commands on remote host.""" + +from __future__ import annotations + +from typing import Any + +import jc +from pytest_mh import MultihostHost, MultihostUtility +from pytest_mh.conn import Process + +__all__ = [ + "UnixObject", + "UnixUser", + "UnixGroup", + "IdEntry", + "PasswdEntry", + "GroupEntry", + "InitgroupsEntry", + "LinuxToolsUtils", + "KillCommand", + "GetentUtils", +] + + +class UnixObject(object): + """ + Generic Unix object. + """ + + def __init__(self, id: int | None, name: str | None) -> None: + """ + :param id: Object ID. + :type id: int | None + :param name: Object name. + :type name: str | None + """ + self.id: int | None = id + """ + ID. + """ + + self.name: str | None = name + """ + Name. + """ + + def __str__(self) -> str: + return f'({self.id},"{self.name}")' + + def __repr__(self) -> str: + return str(self) + + def __eq__(self, o: object) -> bool: + if isinstance(o, str): + return o == self.name + elif isinstance(o, int): + return o == self.id + elif isinstance(o, tuple): + if len(o) != 2 or not isinstance(o[0], int) or not isinstance(o[1], str): + raise NotImplementedError(f"Unable to compare {type(o)} with {self.__class__}") + + (id, name) = o + return id == self.id and name == self.name + elif isinstance(o, UnixObject): + # Fallback to identity comparison + return NotImplemented + + raise NotImplementedError(f"Unable to compare {type(o)} with {self.__class__}") + + +class UnixUser(UnixObject): + """ + Unix user. + """ + + pass + + +class UnixGroup(UnixObject): + """ + Unix group. + """ + + pass + + +class IdEntry(object): + """ + Result of ``id`` + """ + + def __init__(self, user: UnixUser, group: UnixGroup, groups: list[UnixGroup]) -> None: + self.user: UnixUser = user + """ + User information. + """ + + self.group: UnixGroup = group + """ + Primary group. + """ + + self.groups: list[UnixGroup] = groups + """ + Secondary groups. + """ + + def memberof(self, groups: int | str | tuple[int, str] | list[int | str | tuple[int, str]]) -> bool: + """ + Check if the user is member of give group(s). + + Group specification can be either a single gid or group name. But it can + be also a tuple of (gid, name) where both gid and name must match or list + of groups where the user must be member of all given groups. + + :param groups: _description_ + :type groups: int | str | tuple + :return: _description_ + :rtype: bool + """ + if isinstance(groups, (int, str, tuple)): + return groups in self.groups + + return all(x in self.groups for x in groups) + + def __str__(self) -> str: + return f"{{user={str(self.user)},group={str(self.group)},groups={str(self.groups)}}}" + + def __repr__(self) -> str: + return str(self) + + @classmethod + def FromDict(cls, d: dict[str, Any]) -> IdEntry: + user = UnixUser(d["uid"]["id"], d["uid"].get("name", None)) + group = UnixGroup(d["gid"]["id"], d["gid"].get("name", None)) + groups = [] + + for secondary_group in d["groups"]: + groups.append(UnixGroup(secondary_group["id"], secondary_group.get("name", None))) + + return cls(user, group, groups) + + @classmethod + def FromOutput(cls, stdout: str) -> IdEntry: + jcresult = jc.parse("id", stdout) + + if not isinstance(jcresult, dict): + raise TypeError(f"Unexpected type: {type(jcresult)}, expecting dict") + + return cls.FromDict(jcresult) + + +class PasswdEntry(object): + """ + Result of ``getent passwd`` + """ + + def __init__(self, name: str, password: str, uid: int, gid: int, gecos: str, home: str, shell: str) -> None: + self.name: str | None = name + """ + User name. + """ + + self.password: str | None = password + """ + User password. + """ + + self.uid: int = uid + """ + User id. + """ + + self.gid: int = gid + """ + Group id. + """ + + self.gecos: str | None = gecos + """ + GECOS. + """ + + self.home: str | None = home + """ + Home directory. + """ + + self.shell: str | None = shell + """ + Login shell. + """ + + def __str__(self) -> str: + return f"({self.name}:{self.password}:{self.uid}:{self.gid}:{self.gecos}:{self.home}:{self.shell})" + + def __repr__(self) -> str: + return str(self) + + @classmethod + def FromDict(cls, d: dict[str, Any]) -> PasswdEntry: + return cls( + name=d.get("username", None), + password=d.get("password", None), + uid=d.get("uid", None), + gid=d.get("gid", None), + gecos=d.get("comment", None), + home=d.get("home", None), + shell=d.get("shell", None), + ) + + @classmethod + def FromOutput(cls, stdout: str) -> PasswdEntry: + result = jc.parse("passwd", stdout) + + if not isinstance(result, list): + raise TypeError(f"Unexpected type: {type(result)}, expecting list") + + if len(result) != 1: + raise ValueError("More then one entry was returned") + + return cls.FromDict(result[0]) + + +class GroupEntry(object): + """ + Result of ``getent group`` + """ + + def __init__(self, name: str, password: str, gid: int, members: list[str]) -> None: + self.name: str | None = name + """ + Group name. + """ + + self.password: str | None = password + """ + Group password. + """ + + self.gid: int = gid + """ + Group id. + """ + + self.members: list[str] = members + """ + Group members. + """ + + def __str__(self) -> str: + return f'({self.name}:{self.password}:{self.gid}:{",".join(self.members)})' + + def __repr__(self) -> str: + return str(self) + + @classmethod + def FromDict(cls, d: dict[str, Any]) -> GroupEntry: + return cls( + name=d.get("group_name", None), + password=d.get("password", None), + gid=d.get("gid", None), + members=d.get("members", []), + ) + + @classmethod + def FromOutput(cls, stdout: str) -> GroupEntry: + result = jc.parse("group", stdout) + + if not isinstance(result, list): + raise TypeError(f"Unexpected type: {type(result)}, expecting list") + + if len(result) != 1: + raise ValueError("More then one entry was returned") + + return cls.FromDict(result[0]) + + +class InitgroupsEntry(object): + """ + Result of ``getent initgroups`` + + If user does not exist or does not have any supplementary groups then ``self.groups`` is empty. + """ + + def __init__(self, name: str, groups: list[int]) -> None: + self.name: str = name + """ + Exact username for which ``initgroups`` was called + """ + + self.groups: list[int] = groups + """ + Group ids that ``name`` is member of. + """ + + def __str__(self) -> str: + return f'({self.name}:{",".join([str(i) for i in self.groups])})' + + def __repr__(self) -> str: + return str(self) + + def memberof(self, groups: list[int]) -> bool: + """ + Check if the user is member of given groups. + + This method checks only supplementary groups not the primary group. + + :param groups: List of group ids + :type groups: list[int] + :return: If user is member of all given groups True, otherwise False. + :rtype: bool + """ + + return all(x in self.groups for x in groups) + + @classmethod + def FromDict(cls, d: dict[str, Any]) -> InitgroupsEntry: + return cls( + name=d["name"], + groups=d.get("groups", []), + ) + + @classmethod + def FromOutput(cls, stdout: str) -> InitgroupsEntry: + result: list[str] = stdout.split() + + dictionary: dict[str, str | list[int]] = {} + dictionary["name"] = result[0] + + if len(result) > 1: + dictionary["groups"] = [int(x) for x in result[1:]] + + return cls.FromDict(dictionary) + + +class LinuxToolsUtils(MultihostUtility[MultihostHost]): + """ + Run various standard commands on remote host. + """ + + def __init__(self, host: MultihostHost) -> None: + """ + :param host: Remote host. + :type host: MultihostHost + """ + super().__init__(host) + + self.getent: GetentUtils = GetentUtils(host) + """ + Run ``getent`` command. + """ + + def id(self, name: str | int) -> IdEntry | None: + """ + Run ``id`` command. + + :param name: User name or id. + :type name: str | int + :return: id data, None if not found + :rtype: IdEntry | None + """ + command = self.host.conn.exec(["id", name], raise_on_error=False) + if command.rc != 0: + return None + + return IdEntry.FromOutput(command.stdout) + + def grep(self, pattern: str, paths: str | list[str], args: list[str] | None = None) -> bool: + """ + Run ``grep`` command. + + :param pattern: Pattern to match. + :type pattern: str + :param paths: Paths to search. + :type paths: str | list[str] + :param args: Additional arguments to ``grep`` command, defaults to None. + :type args: list[str] | None, optional + :return: True if grep returned 0, False otherwise. + :rtype: bool + """ + if args is None: + args = [] + + paths = [paths] if isinstance(paths, str) else paths + command = self.host.conn.exec(["grep", *args, pattern, *paths]) + + return command.rc == 0 + + +class KillCommand(object): + def __init__(self, host: MultihostHost, process: Process, pid: int) -> None: + self.host = host + self.process = process + self.pid = pid + self.__killed: bool = False + + def kill(self) -> None: + if self.__killed: + return + + self.host.conn.exec(["kill", self.pid]) + self.__killed = True + + def __enter__(self) -> KillCommand: + return self + + def __exit__(self, exception_type, exception_value, traceback) -> None: + self.kill() + self.process.wait() + + +class GetentUtils(MultihostUtility[MultihostHost]): + """ + Interface to getent command. + """ + + def __init__(self, host: MultihostHost) -> None: + """ + :param host: Remote host. + :type host: MultihostHost + """ + super().__init__(host) + + def passwd(self, name: str | int, *, service: str | None = None) -> PasswdEntry | None: + """ + Call ``getent passwd $name`` + + :param name: User name or id. + :type name: str | int + :param service: Service used, defaults to None + :type service: str | None + :return: passwd data, None if not found + :rtype: PasswdEntry | None + """ + return self.__exec(PasswdEntry, "passwd", name, service) + + def group(self, name: str | int, *, service: str | None = None) -> GroupEntry | None: + """ + Call ``getent group $name`` + + :param name: Group name or id. + :type name: str | int + :param service: Service used, defaults to None + :type service: str | None + :return: group data, None if not found + :rtype: PasswdEntry | None + """ + return self.__exec(GroupEntry, "group", name, service) + + def initgroups(self, name: str, *, service: str | None = None) -> InitgroupsEntry: + """ + Call ``getent initgroups $name`` + + If ``name`` does not exist, group list is empty. This is standard behavior of ``getent initgroups`` + + :param name: User name. + :type name: str + :param service: Service used, defaults to None + :type service: str | None + :return: Initgroups data + :rtype: InitgroupsEntry + """ + return self.__exec(InitgroupsEntry, "initgroups", name, service) + + def __exec(self, cls, cmd: str, name: str | int, service: str | None = None) -> Any: + args = [] + if service is not None: + args = ["-s", service] + + command = self.host.conn.exec(["getent", *args, cmd, name], raise_on_error=False) + if command.rc != 0: + return None + + return cls.FromOutput(command.stdout)