From b4c00e58cbd75ccc493c0b39d15d57e890a19eb3 Mon Sep 17 00:00:00 2001 From: Ro Date: Mon, 12 Feb 2024 17:08:01 -0500 Subject: [PATCH] Add dependency on types-pexpect (mypy). --- export/poetry.lock | 11 +++ export/pyproject.toml | 1 + export/securedrop_export/disk/cli.py | 114 ++++++++++++++++++--------- export/securedrop_export/main.py | 1 - export/tests/disk/test_cli.py | 17 ++-- export/tests/disk/test_service.py | 4 +- 6 files changed, 95 insertions(+), 53 deletions(-) diff --git a/export/poetry.lock b/export/poetry.lock index 2358699fa1..6b1f3115e1 100644 --- a/export/poetry.lock +++ b/export/poetry.lock @@ -1003,6 +1003,17 @@ files = [ {file = "tomli-2.0.1.tar.gz", hash = "sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f"}, ] +[[package]] +name = "types-pexpect" +version = "4.9.0.20240207" +description = "Typing stubs for pexpect" +optional = false +python-versions = ">=3.8" +files = [ + {file = "types-pexpect-4.9.0.20240207.tar.gz", hash = "sha256:910e20f0f177aeee5f2808d1b3221e3a23dfa1ca3bb02f685c2788fce6ddeb73"}, + {file = "types_pexpect-4.9.0.20240207-py3-none-any.whl", hash = "sha256:22b3fdccf253a8627bac0d3169845743fe0b1dbc87e5d33a438faaf879eb1f7a"}, +] + [[package]] name = "types-setuptools" version = "68.2.0.0" diff --git a/export/pyproject.toml b/export/pyproject.toml index be8b8d9766..6d185858e8 100644 --- a/export/pyproject.toml +++ b/export/pyproject.toml @@ -19,6 +19,7 @@ pytest = "^7.4.0" pytest-cov = "^4.1.0" pytest-mock = "^3.11.1" semgrep = "^1.31.2" +types-pexpect = "^4.9.0.20240207" [tool.mypy] python_version = "3.9" diff --git a/export/securedrop_export/disk/cli.py b/export/securedrop_export/disk/cli.py index 1148e17b6a..9a4f54231e 100644 --- a/export/securedrop_export/disk/cli.py +++ b/export/securedrop_export/disk/cli.py @@ -2,10 +2,10 @@ import logging import os import pexpect -import re import subprocess import time +from re import Pattern from typing import Optional, Union from securedrop_export.exceptions import ExportException @@ -22,6 +22,26 @@ "--------------------------------------------------------------------------\n" ) +# pexpect allows for a complex type to be passed to `expect` in order to match with input +# that includes regular expressions, byte or string patterns, *or* pexpect.EOF and pexpect.TIMEOUT, +# but mypy needs a little help with it, so the below alias is used as a typehint. +# See https://pexpect.readthedocs.io/en/stable/api/pexpect.html#pexpect.spawn.expect +PexpectList = Union[ + Pattern[str], + Pattern[bytes], + str, + bytes, + type[pexpect.EOF], + type[pexpect.TIMEOUT], + list[ + Union[ + Pattern[str], + Pattern[bytes], + Union[str, bytes, Union[type[pexpect.EOF], type[pexpect.TIMEOUT]]], + ] + ], +] + class CLI: """ @@ -83,7 +103,9 @@ def get_volume(self) -> Union[Volume, MountedVolume]: logger.error("Unrecoverable: could not parse lsblk.") raise ExportException(sdstatus=Status.DEVICE_ERROR) - volumes = [] + # mypy complains that this is a list[str], but it is a + # list[Union[Volume, MountedVolume]] + volumes = [] # type: ignore for device in lsblk_json.get("blockdevices"): if device.get("name") in targets and device.get("ro") is False: logger.debug( @@ -94,21 +116,21 @@ def get_volume(self) -> Union[Volume, MountedVolume]: if "children" in device: for partition in device.get("children"): # /dev/sdX1, /dev/sdX2 etc - item = self._get_supported_volume(partition) + item = self._get_supported_volume(partition) # type: ignore if item: - volumes.append(item) + volumes.append(item) # type: ignore # /dev/sdX else: - item = self._get_supported_volume(device) + item = self._get_supported_volume(device) # type: ignore if item: - volumes.append(item) + volumes.append(item) # type: ignore if len(volumes) != 1: logger.error(f"Need one target, got {len(volumes)}") raise ExportException(sdstatus=Status.INVALID_DEVICE_DETECTED) else: - logger.debug(f"Export target is {volumes[0].device_name}") - return volumes[0] + logger.debug(f"Export target is {volumes[0].device_name}") # type: ignore + return volumes[0] # type: ignore except json.JSONDecodeError as err: logger.error(err) @@ -232,16 +254,24 @@ def unlock_volume(self, volume: Volume, encryption_key: str) -> MountedVolume: logger.debug("Unlocking volume {}".format(volume.device_name)) command = f"udisksctl unlock --block-device {volume.device_name}" - prompt = ["Passphrase: ", pexpect.EOF, pexpect.TIMEOUT] + + # pexpect allows for a match list that contains pexpect.EOF and pexpect.TIMEOUT + # as well as string/regex matches: + # https://pexpect.readthedocs.io/en/stable/api/pexpect.html#pexpect.spawn.expect + prompt = [ + "Passphrase: ", + pexpect.EOF, + pexpect.TIMEOUT, + ] # type: PexpectList expected = [ - f"Unlocked {volume.device_name} as (.*)\.", + f"Unlocked {volume.device_name} as (.*)[^\r\n].", "GDBus.Error:org.freedesktop.UDisks2.Error.Failed: Device " # string continues - f"{volume.device_name} is already unlocked as (.*)\.", + f"{volume.device_name} is already unlocked as (.*)[^\r\n].", "GDBus.Error:org.freedesktop.UDisks2.Error.Failed: Error " # string continues f"unlocking {volume.device_name}: Failed to activate device: Incorrect passphrase", pexpect.EOF, pexpect.TIMEOUT, - ] + ] # type: PexpectList unlock_error = Status.ERROR_UNLOCK_GENERIC child = pexpect.spawn(command) @@ -254,8 +284,10 @@ def unlock_volume(self, volume: Volume, encryption_key: str) -> MountedVolume: child.sendline(encryption_key) index = child.expect(expected) if index == 0 or index == 1: - # We know what format the string is in - dm_name = child.match.group(1).decode("utf-8").strip() + # We know what format the string is in. + # Pexpect includes a re.Match object at `child.match`, but this freaks mypy out: + # see https://pexpect.readthedocs.io/en/stable/api/pexpect.html#pexpect.spawn.expect + dm_name = child.match.group(1).decode("utf-8").strip() # type: ignore logger.debug(f"Device is unlocked as {dm_name}") child.close() @@ -294,64 +326,68 @@ def _mount_volume(self, volume: Volume, full_unlocked_name: str) -> MountedVolum info = f"udisksctl info --block-device {volume.device_name}" # \x1b[37mPreferredDevice:\x1b[0m /dev/sdaX\r\n expected_info = [ - f"*PreferredDevice:[\t+]{volume.device_name}\r\n", - "*Error looking up object for device*", + f"PreferredDevice:[\t+]{volume.device_name}", + "Error looking up object for device", pexpect.EOF, pexpect.TIMEOUT, - ] + ] # type: PexpectList max_retries = 3 - unlock = f"udisksctl mount --block-device {full_unlocked_name}" + mount = f"udisksctl mount --block-device {full_unlocked_name}" # We can't pass {full_unlocked_name} in the match statement since even if we # pass in /dev/mapper/xxx, udisks2 may refer to the disk as /dev/dm-X. - expected_unlock = [ - f"Mounted * at (.*)", - f"Error mounting *: GDBus.Error:org." # string continues - "freedesktop.UDisks2.Error.AlreadyMounted: " # string continues - "Device .* is already mounted at `(.*)'", - f"Error looking up object for device *.", + expected_mount = [ + "Mounted .* at (.*)", + "Error mounting .*: GDBus.Error:org.freedesktop.UDisks2.Error.AlreadyMounted: " + "Device (.*) is already mounted at `(.*)'.", + "Error looking up object for device", pexpect.EOF, pexpect.TIMEOUT, - ] + ] # type: PexpectList mountpoint = None - logger.debug(f"Check to make sure udisks identified {volume.device_name} " - "(unlocked as {full_unlocked_name})") + logger.debug( + f"Check to make sure udisks identified {volume.device_name} " + f"(unlocked as {full_unlocked_name})" + ) for _ in range(max_retries): child = pexpect.spawn(info) index = child.expect(expected_info) - logger.debug(f"Results from udisks info: {volume.device_name}, " - "before: {child.before}, after: {child.after}") + logger.debug( + f"Results from udisks info: {volume.device_name}, " + f"before: {child.before}, after: {child.after}" + ) child.close() if index != 0: - logger.debug(f"index {index}") - logger.warning( + logger.debug( f"udisks can't identify {volume.device_name}, retrying..." ) time.sleep(0.5) else: - print(f"udisks found {volume.device_name}") + logger.debug(f"udisks found {volume.device_name}") break logger.info(f"Mount {full_unlocked_name} using udisksctl") - child = pexpect.spawn(unlock) - index = child.expect(expected_unlock) + child = pexpect.spawn(mount) + index = child.expect(expected_mount) logger.debug( f"child: {str(child.match)}, before: {child.before}, after: {child.after}" ) if index == 0: - # As above, we know the format - mountpoint = child.match.group(1).decode("utf-8").strip() + # As above, we know the format. + # Per https://pexpect.readthedocs.io/en/stable/api/pexpect.html#pexpect.spawn.expect, + # `child.match` is a re.Match object + mountpoint = child.match.group(1).decode("utf-8").strip() # type: ignore logger.debug(f"Successfully mounted device at {mountpoint}") elif index == 1: - # Mountpoint needs a bit of help. It arrives in the form `/path/to/mountpoint'. - # including the one backtick, single quote, and the period - mountpoint = child.match.group(1).decode("utf-8").strip() + # Use udisks unlocked name + full_unlocked_name = child.match.group(1).decode("utf-8").strip() # type: ignore + mountpoint = child.match.group(2).decode("utf-8").strip() # type: ignore logger.debug(f"Device already mounted at {mountpoint}") elif index == 2: diff --git a/export/securedrop_export/main.py b/export/securedrop_export/main.py index 4535a1dc7f..d04787c3b9 100755 --- a/export/securedrop_export/main.py +++ b/export/securedrop_export/main.py @@ -5,7 +5,6 @@ import platform import logging import sys -from typing import Optional from securedrop_export.archive import Archive, Metadata from securedrop_export.command import Command diff --git a/export/tests/disk/test_cli.py b/export/tests/disk/test_cli.py index 2723d5f368..9b6b3cb774 100644 --- a/export/tests/disk/test_cli.py +++ b/export/tests/disk/test_cli.py @@ -1,9 +1,7 @@ import pytest -from pexpect import ExceptionPexpect from unittest import mock import subprocess -import pexpect import re from securedrop_export.disk.cli import CLI @@ -410,7 +408,6 @@ def test_cleanup_error_reports_exporterror_if_flagged(self, mock_popen): self.cli.cleanup(mock_volume, submission.tmpdir, is_error=True) assert ex.value.sdstatus is Status.ERROR_EXPORT - @mock.patch("os.path.exists", return_value=False) @mock.patch("subprocess.check_call", return_value=0) def test_cleanup(self, mock_subprocess, mocked_path): @@ -444,14 +441,14 @@ def test_cleanup(self, mock_subprocess, mocked_path): def test_parse_correct_mountpoint_from_pexpect(self, mock_pexpect): child = mock_pexpect() child.expect.return_value = 1 - child.match.return_value = re.match( - r"`(\w+)'\.\r\n".encode("utf-8"), - "Error mounting /dev/dm-1: GDBus.Error:org." - "freedesktop.UDisks2.Error.AlreadyMounted: " - "Device /dev/sda1 is already mounted at `/dev/dm-0'.\r\n".encode("utf-8"), - ) + child.match = mock.MagicMock() + child.match.group.side_effect = [ + "/dev/dm-0".encode("utf-8"), + "/media/usb".encode("utf-8"), + ] mv = self.cli._mount_volume( - Volume("/dev/sda1", EncryptionScheme.VERACRYPT), "/dev/dm-1" + Volume("/dev/sda1", EncryptionScheme.VERACRYPT), "/dev/mapper/vc" ) assert mv.unlocked_name == "/dev/dm-0" + assert mv.mountpoint == "/media/usb" diff --git a/export/tests/disk/test_service.py b/export/tests/disk/test_service.py index 73dc0210ac..3b52fe9854 100644 --- a/export/tests/disk/test_service.py +++ b/export/tests/disk/test_service.py @@ -59,9 +59,7 @@ def _setup_submission(cls) -> Archive: temp_folder = tempfile.mkdtemp() metadata = os.path.join(temp_folder, Metadata.METADATA_FILE) with open(metadata, "w") as f: - f.write( - '{"device": "disk", "encryption_key": "hunter1"}' - ) + f.write('{"device": "disk", "encryption_key": "hunter1"}') return submission.set_metadata(Metadata(temp_folder).validate())