Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,18 @@ jobs:
strategy:
fail-fast: false
matrix:
docker-image: # See https://hub.docker.com/_/python/tags for images
- python:2.7.18-alpine3.11
- python:3.11.4-alpine3.18
container: ${{ matrix.docker-image }}
python-version: ['2.7', '3.11']
steps:
- name: Checkout code
uses: actions/checkout@v3

- name: Setup Python ${{matrix.python-version}} (without a container)
uses: LizardByte/setup-python-action@master
with:
python-version: ${{matrix.python-version}}

- name: Install python 2 dependencies
if: ${{ startsWith(matrix.docker-image, 'python:2.7.18') }}
if: ${{ matrix.python-version == '2.7' }}
run: pip install enum

- name: Install dependencies
Expand Down
8 changes: 8 additions & 0 deletions .pylintrc.project-dict.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
getgid
getuid
NEWNS
pytest
pytype
setgroups
strerror
tmpfs
4 changes: 4 additions & 0 deletions .sourcery.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
rule_settings:
disable:
- use-fstring-for-concatenation
- use-fstring-for-formatting
Empty file added scripts/unit_test/__init__.py
Empty file.
10 changes: 10 additions & 0 deletions scripts/unit_test/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""scripts/unit_test/conftest.py: Common pytest module for shared pytest fixtures"""
import pytest

from .rootless_container import enter_private_mount_namespace


@pytest.fixture(scope="session")
def private_mount_namespace():
"""Enter a private mount namespace that allows us to test mount and unmount"""
return enter_private_mount_namespace()
79 changes: 79 additions & 0 deletions scripts/unit_test/import_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""helpers for unit-testing functions in scripts without permanent global mocks"""
import os
import sys
from contextlib import contextmanager
from types import ModuleType

from mock import Mock

if sys.version_info >= (3,):
from typing import Generator


@contextmanager
def mocked_modules(*module_names): # type:(str) -> Generator[None, None, None]
"""Context manager that temporarily mocks the specified modules.
:param module_names: Variable number of names of the modules to be mocked.
:yields: None
During the context, the specified modules are added to the sys.modules
dictionary as instances of the ModuleType class.
This effectively mocks the modules, allowing them to be imported and used
within the context. After the context, the mocked modules are removed
from the sys.modules dictionary.
Example usage:
```python
with mocked_modules("module1", "module2"):
# Code that uses the mocked modules
```
"""
for module_name in module_names:
sys.modules[module_name] = Mock()
yield
for module_name in module_names:
sys.modules.pop(module_name)


def import_file_as_module(relative_script_path): # type:(str) -> ModuleType
"""Import a Python script without the .py extension as a python module.
:param relative_script_path (str): The relative path of the script to import.
:returns module: The imported module.
:raises: AssertionError: If the spec or loader is not available.
Note:
- This function uses different methods depending on the Python version.
- For Python 2, it uses the imp module.
- For Python 3, it uses the importlib module.
Example:
- import_script_as_module('scripts/mail-alarm') # Returns the imported module.
"""
script_path = os.path.dirname(__file__) + "/../../" + relative_script_path
module_name = os.path.basename(script_path.replace(".py", ""))

if sys.version_info.major == 2:
# Use imp only for python2.7, there is no alternative to it:
# pylint: disable-next=import-outside-toplevel, deprecated-module
import imp # pyright: ignore[reportMissingImports]

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not put the module into sys.modules

return imp.load_source(module_name, script_path)

# For Python 3.11+: Import Python script without the .py extension:
# https://gist.github.com/bernhardkaindl/1aaa04ea925fdc36c40d031491957fd3:
# pylint: disable-next=import-outside-toplevel
from importlib import ( # pylint: disable=no-name-in-module
machinery,
util,
)

loader = machinery.SourceFileLoader(module_name, script_path)
spec = util.spec_from_loader(module_name, loader)
assert spec
assert spec.loader
module = util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
return module
83 changes: 83 additions & 0 deletions scripts/unit_test/rootless_container.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""rootless_container.py: Create a rootless container on any Linux and GitHub CI"""
import ctypes
import os

# Unshare the user namespace, so that the calling process is moved into a new
# user namespace which is not shared with any previously existing process.
# Needed so that the current user id can be mapped to 0 for getting a new
# mount namespace.
CLONE_NEWUSER = 0x10000000
# Unshare the mount namespace, so that the calling process has a private copy
# of its root directory namespace which is not shared with any other process:
CLONE_NEWNS = 0x00020000
# Flags for mount(2):
MS_BIND = 4096
MS_REC = 16384
MS_PRIVATE = 1 << 18


def unshare(flags): # type:(int) -> None
"""Wrapper for the library call to unshare Linux kernel namespaces"""
lib = ctypes.CDLL(None, use_errno=True)
lib.unshare.argtypes = [ctypes.c_int]
rc = lib.unshare(flags)
if rc != 0: # pragma: no cover
errno = ctypes.get_errno()
raise OSError(errno, os.strerror(errno), flags)


def mount(source="none", target="", fs="", flags=0, options=""):
# type:(str, str, str, int, str) -> None
"""Wrapper for the library call mount(). Supports Python2.7 and Python3.x"""
lib = ctypes.CDLL(None, use_errno=True)
lib.mount.argtypes = (
ctypes.c_char_p,
ctypes.c_char_p,
ctypes.c_char_p,
ctypes.c_ulong,
ctypes.c_char_p,
)
result = lib.mount(
source.encode(), target.encode(), fs.encode(), flags, options.encode()
)
if result < 0: # pragma: no cover
errno = ctypes.get_errno()
raise OSError(
errno,
"mount " + target + " (" + options + "): " + os.strerror(errno),
)


def umount(target): # type:(str) -> None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function is not called, did you intention to call it automatically when leave the session?

"""Wrapper for the Linux umount system call, supports Python2.7 and Python3.x"""
lib = ctypes.CDLL(None, use_errno=True)
result = lib.umount(ctypes.c_char_p(target.encode()))
if result < 0: # pragma: no cover
errno = ctypes.get_errno()
raise OSError(errno, "umount " + target + ": " + os.strerror(errno))


def enter_private_mount_namespace():
"""Enter a private mount and user namespace with the user and simulate uid 0
Some code like mount() requires to be run as root. The container simulates
root-like privileges and a new mount namespace that allows mount() in it.
Implements the equivalent of `/usr/bin/unshare --map-root-user --mount`
"""

# Read the actual user and group ids before entering the new user namespace:
real_uid = os.getuid()
real_gid = os.getgid()
unshare(CLONE_NEWUSER | CLONE_NEWNS)
# Setup user map to map the user id to behave like uid 0:
with open("/proc/self/uid_map", "wb") as proc_self_user_map:
proc_self_user_map.write(b"0 %d 1" % real_uid)
with open("/proc/self/setgroups", "wb") as proc_self_set_groups:
proc_self_set_groups.write(b"deny")
# Setup group map for the user's gid to behave like gid 0:
with open("/proc/self/gid_map", "wb") as proc_self_group_map:
proc_self_group_map.write(b"0 %d 1" % real_gid)
# Private root mount in the mount namespace top support mounting a private tmpfs:
mount(target="/", flags=MS_REC | MS_PRIVATE)
return True
14 changes: 14 additions & 0 deletions scripts/unit_test/test_usb_reset_mount.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""scripts/unit_test/test_usb_reset_mount.py: Test usb_reset.mount and .umount"""
from __future__ import print_function

from .import_helper import import_file_as_module, mocked_modules


def test_usb_reset_mount_umount(private_mount_namespace):
"""Test usb_reset.mount and .umount"""
assert private_mount_namespace
with mocked_modules("xcp", "xcp.logger"):
usb_reset = import_file_as_module("scripts/usb_reset.py")
usb_reset.log.error = print
usb_reset.mount(source="tmpfs", target="/tmp", fs="tmpfs")
usb_reset.umount("/tmp")
4 changes: 2 additions & 2 deletions scripts/usb_reset.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def setup_cgroup(domid, pid): # type:(str, str) -> None

def mount(source, target, fs, flags=0):
if ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True
).mount(source, target, fs, flags, None) < 0:
).mount(source.encode(), target.encode(), fs.encode(), flags, None) < 0:
log.error("Failed to mount {} ({}) to {} with flags {}: {}".
format(source, fs, target, flags,
os.strerror(ctypes.get_errno())))
Expand All @@ -271,7 +271,7 @@ def mount(source, target, fs, flags=0):

def umount(target):
if ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True
).umount(target) < 0:
Copy link
Contributor

@acefei acefei Jan 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might consider encapsulating these two functions as the following pseudo code.
The Mounter class encapsulates the loading of libc and the management of mount/umount methods, that could enhance the code's reusability and won't need to loading libc repeatly and implementing the enter and exit methods, enabling the Mounter object to be used within a with statement.

import ctypes
import os
import logging

class Mounter:
    def __init__(self):
        self.libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
        self.target = None

    def mount(self, source, target, fs, flags=0):
        if self.libc.mount(source.encode(), target.encode(), fs.encode(), flags, None) < 0:
            error_message = "Failed to mount {} ({}) to {} with flags {}: {}".format(
                source, fs, target, flags, os.strerror(ctypes.get_errno())
            )
            logging.error(error_message)
            return error_message
        else:
            self.target = target
            return "Mount successful"

    def umount(self, target, flags=0):
        if self.libc.umount2(target.encode(), flags) < 0:
            error_message = "Failed to unmount {} with flags {}: {}".format(
                target, flags, os.strerror(ctypes.get_errno())
            )
            logging.error(error_message)
            return error_message
        else:
            return "Unmount successful"
    
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if self.target is not None:
            self.umount(self.target)

BTW, as libc manual mentioned https://www.gnu.org/software/libc/manual/html_node/Mount_002dUnmount_002dRemount.html

umount does the same thing as umount2 with flags set to zeroes. It is more widely available than umount2 but since it lacks the possibility to forcefully unmount a filesystem is deprecated when umount2 is also available.

Should we consider using umount2 instead?

).umount(target.encode()) < 0:
# log and continue
log.error("Failed to umount {}: {}".
format(target, os.strerror(ctypes.get_errno())))
Expand Down