From 770282b05dd1f0958e600db963c3c4bce07dd9e7 Mon Sep 17 00:00:00 2001 From: Weii Wang Date: Wed, 12 Jul 2023 09:40:19 +0800 Subject: [PATCH] Use real filesystem in ops.testing backend (#960) This pull request refactors the testing module of the Ops library, switching its backend from an in-memory virtual filesystem to a real filesystem for simulating filesystem operations within containers. Furthermore, it exposes a new API, `Harness.get_filesystem_root`, enabling developers to interact with the simulated filesystem during test cases for both populating and inspecting the testing container filesystem. --- ops/testing.py | 624 ++++++++++++------------------------------- test/test_testing.py | 424 ++++++++++++++--------------- 2 files changed, 363 insertions(+), 685 deletions(-) diff --git a/ops/testing.py b/ops/testing.py index d7f2a1e4a..a290be1ef 100755 --- a/ops/testing.py +++ b/ops/testing.py @@ -23,12 +23,13 @@ import os import pathlib import random +import shutil import signal import tempfile import uuid import warnings from contextlib import contextmanager -from io import BytesIO, StringIO +from io import BytesIO, IOBase, StringIO from textwrap import dedent from typing import ( TYPE_CHECKING, @@ -38,7 +39,6 @@ Dict, Generic, Iterable, - Iterator, List, Literal, Mapping, @@ -55,7 +55,7 @@ from ops import charm, framework, model, pebble, storage from ops._private import yaml from ops.charm import CharmBase, CharmMeta, RelationRole -from ops.model import RelationNotFoundError +from ops.model import Container, RelationNotFoundError if TYPE_CHECKING: from typing_extensions import TypedDict @@ -64,7 +64,6 @@ ReadableBuffer = Union[bytes, str, StringIO, BytesIO, BinaryIO] _StringOrPath = Union[str, pathlib.PurePosixPath, pathlib.Path] - _FileOrDir = Union['_File', '_Directory'] _FileKwargs = TypedDict('_FileKwargs', { 'permissions': Optional[int], 'last_modified': datetime.datetime, @@ -552,6 +551,9 @@ def add_storage(self, storage_name: str, count: int = 1, location set to /[tmpdir]/N, where N is the counter and will be a number from [0,total_num_disks-1]. + The test harness uses symbolic links to imitate storage mounts, which may lead to some + inconsistencies compared to the actual charm. + Args: storage_name: The storage backend name on the Charm count: Number of disks being added @@ -604,6 +606,10 @@ def attach_storage(self, storage_id: str) -> None: It will trigger a storage-attached hook if the storage unit in question exists and is presently marked as detached. + The test harness uses symbolic links to imitate storage mounts, which may lead to some + inconsistencies compared to the actual charm. Users should be cognizant of + this potential discrepancy. + Args: storage_id: The full storage ID of the storage unit being attached, including the storage key, e.g. my-storage/0. @@ -1429,6 +1435,55 @@ def trigger_secret_expiration(self, secret_id: str, revision: int, *, label = secret.label self.charm.on.secret_expired.emit(secret_id, label, revision) + def get_filesystem_root(self, container: Union[str, Container]) -> pathlib.Path: + """Return the temp directory path harness will use to simulate the container filesystem. + + In a real container runtime, each container has an isolated root filesystem. + To simulate this behaviour, the testing harness manages a temporary directory for + each container. Any Pebble filesystem API calls will be translated + and mapped to this directory, as if the directory was the container's + filesystem root. + + This process is quite similar to the ``chroot`` command. Charm tests should + treat the returned directory as the container's root directory (``/``). + The testing harness will not create any files or directories inside the + simulated container's root directory; it's up to the test to populate the container's + root directory with any files or directories the charm needs. + + Regarding the file ownership: unprivileged users are unable to create files with distinct + ownership. To circumvent this limitation, the testing harness maps all user and group + options related to file operations to match the current user and group. + + Example usage:: + + # charm.py + import ops + class ExampleCharm(ops.CharmBase): + def __init__(self, *args): + super().__init__(*args) + self.hostname = open("/etc/hostname").read() + + # test_charm.py + from ops.testing import Harness + harness = Harness(ExampleCharm) + root = harness.get_filesystem_root("mycontainer") + (root / "etc" / "hostname").write_text("hostname.example.com") + harness.begin() + + Args: + container: The name of the container or the container instance. + + Return: + The path of the temporary directory associated with the specified container. + """ + # it's okay to access the container directly in this context, as its creation has already + # been ensured during the model's initialization. + if isinstance(container, str): + container_name = container + else: + container_name = container.name + return self._backend._pebble_clients[container_name]._root + def _get_app_or_unit_name(app_or_unit: AppUnitOrName) -> str: """Return name of given application or unit (return strings directly).""" @@ -1601,6 +1656,10 @@ def __init__(self, unit_name: str, meta: charm.CharmMeta, config: 'RawConfig'): self.model_uuid = str(uuid.uuid4()) self._harness_tmp_dir = tempfile.TemporaryDirectory(prefix='ops-harness-') + self._harness_storage_path = pathlib.Path(self._harness_tmp_dir.name) / "storages" + self._harness_container_path = pathlib.Path(self._harness_tmp_dir.name) / "containers" + self._harness_storage_path.mkdir() + self._harness_container_path.mkdir() # this is used by the _record_calls decorator self._calls: List[Tuple[Any, ...]] = [] self._meta = meta @@ -1836,7 +1895,7 @@ def storage_add(self, name: str, count: int = 1) -> List[int]: index = self._storage_index_counter self._storage_index_counter += 1 self._storage_list[name][index] = { - 'location': os.path.join(self._harness_tmp_dir.name, name, str(index)), + 'location': os.path.join(self._harness_storage_path, name, str(index)), } result.append(index) return result @@ -1847,8 +1906,12 @@ def _storage_detach(self, storage_id: str): name, index = storage_id.split('/', 1) index = int(index) - for client in self._pebble_clients.values(): - client._fs.remove_mount(name) + for container, client in self._pebble_clients.items(): + for _, mount in self._meta.containers[container].mounts.items(): + if mount.storage != name: + continue + root = client._root + (root / mount.location[1:]).unlink() if self._storage_is_attached(name, index): self._storage_attached[name].remove(index) @@ -1865,8 +1928,12 @@ def _storage_attach(self, storage_id: str): if mount.storage != name: continue for index, store in self._storage_list[mount.storage].items(): - fs = client._fs - fs.add_mount(mount.storage, mount.location, store['location']) + root = client._root + mounting_dir = root / mount.location[1:] + mounting_dir.parent.mkdir(parents=True, exist_ok=True) + target_dir = pathlib.Path(store["location"]) + target_dir.mkdir(parents=True, exist_ok=True) + mounting_dir.symlink_to(target_dir) index = int(index) if not self._storage_is_attached(name, index): @@ -1927,8 +1994,9 @@ def get_pebble(self, socket_path: str) -> '_TestingPebbleClient': container = socket_path.split('/')[3] # /charm/containers//pebble.socket client = self._pebble_clients.get(container, None) if client is None: - client = _TestingPebbleClient(self) - self._pebble_clients[container] = client + container_root = self._harness_container_path / container + container_root.mkdir() + client = _TestingPebbleClient(self, container_root=container_root) # we need to know which container a new pebble client belongs to # so we can figure out which storage mounts must be simulated on @@ -2202,12 +2270,12 @@ class _TestingPebbleClient: as the only public methods of this type are for implementing Client. """ - def __init__(self, backend: _TestingModelBackend): + def __init__(self, backend: _TestingModelBackend, container_root: pathlib.Path): self._backend = _TestingModelBackend self._layers: Dict[str, pebble.Layer] = {} # Has a service been started/stopped? self._service_status: Dict[str, pebble.ServiceStatus] = {} - self._fs = _TestingFilesystem() + self._root = container_root self._backend = backend def _check_connection(self): @@ -2411,11 +2479,23 @@ def get_services(self, names: Optional[List[str]] = None) -> List[pebble.Service infos.append(info) return infos + @staticmethod + def _check_absolute_path(path: str): + if not path.startswith("/"): + raise pebble.PathError( + 'generic-file-error', + f'paths must be absolute, got {path!r}' + ) + def pull(self, path: str, *, encoding: str = 'utf-8') -> Union[BinaryIO, TextIO]: self._check_connection() + self._check_absolute_path(path) + file_path = self._root / path[1:] try: - return self._fs.open(path, encoding=encoding) + return cast( + Union[BinaryIO, TextIO], + file_path.open("rb" if encoding is None else "r", encoding=encoding)) except FileNotFoundError: raise pebble.PathError('not-found', f'stat {path}: no such file or directory') except IsADirectoryError: @@ -2434,66 +2514,64 @@ def push( raise pebble.PathError( 'generic-file-error', f'permissions not within 0o000 to 0o777: {permissions:#o}') + self._check_absolute_path(path) + file_path = self._root / path[1:] + if make_dirs and not file_path.parent.exists(): + self.make_dir( + os.path.dirname(path), + make_parents=True, + permissions=None, + user_id=user_id, + user=user, + group_id=group_id, + group=group) + permissions = permissions if permissions is not None else 0o644 try: - self._fs.create_file( - path, source, encoding=encoding, make_dirs=make_dirs, permissions=permissions, - user_id=user_id, user=user, group_id=group_id, group=group) + if isinstance(source, str): + file_path.write_text(source, encoding=encoding) + elif isinstance(source, bytes): + file_path.write_bytes(source) + else: + with file_path.open('wb' if encoding is None else 'w', encoding=encoding) as f: + shutil.copyfileobj(cast(IOBase, source), cast(IOBase, f)) + os.chmod(file_path, permissions) except FileNotFoundError as e: raise pebble.PathError( 'not-found', f'parent directory not found: {e.args[0]}') - except NonAbsolutePathError as e: - raise pebble.PathError( - 'generic-file-error', - f'paths must be absolute, got {e.args[0]!r}' - ) + except NotADirectoryError: + raise pebble.PathError('generic-file-error', + f'open {path}.~: not a directory') def list_files(self, path: str, *, pattern: Optional[str] = None, itself: bool = False) -> List[pebble.FileInfo]: self._check_connection() - try: - files = [self._fs.get_path(path)] - except FileNotFoundError: - # conform with the real pebble api + self._check_absolute_path(path) + file_path = self._root / path[1:] + if not file_path.exists(): raise pebble.APIError( body={}, code=404, status='Not Found', message=f"stat {path}: no such file or directory") - + files = [file_path] if not itself: try: - files = self._fs.list_dir(path) + files = [file_path / file for file in os.listdir(file_path)] except NotADirectoryError: pass if pattern is not None: files = [file for file in files if fnmatch.fnmatch(file.name, pattern)] - type_mappings = { - _File: pebble.FileType.FILE, - _Directory: pebble.FileType.DIRECTORY, - } - - def get_pebble_file_type(file: '_FileOrDir') -> pebble.FileType: - pebble_type = type_mappings.get(type(file)) - if not pebble_type: - raise ValueError( - f'unable to convert file {file} (type not one of {type_mappings})') - return pebble_type - - return [ - pebble.FileInfo( - path=str(file.path), - name=file.name, - type=get_pebble_file_type(file), - size=file.size if isinstance(file, _File) else None, - permissions=file.kwargs.get('permissions') or 0, - last_modified=file.last_modified, - user_id=file.kwargs.get('user_id'), - user=file.kwargs.get('user'), - group_id=file.kwargs.get('group_id'), - group=file.kwargs.get('group'), - ) + file_infos = [ + Container._build_fileinfo(file) for file in files ] + for file_info in file_infos: + rel_path = os.path.relpath(file_info.path, start=self._root) + rel_path = '/' if rel_path == '.' else '/' + rel_path + file_info.path = rel_path + if rel_path == "/": + file_info.name = "/" + return file_infos def make_dir( self, path: str, *, @@ -2509,38 +2587,52 @@ def make_dir( raise pebble.PathError( 'generic-file-error', f'permissions not within 0o000 to 0o777: {permissions:#o}') - try: - self._fs.create_dir( - path, make_parents=make_parents, permissions=permissions, - user_id=user_id, user=user, group_id=group_id, group=group) - except FileNotFoundError as e: - # Parent directory doesn't exist and make_parents is False + self._check_absolute_path(path) + dir_path = self._root / path[1:] + if not dir_path.parent.exists() and not make_parents: raise pebble.PathError( - 'not-found', f'parent directory not found: {e.args[0]}') + 'not-found', f'parent directory not found: {path}') + if not dir_path.parent.exists() and make_parents: + self.make_dir( + os.path.dirname(path), + make_parents=True, + permissions=permissions, + user_id=user_id, + user=user, + group_id=group_id, + group=group) + try: + permissions = permissions if permissions else 0o755 + dir_path.mkdir() + os.chmod(dir_path, permissions) + except FileExistsError: + if not make_parents: + raise pebble.PathError('generic-file-error', f'mkdir {path}: file exists') except NotADirectoryError as e: # Attempted to create a subdirectory of a file raise pebble.PathError('generic-file-error', f'not a directory: {e.args[0]}') - except NonAbsolutePathError as e: - raise pebble.PathError( - 'generic-file-error', - f'paths must be absolute, got {e.args[0]!r}' - ) def remove_path(self, path: str, *, recursive: bool = False): self._check_connection() - try: - file_or_dir = self._fs.get_path(path) - except FileNotFoundError: + self._check_absolute_path(path) + file_path = self._root / path[1:] + if not file_path.exists(): if recursive: - # Pebble doesn't give not-found error when recursive is specified return raise pebble.PathError( 'not-found', f'remove {path}: no such file or directory') - - if isinstance(file_or_dir, _Directory) and len(file_or_dir) > 0 and not recursive: - raise pebble.PathError( - 'generic-file-error', 'cannot remove non-empty directory without recursive=True') - self._fs.delete_path(path) + if file_path.is_dir(): + if recursive: + shutil.rmtree(file_path) + else: + try: + file_path.rmdir() + except OSError: + raise pebble.PathError( + 'generic-file-error', + 'cannot remove non-empty directory without recursive=True') + else: + file_path.unlink() def exec(self, command, **kwargs): # type:ignore raise NotImplementedError(self.exec) # type:ignore @@ -2584,387 +2676,3 @@ def send_signal(self, sig: Union[int, str], service_names: Iterable[str]): def get_checks(self, level=None, names=None): # type:ignore raise NotImplementedError(self.get_checks) # type:ignore - - -class NonAbsolutePathError(Exception): - """Error raised by _TestingFilesystem. - - This error is raised when an absolute path is required but the code instead encountered a - relative path. - """ - - -class _TestingStorageMount: - """Simulates a filesystem backend for storage mounts.""" - - def __init__(self, location: pathlib.PurePosixPath, src: pathlib.Path): - """Creates a new simulated storage mount. - - Args: - location: The path within simulated filesystem at which this storage will be mounted. - src: The temporary on-disk location where the simulated storage will live. - """ - self._src = src - self._location = location - - src.mkdir(exist_ok=True, parents=True) - - def contains(self, path: '_StringOrPath') -> bool: - """Returns true whether path resides within this simulated storage mount's location.""" - try: - pathlib.PurePosixPath(path).relative_to(self._location) - return True - except Exception: - return False - - def check_contains(self, path: '_StringOrPath') -> pathlib.PurePosixPath: - """Raises if path does not reside within this simulated storage mount's location.""" - if not self.contains(path): - msg = 'the provided path "{!s}" does not reside within the mount location "{!s}"' \ - .format(path, self._location) - raise RuntimeError(msg) - return pathlib.PurePosixPath(path) - - def _srcpath(self, path: pathlib.PurePosixPath) -> pathlib.Path: - """Returns the disk-backed path where the simulated path will actually be stored.""" - suffix = path.relative_to(self._location) - return self._src / suffix - - def create_dir( - self, - path: '_StringOrPath', - make_parents: bool = False, - **kwargs: Any) -> '_Directory': - if not pathlib.PurePosixPath(path).is_absolute(): - raise NonAbsolutePathError(str(path)) - path = self.check_contains(path) - srcpath = self._srcpath(path) - - if srcpath.exists() and srcpath.is_dir() and make_parents: - return _Directory(path, **kwargs) # nothing to do - if srcpath.exists(): - raise FileExistsError(str(path)) - - dirname = srcpath.parent - if not dirname.exists(): - if not make_parents: - raise FileNotFoundError(str(path.parent)) - dirname.mkdir(parents=True, exist_ok=True) - srcpath.mkdir(exist_ok=True) - return _Directory(path, **kwargs) - - def create_file( - self, - path: '_StringOrPath', - data: 'ReadableBuffer', - encoding: str = 'utf-8', - make_dirs: bool = False, - **kwargs: Any - ) -> '_File': - posixpath: pathlib.PurePosixPath = self.check_contains(path) - srcpath = self._srcpath(posixpath) - - dirname = srcpath.parent - if not dirname.exists(): - if not make_dirs: - raise FileNotFoundError(str(posixpath.parent)) - dirname.mkdir(parents=True, exist_ok=True) - - if isinstance(data, str): - data = data.encode(encoding=encoding) - elif isinstance(data, (StringIO, BytesIO)): - data = data.getvalue() - if isinstance(data, str): - data = data.encode() - - byte_data = cast(bytes, data) - - with srcpath.open('wb') as f: - f.write(byte_data) - - return _File(posixpath, byte_data, encoding=encoding, **kwargs) - - def list_dir(self, path: '_StringOrPath') -> List['_FileOrDir']: - _path = self.check_contains(path) - srcpath = self._srcpath(_path) - - results: List[_FileOrDir] = [] - if not srcpath.exists(): - raise FileNotFoundError(str(_path)) - if not srcpath.is_dir(): - raise NotADirectoryError(str(_path)) - for fpath in srcpath.iterdir(): - mountpath = _path / fpath.name - if fpath.is_dir(): - results.append(_Directory(mountpath)) - elif fpath.is_file(): - with fpath.open('rb') as f: - results.append(_File(mountpath, f.read())) - else: - raise RuntimeError(f'unsupported file type at path {fpath}') - return results - - def open( - self, - path: '_StringOrPath', - encoding: Optional[str] = 'utf-8', - ) -> Union[BinaryIO, TextIO]: - path = self.check_contains(path) - - file = self.get_path(path) - if isinstance(file, _Directory): - raise IsADirectoryError(str(file.path)) - return file.open(encoding=encoding) - - def get_path(self, path: '_StringOrPath') -> '_FileOrDir': - path = self.check_contains(path) - srcpath = self._srcpath(path) - if srcpath.is_dir(): - return _Directory(path) - if not srcpath.exists(): - raise FileNotFoundError(str(path)) - with srcpath.open('rb') as f: - return _File(path, f.read()) - - def delete_path(self, path: '_StringOrPath') -> None: - path = self.check_contains(path) - srcpath = self._srcpath(path) - if srcpath.exists(): - srcpath.unlink() - else: - raise FileNotFoundError(str(path)) - - -class _TestingFilesystem: - r"""An in-memory mock of a pebble-controlled container's filesystem. - - For now, the filesystem is assumed to be a POSIX-style filesystem; Windows-style directories - (e.g. \, \foo\bar, C:\foo\bar) are not supported. - """ - - def __init__(self): - self.root = _Directory(pathlib.PurePosixPath('/')) - self._mounts: Dict[str, _TestingStorageMount] = {} - - def add_mount(self, name: str, mount_path: Union[str, pathlib.Path], - backing_src_path: Union[str, pathlib.Path]): - self._mounts[name] = _TestingStorageMount( - pathlib.PurePosixPath(mount_path), pathlib.Path(backing_src_path)) - - def remove_mount(self, name: str): - if name in self._mounts: - del self._mounts[name] - - def create_dir(self, path: str, make_parents: bool = False, **kwargs: Any) -> '_Directory': - if not path.startswith('/'): - raise NonAbsolutePathError(path) - for mount in self._mounts.values(): - if mount.contains(path): - return mount.create_dir(path, make_parents, **kwargs) - current_dir = self.root - tokens = pathlib.PurePosixPath(path).parts[1:] - for token in tokens[:-1]: - if token in current_dir: - current_dir = current_dir[token] - else: - if make_parents: - # NOTE: other parameters (e.g. ownership, permissions) only get applied to the - # final directory. - # (At the time of writing, Pebble defaults to 0o755 permissions and root:root - # ownership.) - current_dir = current_dir.create_dir(token) - else: - raise FileNotFoundError(str(current_dir.path / token)) - if isinstance(current_dir, _File): - raise NotADirectoryError(str(current_dir.path)) - - # Current backend will always raise an error if the final directory component - # already exists. - token = tokens[-1] - if token not in current_dir: - current_dir = current_dir.create_dir(token, **kwargs) - else: - # If 'make_parents' is specified, behave like 'mkdir -p' and ignore if the dir already - # exists. - if make_parents: - current_dir = _Directory(current_dir.path / token) - else: - raise FileExistsError(str(current_dir.path / token)) - return current_dir - - def create_file( - self, - path: str, - data: 'ReadableBuffer', - encoding: str = 'utf-8', - make_dirs: bool = False, - **kwargs: Any - ) -> '_File': - if not path.startswith('/'): - raise NonAbsolutePathError(path) - for mount in self._mounts.values(): - if mount.contains(path): - return mount.create_file(path, data, encoding, make_dirs, **kwargs) - path_obj = pathlib.PurePosixPath(path) - try: - dir_ = self.get_path(path_obj.parent) - except FileNotFoundError: - if make_dirs: - dir_ = self.create_dir(str(path_obj.parent), make_parents=make_dirs) - # NOTE: other parameters (e.g. ownership, permissions) only get applied to the - # final directory. - # (At the time of writing, Pebble defaults to the specified permissions and - # root:root ownership, which is inconsistent with the push function's - # behavior for parent directories.) - else: - raise - if not isinstance(dir_, _Directory): - raise pebble.PathError( - 'generic-file-error', f'parent is not a directory: {str(dir_)}') - return dir_.create_file(path_obj.name, data, encoding=encoding, **kwargs) - - def list_dir(self, path: '_StringOrPath') -> List['_FileOrDir']: - for mount in self._mounts.values(): - if mount.contains(path): - return mount.list_dir(path) - current_dir = self.root - tokens = pathlib.PurePosixPath(path).parts[1:] - for token in tokens: - try: - current_dir = current_dir[token] - except KeyError: - raise FileNotFoundError(str(current_dir.path / token)) - if isinstance(current_dir, _File): - raise NotADirectoryError(str(current_dir.path)) - if not isinstance(current_dir, _Directory): # type: ignore - # For now, ignoring other possible cases besides File and Directory (e.g. Symlink). - raise NotImplementedError() - return list(current_dir) - - def open( - self, - path: '_StringOrPath', - encoding: Optional[str] = 'utf-8', - ) -> Union[BinaryIO, TextIO]: - for mount in self._mounts.values(): - if mount.contains(path): - return mount.open(path, encoding) - path = pathlib.PurePosixPath(path) - file = self.get_path(path) # warning: no check re: directories - if isinstance(file, _Directory): - raise IsADirectoryError(str(file.path)) - return file.open(encoding=encoding) - - def get_path(self, path: '_StringOrPath') -> '_FileOrDir': - for mount in self._mounts.values(): - if mount.contains(path): - return mount.get_path(path) - path = pathlib.PurePosixPath(path) - tokens = path.parts[1:] - current_object = self.root - for token in tokens: - # ASSUMPTION / TESTME: object might be file - if isinstance(current_object, _File): - raise RuntimeError('cannot expand path {!r} from {!r}: ' - 'not a directory'.format(token, current_object)) - if token in current_object: - current_object = current_object[token] - else: - raise FileNotFoundError(str(current_object.path / token)) - return current_object - - def delete_path(self, path: '_StringOrPath') -> None: - for mount in self._mounts.values(): - if mount.contains(path): - return mount.delete_path(path) - path = pathlib.PurePosixPath(path) - parent_dir = self.get_path(path.parent) - if not isinstance(parent_dir, _Directory): - raise RuntimeError('cannot delete {}: parent {!r}' - 'is not a directory'.format(path.name, parent_dir)) - del parent_dir[path.name] - - -class _Directory: - def __init__(self, path: pathlib.PurePosixPath, **kwargs: Any): - self.path = path - self._children: Dict[str, Union[_Directory, _File]] = {} - self.last_modified = datetime.datetime.now() - self.kwargs = cast('_FileKwargs', kwargs) - - @property - def name(self) -> str: - # Need to handle special case for root. - # pathlib.PurePosixPath('/').name is '', but pebble returns '/'. - return self.path.name if self.path.name else '/' - - def __contains__(self, child: str) -> bool: - return child in self._children - - def __iter__(self) -> Iterator['_FileOrDir']: - return (value for value in self._children.values()) - - def __getitem__(self, key: str) -> '_FileOrDir': - return self._children[key] - - def __delitem__(self, key: str) -> None: - try: - del self._children[key] - except KeyError: - raise FileNotFoundError(str(self.path / key)) - - def __len__(self): - return len(self._children) - - def create_dir(self, name: str, **kwargs: Any) -> '_Directory': - dirc = _Directory(self.path / name, **kwargs) - self._children[name] = dirc - return dirc - - def create_file( - self, - name: str, - data: 'ReadableBuffer', - encoding: Optional[str] = 'utf-8', - **kwargs: Any - ) -> '_File': - file = _File(self.path / name, data, encoding=encoding, **kwargs) - self._children[name] = file - return file - - -class _File: - def __init__( - self, - path: pathlib.PurePosixPath, - data: 'ReadableBuffer', - encoding: Optional[str] = 'utf-8', - **kwargs: Any): - - if hasattr(data, 'read'): # if BytesIO/StringIO: - data = data.read() # type: ignore - if isinstance(data, str): # if str/StringIO - data = data.encode(encoding) # type: ignore - - byte_data = cast(bytes, data) # it's bytes by now; pyright doesn't like redeclaring vars - data_size = len(byte_data) - - self.path = path - self.data = byte_data - self.size = data_size - self.last_modified = datetime.datetime.now() - self.kwargs = cast('_FileKwargs', kwargs) - - @property - def name(self) -> str: - return self.path.name - - def open( - self, - encoding: Optional[str] = 'utf-8', - ) -> Union[TextIO, BinaryIO]: - if encoding is None: - return BytesIO(self.data) - else: - raw = self.data.decode(encoding) - return StringIO(raw) diff --git a/test/test_testing.py b/test/test_testing.py index 7cd12194f..cdf76c2ce 100644 --- a/test/test_testing.py +++ b/test/test_testing.py @@ -14,6 +14,7 @@ import collections import datetime +import grp import importlib import inspect import io @@ -21,13 +22,13 @@ import os import pathlib import platform +import pwd import shutil import sys import tempfile import textwrap import unittest import uuid -from io import BytesIO, StringIO from unittest.mock import MagicMock import pytest @@ -37,13 +38,8 @@ import ops.testing from ops import pebble from ops.model import _ModelBackend -from ops.testing import ( - NonAbsolutePathError, - _Directory, - _TestingFilesystem, - _TestingPebbleClient, - _TestingStorageMount, -) +from ops.pebble import FileType +from ops.testing import _TestingPebbleClient is_linux = platform.system() == 'Linux' @@ -2681,6 +2677,20 @@ def test_container_pebble_ready(self): ] ) + def test_get_filesystem_root(self): + harness = ops.testing.Harness(ops.CharmBase, meta=''' + name: test-app + containers: + foo: + resource: foo-image + ''') + foo_root = harness.get_filesystem_root("foo") + self.assertTrue(foo_root.exists()) + self.assertTrue(foo_root.is_dir()) + harness.begin() + container = harness.charm.unit.get_container("foo") + self.assertEqual(foo_root, harness.get_filesystem_root(container)) + class TestNetwork(unittest.TestCase): def setUp(self): @@ -4135,211 +4145,6 @@ def test_remove_path(self): # nuance. -class GenericTestingFilesystemTests: - def test_listdir_root_on_empty_os(self): - self.assertEqual(self.fs.list_dir('/'), []) - - def test_listdir_on_nonexistent_dir(self): - with self.assertRaises(FileNotFoundError) as cm: - self.fs.list_dir('/etc') - self.assertTrue('/etc' in cm.exception.args[0]) - - def test_listdir(self): - self.fs.create_dir('/opt') - self.fs.create_file('/opt/file1', 'data') - self.fs.create_file('/opt/file2', 'data') - expected_results = { - pathlib.PurePosixPath('/opt/file1'), - pathlib.PurePosixPath('/opt/file2')} - self.assertEqual(expected_results, {f.path for f in self.fs.list_dir('/opt')}) - # Ensure that Paths also work for listdir - self.assertEqual( - expected_results, {f.path for f in self.fs.list_dir(pathlib.PurePosixPath('/opt'))}) - - def test_listdir_on_file(self): - self.fs.create_file('/file', 'data') - with self.assertRaises(NotADirectoryError) as cm: - self.fs.list_dir('/file') - self.assertTrue('/file' in cm.exception.args[0]) - - def test_makedir(self): - d = self.fs.create_dir('/etc') - self.assertEqual(d.name, 'etc') - self.assertEqual(d.path, pathlib.PurePosixPath('/etc')) - d2 = self.fs.create_dir('/etc/init.d') - self.assertEqual(d2.name, 'init.d') - self.assertEqual(d2.path, pathlib.PurePosixPath('/etc/init.d')) - - def test_makedir_fails_if_already_exists(self): - self.fs.create_dir('/etc') - with self.assertRaises(FileExistsError) as cm: - self.fs.create_dir('/etc') - self.assertTrue('/etc' in cm.exception.args[0]) - - def test_makedir_succeeds_if_already_exists_when_make_parents_true(self): - d1 = self.fs.create_dir('/etc') - d2 = self.fs.create_dir('/etc', make_parents=True) - self.assertEqual(d1.path, d2.path) - self.assertEqual(d1.name, d2.name) - - def test_makedir_fails_if_parent_dir_doesnt_exist(self): - with self.assertRaises(FileNotFoundError) as cm: - self.fs.create_dir('/etc/init.d') - self.assertTrue('/etc' in cm.exception.args[0]) - - def test_make_and_list_directory(self): - self.fs.create_dir('/etc') - self.fs.create_dir('/var') - self.assertEqual( - {f.path for f in self.fs.list_dir('/')}, - {pathlib.PurePosixPath('/etc'), pathlib.PurePosixPath('/var')}) - - def test_make_directory_recursively(self): - self.fs.create_dir('/etc/init.d', make_parents=True) - self.assertEqual([str(o.path) for o in self.fs.list_dir('/')], ['/etc']) - self.assertEqual([str(o.path) for o in self.fs.list_dir('/etc')], ['/etc/init.d']) - - def test_makedir_path_must_start_with_slash(self): - with self.assertRaises(NonAbsolutePathError): - self.fs.create_dir("noslash") - - def test_create_file_fails_if_parent_dir_doesnt_exist(self): - with self.assertRaises(FileNotFoundError) as cm: - self.fs.create_file('/etc/passwd', "foo") - self.assertTrue('/etc' in cm.exception.args[0]) - - def test_create_file_succeeds_if_parent_dir_doesnt_exist_when_make_dirs_true(self): - self.fs.create_file('/test/subdir/testfile', "foo", make_dirs=True) - with self.fs.open('/test/subdir/testfile') as infile: - self.assertEqual(infile.read(), 'foo') - - def test_create_file_from_str(self): - self.fs.create_file('/test', "foo") - with self.fs.open('/test') as infile: - self.assertEqual(infile.read(), 'foo') - - def test_create_file_from_bytes(self): - self.fs.create_file('/test', b"foo") - with self.fs.open('/test', encoding=None) as infile: - self.assertEqual(infile.read(), b'foo') - - def test_create_file_from_files(self): - data = "foo" - - sio = StringIO(data) - self.fs.create_file('/test', sio) - with self.fs.open('/test') as infile: - self.assertEqual(infile.read(), 'foo') - - bio = BytesIO(data.encode()) - self.fs.create_file('/test2', bio) - with self.fs.open('/test2') as infile: - self.assertEqual(infile.read(), 'foo') - - def test_create_and_read_with_different_encodings(self): - # write str, read as utf-8 bytes - self.fs.create_file('/test', "foo") - with self.fs.open('/test', encoding=None) as infile: - self.assertEqual(infile.read(), b'foo') - - # write bytes, read as utf-8-decoded str - data = "日本語" # Japanese for "Japanese" - self.fs.create_file('/test2', data.encode('utf-8')) - with self.fs.open('/test2') as infile: # Implicit utf-8 read - self.assertEqual(infile.read(), data) - with self.fs.open('/test2', encoding='utf-8') as infile: # Explicit utf-8 read - self.assertEqual(infile.read(), data) - - def test_open_directory_fails(self): - self.fs.create_dir('/dir1') - with self.assertRaises(IsADirectoryError) as cm: - self.fs.open('/dir1') - self.assertEqual(cm.exception.args[0], '/dir1') - - def test_delete_file(self): - self.fs.create_file('/test', "foo") - self.fs.delete_path('/test') - with self.assertRaises(FileNotFoundError) as cm: - self.fs.get_path('/test') - - # Deleting deleted files should fail as well - with self.assertRaises(FileNotFoundError) as cm: - self.fs.delete_path('/test') - self.assertTrue('/test' in cm.exception.args[0]) - - def test_create_dir_with_extra_args(self): - d = self.fs.create_dir('/dir1') - self.assertEqual(d.kwargs, {}) - - d = self.fs.create_dir( - '/dir2', permissions=0o700, user='ubuntu', user_id=1000, group='www-data', group_id=33) - self.assertEqual(d.kwargs, { - 'permissions': 0o700, - 'user': 'ubuntu', - 'user_id': 1000, - 'group': 'www-data', - 'group_id': 33, - }) - - def test_create_file_with_extra_args(self): - f = self.fs.create_file('/file1', 'data') - self.assertEqual(f.kwargs, {}) - - f = self.fs.create_file( - '/file2', 'data', - permissions=0o754, user='ubuntu', user_id=1000, group='www-data', group_id=33) - self.assertEqual(f.kwargs, { - 'permissions': 0o754, - 'user': 'ubuntu', - 'user_id': 1000, - 'group': 'www-data', - 'group_id': 33, - }) - - def test_getattr(self): - self.fs.create_dir('/etc/init.d', make_parents=True) - - # By path - o = self.fs.get_path(pathlib.PurePosixPath('/etc/init.d')) - self.assertIsInstance(o, _Directory) - self.assertEqual(o.path, pathlib.PurePosixPath('/etc/init.d')) - - # By str - o = self.fs.get_path('/etc/init.d') - self.assertIsInstance(o, _Directory) - self.assertEqual(o.path, pathlib.PurePosixPath('/etc/init.d')) - - def test_getattr_file_not_found(self): - # Arguably this could be a KeyError given the dictionary-style access. - # However, FileNotFoundError seems more appropriate for a filesystem, and it - # gives a closer semantic feeling, in my opinion. - with self.assertRaises(FileNotFoundError) as cm: - self.fs.get_path('/nonexistent_file') - self.assertTrue('/nonexistent_file' in cm.exception.args[0]) - - -class TestTestingFilesystem(GenericTestingFilesystemTests, unittest.TestCase): - def setUp(self): - self.fs = _TestingFilesystem() - - def test_storage_mount(self): - tmpdir = tempfile.TemporaryDirectory() - self.fs.add_mount('foo', '/foo', tmpdir.name) - self.fs.create_file('/foo/bar/baz.txt', 'quux', make_dirs=True) - - tmppath = os.path.join(tmpdir.name, 'bar/baz.txt') - self.assertTrue(os.path.exists(tmppath)) - with open(tmppath) as f: - self.assertEqual(f.read(), 'quux') - - -class TestTestingStorageMount(GenericTestingFilesystemTests, unittest.TestCase): - def setUp(self): - self.tmp = tempfile.TemporaryDirectory() - self.addCleanup(self.tmp.cleanup) - self.fs = _TestingStorageMount('/', pathlib.Path(self.tmp.name)) - - class TestPebbleStorageAPIsUsingMocks( unittest.TestCase, _TestingPebbleClientMixin, @@ -4350,7 +4155,6 @@ def setUp(self): if self.prefix: self.client.make_dir(self.prefix, make_parents=True) - @unittest.skipUnless(is_linux, 'Pebble runs on Linux') def test_container_storage_mounts(self): harness = ops.testing.Harness(ops.CharmBase, meta=''' name: test-app @@ -4420,25 +4224,91 @@ def test_container_storage_mounts(self): harness.remove_storage(store1_id) self.assertFalse(c1.exists(c1_fpath)) + def _select_testing_user_group(self): + user = [u for u in pwd.getpwall() if u.pw_uid != os.getuid()][0] + group = [g for g in grp.getgrall() if g.gr_gid != os.getgid()][0] + return user, group + def test_push_with_ownership(self): - # Note: To simplify implementation, ownership is simply stored as-is with no verification. data = 'data' client = self.client - client.push(f"{self.prefix}/file", data, user_id=1, user='foo', group_id=3, group='bar') - file_ = client.list_files(f"{self.prefix}/file")[0] - self.assertEqual(file_.user_id, 1) - self.assertEqual(file_.user, 'foo') - self.assertEqual(file_.group_id, 3) - self.assertEqual(file_.group, 'bar') + user, group = self._select_testing_user_group() + cases = [ + { + "user_id": user.pw_uid, + "user": None, + "group_id": group.gr_gid, + "group": None + }, + { + "user_id": None, + "user": user.pw_name, + "group_id": None, + "group": group.gr_name + }, + { + "user_id": None, + "user": user.pw_name, + "group_id": group.gr_gid, + "group": None + }, + { + "user_id": user.pw_uid, + "user": None, + "group_id": None, + "group": group.gr_name + }, + { + "user_id": user.pw_uid, + "user": user.pw_name, + "group_id": group.gr_gid, + "group": group.gr_name + } + ] + for idx, case in enumerate(cases): + client.push(f"{self.prefix}/file{idx}", data, **case) + file_ = client.list_files(f"{self.prefix}/file{idx}")[0] + self.assertEqual(file_.path, f"{self.prefix}/file{idx}") def test_make_dir_with_ownership(self): client = self.client - client.make_dir(f"{self.prefix}/dir1", user_id=1, user="foo", group_id=3, group="bar") - dir_ = client.list_files(f"{self.prefix}/dir1", itself=True)[0] - self.assertEqual(dir_.user_id, 1) - self.assertEqual(dir_.user, "foo") - self.assertEqual(dir_.group_id, 3) - self.assertEqual(dir_.group, "bar") + user, group = self._select_testing_user_group() + cases = [ + { + "user_id": user.pw_uid, + "user": None, + "group_id": group.gr_gid, + "group": None + }, + { + "user_id": None, + "user": user.pw_name, + "group_id": None, + "group": group.gr_name + }, + { + "user_id": None, + "user": user.pw_name, + "group_id": group.gr_gid, + "group": None + }, + { + "user_id": user.pw_uid, + "user": None, + "group_id": None, + "group": group.gr_name + }, + { + "user_id": user.pw_uid, + "user": user.pw_name, + "group_id": group.gr_gid, + "group": group.gr_name + } + ] + for idx, case in enumerate(cases): + client.make_dir(f"{self.prefix}/dir{idx}", **case) + dir_ = client.list_files(f"{self.prefix}/dir{idx}", itself=True)[0] + self.assertEqual(dir_.path, f"{self.prefix}/dir{idx}") @unittest.skipUnless(os.getenv('RUN_REAL_PEBBLE_TESTS'), 'RUN_REAL_PEBBLE_TESTS not set') @@ -4463,6 +4333,106 @@ def test_make_dir_with_permission_mask(self): pass +class TestFilesystem(unittest.TestCase, _TestingPebbleClientMixin): + def setUp(self) -> None: + self.harness = ops.testing.Harness(ops.CharmBase, meta=''' + name: test + containers: + test-container: + mounts: + - storage: test-storage + location: /mounts/foo + storage: + test-storage: + type: filesystem + ''') + self.harness.begin() + self.harness.set_can_connect("test-container", True) + self.root = self.harness.get_filesystem_root("test-container") + self.container = self.harness.charm.unit.get_container("test-container") + + def tearDown(self) -> None: + self.harness.cleanup() + + def test_push(self): + self.container.push("/foo", source="foo") + self.assertTrue((self.root / "foo").is_file()) + self.assertEqual((self.root / "foo").read_text(), "foo") + + def test_push_create_parent(self): + self.container.push("/foo/bar", source="bar", make_dirs=True) + self.assertTrue((self.root / "foo").is_dir()) + self.assertEqual((self.root / "foo" / "bar").read_text(), "bar") + + def test_push_path(self): + with tempfile.TemporaryDirectory() as temp: + tempdir = pathlib.Path(temp) + (tempdir / "foo/bar").mkdir(parents=True) + (tempdir / "foo/test").write_text("test") + (tempdir / "foo/bar/foobar").write_text("foobar") + self.container.push_path(tempdir / "foo", "/tmp") + + self.assertTrue((self.root / "tmp").is_dir()) + self.assertTrue((self.root / "tmp/foo").is_dir()) + self.assertTrue((self.root / "tmp/foo/bar").is_dir()) + self.assertEqual((self.root / "tmp/foo/test").read_text(), "test") + self.assertEqual((self.root / "tmp/foo/bar/foobar").read_text(), "foobar") + + def test_make_dir(self): + self.container.make_dir("/tmp") + self.assertTrue((self.root / "tmp").is_dir()) + self.container.make_dir("/foo/bar/foobar", make_parents=True) + self.assertTrue((self.root / "foo/bar/foobar").is_dir()) + + def test_pull(self): + (self.root / "foo").write_text("foo") + self.assertEqual(self.container.pull("/foo").read(), "foo") + + def test_pull_path(self): + (self.root / "foo").mkdir() + (self.root / "foo/bar").write_text("bar") + # TODO: pull_path doesn't pull empty directories + # https://github.com/canonical/operator/issues/968 + # (self.root / "foobar").mkdir() + (self.root / "test").write_text("test") + with tempfile.TemporaryDirectory() as temp: + tempdir = pathlib.Path(temp) + self.container.pull_path("/", tempdir) + self.assertTrue((tempdir / "foo").is_dir()) + self.assertEqual((tempdir / "foo/bar").read_text(), "bar") + # self.assertTrue((tempdir / "foobar").is_dir()) + self.assertEqual((tempdir / "test").read_text(), "test") + + def test_list_files(self): + (self.root / "foo").mkdir() + self.assertSequenceEqual(self.container.list_files("/foo"), []) + self.assertEqual(len(self.container.list_files("/")), 1) + file_info = self.container.list_files("/")[0] + self.assertEqual(file_info.path, "/foo") + self.assertEqual(file_info.type, FileType.DIRECTORY) + self.assertEqual(self.container.list_files("/foo", itself=True)[0].path, "/foo") + (self.root / "foo/bar").write_text("foobar") + self.assertEqual(len(self.container.list_files("/foo")), 1) + self.assertEqual(len(self.container.list_files("/foo", pattern="*ar")), 1) + self.assertEqual(len(self.container.list_files("/foo", pattern="*oo")), 0) + file_info = self.container.list_files("/foo")[0] + self.assertEqual(file_info.path, "/foo/bar") + self.assertEqual(file_info.type, FileType.FILE) + root_info = self.container.list_files("/", itself=True)[0] + self.assertEqual(root_info.path, "/") + self.assertEqual(root_info.name, "/") + + def test_storage_mount(self): + storage_id = self.harness.add_storage("test-storage", 1, attach=True)[0] + self.assertTrue((self.root / "mounts/foo").exists()) + (self.root / "mounts/foo/bar").write_text("foobar") + self.assertEqual(self.container.pull("/mounts/foo/bar").read(), "foobar") + self.harness.detach_storage(storage_id) + self.assertFalse((self.root / "mounts/foo/bar").is_file()) + self.harness.attach_storage(storage_id) + self.assertTrue((self.root / "mounts/foo/bar").read_text(), "foobar") + + class TestSecrets(unittest.TestCase): def test_add_model_secret_by_app_name_str(self): harness = ops.testing.Harness(ops.CharmBase, meta='name: webapp')