-
Couldn't load subscription status.
- Fork 293
CA-390883: Add unit test for usb_reset.py (code fix done by CA-388318) #5401
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| getgid | ||
| getuid | ||
| NEWNS | ||
| pytest | ||
| pytype | ||
| setgroups | ||
| strerror | ||
| tmpfs |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| rule_settings: | ||
| disable: | ||
| - use-fstring-for-concatenation | ||
| - use-fstring-for-formatting |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| """scripts/unit_test/conftest.py: Common pytest module for shared pytest fixtures""" | ||
| import pytest | ||
|
|
||
| from .rootless_container import enter_private_mount_namespace | ||
|
|
||
|
|
||
| @pytest.fixture(scope="session") | ||
| def private_mount_namespace(): | ||
| """Enter a private mount namespace that allows us to test mount and unmount""" | ||
| return enter_private_mount_namespace() |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,79 @@ | ||
| """helpers for unit-testing functions in scripts without permanent global mocks""" | ||
| import os | ||
| import sys | ||
| from contextlib import contextmanager | ||
| from types import ModuleType | ||
|
|
||
| from mock import Mock | ||
|
|
||
| if sys.version_info >= (3,): | ||
| from typing import Generator | ||
|
|
||
|
|
||
| @contextmanager | ||
| def mocked_modules(*module_names): # type:(str) -> Generator[None, None, None] | ||
| """Context manager that temporarily mocks the specified modules. | ||
| :param module_names: Variable number of names of the modules to be mocked. | ||
| :yields: None | ||
| During the context, the specified modules are added to the sys.modules | ||
| dictionary as instances of the ModuleType class. | ||
| This effectively mocks the modules, allowing them to be imported and used | ||
| within the context. After the context, the mocked modules are removed | ||
| from the sys.modules dictionary. | ||
| Example usage: | ||
| ```python | ||
| with mocked_modules("module1", "module2"): | ||
| # Code that uses the mocked modules | ||
| ``` | ||
| """ | ||
| for module_name in module_names: | ||
| sys.modules[module_name] = Mock() | ||
| yield | ||
| for module_name in module_names: | ||
| sys.modules.pop(module_name) | ||
|
|
||
|
|
||
| def import_file_as_module(relative_script_path): # type:(str) -> ModuleType | ||
| """Import a Python script without the .py extension as a python module. | ||
| :param relative_script_path (str): The relative path of the script to import. | ||
| :returns module: The imported module. | ||
| :raises: AssertionError: If the spec or loader is not available. | ||
| Note: | ||
| - This function uses different methods depending on the Python version. | ||
| - For Python 2, it uses the imp module. | ||
| - For Python 3, it uses the importlib module. | ||
| Example: | ||
| - import_script_as_module('scripts/mail-alarm') # Returns the imported module. | ||
| """ | ||
| script_path = os.path.dirname(__file__) + "/../../" + relative_script_path | ||
| module_name = os.path.basename(script_path.replace(".py", "")) | ||
|
|
||
| if sys.version_info.major == 2: | ||
| # Use imp only for python2.7, there is no alternative to it: | ||
| # pylint: disable-next=import-outside-toplevel, deprecated-module | ||
| import imp # pyright: ignore[reportMissingImports] | ||
|
|
||
| return imp.load_source(module_name, script_path) | ||
|
|
||
| # For Python 3.11+: Import Python script without the .py extension: | ||
| # https://gist.github.com/bernhardkaindl/1aaa04ea925fdc36c40d031491957fd3: | ||
| # pylint: disable-next=import-outside-toplevel | ||
| from importlib import ( # pylint: disable=no-name-in-module | ||
| machinery, | ||
| util, | ||
| ) | ||
|
|
||
| loader = machinery.SourceFileLoader(module_name, script_path) | ||
| spec = util.spec_from_loader(module_name, loader) | ||
| assert spec | ||
| assert spec.loader | ||
| module = util.module_from_spec(spec) | ||
| sys.modules[module_name] = module | ||
| spec.loader.exec_module(module) | ||
| return module | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,83 @@ | ||
| """rootless_container.py: Create a rootless container on any Linux and GitHub CI""" | ||
| import ctypes | ||
| import os | ||
|
|
||
| # Unshare the user namespace, so that the calling process is moved into a new | ||
| # user namespace which is not shared with any previously existing process. | ||
| # Needed so that the current user id can be mapped to 0 for getting a new | ||
| # mount namespace. | ||
| CLONE_NEWUSER = 0x10000000 | ||
| # Unshare the mount namespace, so that the calling process has a private copy | ||
| # of its root directory namespace which is not shared with any other process: | ||
| CLONE_NEWNS = 0x00020000 | ||
| # Flags for mount(2): | ||
| MS_BIND = 4096 | ||
| MS_REC = 16384 | ||
| MS_PRIVATE = 1 << 18 | ||
|
|
||
|
|
||
| def unshare(flags): # type:(int) -> None | ||
| """Wrapper for the library call to unshare Linux kernel namespaces""" | ||
| lib = ctypes.CDLL(None, use_errno=True) | ||
| lib.unshare.argtypes = [ctypes.c_int] | ||
| rc = lib.unshare(flags) | ||
| if rc != 0: # pragma: no cover | ||
| errno = ctypes.get_errno() | ||
| raise OSError(errno, os.strerror(errno), flags) | ||
|
|
||
|
|
||
| def mount(source="none", target="", fs="", flags=0, options=""): | ||
| # type:(str, str, str, int, str) -> None | ||
| """Wrapper for the library call mount(). Supports Python2.7 and Python3.x""" | ||
| lib = ctypes.CDLL(None, use_errno=True) | ||
| lib.mount.argtypes = ( | ||
| ctypes.c_char_p, | ||
| ctypes.c_char_p, | ||
| ctypes.c_char_p, | ||
| ctypes.c_ulong, | ||
| ctypes.c_char_p, | ||
| ) | ||
| result = lib.mount( | ||
| source.encode(), target.encode(), fs.encode(), flags, options.encode() | ||
| ) | ||
| if result < 0: # pragma: no cover | ||
| errno = ctypes.get_errno() | ||
| raise OSError( | ||
| errno, | ||
| "mount " + target + " (" + options + "): " + os.strerror(errno), | ||
| ) | ||
|
|
||
|
|
||
| def umount(target): # type:(str) -> None | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this function is not called, did you intention to call it automatically when leave the |
||
| """Wrapper for the Linux umount system call, supports Python2.7 and Python3.x""" | ||
| lib = ctypes.CDLL(None, use_errno=True) | ||
| result = lib.umount(ctypes.c_char_p(target.encode())) | ||
| if result < 0: # pragma: no cover | ||
| errno = ctypes.get_errno() | ||
| raise OSError(errno, "umount " + target + ": " + os.strerror(errno)) | ||
|
|
||
|
|
||
| def enter_private_mount_namespace(): | ||
| """Enter a private mount and user namespace with the user and simulate uid 0 | ||
| Some code like mount() requires to be run as root. The container simulates | ||
| root-like privileges and a new mount namespace that allows mount() in it. | ||
| Implements the equivalent of `/usr/bin/unshare --map-root-user --mount` | ||
| """ | ||
|
|
||
| # Read the actual user and group ids before entering the new user namespace: | ||
| real_uid = os.getuid() | ||
| real_gid = os.getgid() | ||
| unshare(CLONE_NEWUSER | CLONE_NEWNS) | ||
| # Setup user map to map the user id to behave like uid 0: | ||
| with open("/proc/self/uid_map", "wb") as proc_self_user_map: | ||
| proc_self_user_map.write(b"0 %d 1" % real_uid) | ||
| with open("/proc/self/setgroups", "wb") as proc_self_set_groups: | ||
| proc_self_set_groups.write(b"deny") | ||
| # Setup group map for the user's gid to behave like gid 0: | ||
| with open("/proc/self/gid_map", "wb") as proc_self_group_map: | ||
| proc_self_group_map.write(b"0 %d 1" % real_gid) | ||
| # Private root mount in the mount namespace top support mounting a private tmpfs: | ||
| mount(target="/", flags=MS_REC | MS_PRIVATE) | ||
| return True | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| """scripts/unit_test/test_usb_reset_mount.py: Test usb_reset.mount and .umount""" | ||
| from __future__ import print_function | ||
|
|
||
| from .import_helper import import_file_as_module, mocked_modules | ||
|
|
||
|
|
||
| def test_usb_reset_mount_umount(private_mount_namespace): | ||
| """Test usb_reset.mount and .umount""" | ||
| assert private_mount_namespace | ||
| with mocked_modules("xcp", "xcp.logger"): | ||
| usb_reset = import_file_as_module("scripts/usb_reset.py") | ||
| usb_reset.log.error = print | ||
| usb_reset.mount(source="tmpfs", target="/tmp", fs="tmpfs") | ||
| usb_reset.umount("/tmp") |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -262,7 +262,7 @@ def setup_cgroup(domid, pid): # type:(str, str) -> None | |
|
|
||
| def mount(source, target, fs, flags=0): | ||
| if ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True | ||
| ).mount(source, target, fs, flags, None) < 0: | ||
| ).mount(source.encode(), target.encode(), fs.encode(), flags, None) < 0: | ||
| log.error("Failed to mount {} ({}) to {} with flags {}: {}". | ||
| format(source, fs, target, flags, | ||
| os.strerror(ctypes.get_errno()))) | ||
|
|
@@ -271,7 +271,7 @@ def mount(source, target, fs, flags=0): | |
|
|
||
| def umount(target): | ||
| if ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True | ||
| ).umount(target) < 0: | ||
|
||
| ).umount(target.encode()) < 0: | ||
| # log and continue | ||
| log.error("Failed to umount {}: {}". | ||
| format(target, os.strerror(ctypes.get_errno()))) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not put the module into
sys.modules