Skip to content

Commit

Permalink
Add dependency on types-pexpect (mypy).
Browse files Browse the repository at this point in the history
  • Loading branch information
rocodes committed Feb 21, 2024
1 parent 4c1960b commit b4c00e5
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 53 deletions.
11 changes: 11 additions & 0 deletions export/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions export/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pytest = "^7.4.0"
pytest-cov = "^4.1.0"
pytest-mock = "^3.11.1"
semgrep = "^1.31.2"
types-pexpect = "^4.9.0.20240207"

[tool.mypy]
python_version = "3.9"
114 changes: 75 additions & 39 deletions export/securedrop_export/disk/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import logging
import os
import pexpect
import re
import subprocess
import time

from re import Pattern
from typing import Optional, Union

from securedrop_export.exceptions import ExportException
Expand All @@ -22,6 +22,26 @@
"--------------------------------------------------------------------------\n"
)

# pexpect allows for a complex type to be passed to `expect` in order to match with input
# that includes regular expressions, byte or string patterns, *or* pexpect.EOF and pexpect.TIMEOUT,
# but mypy needs a little help with it, so the below alias is used as a typehint.
# See https://pexpect.readthedocs.io/en/stable/api/pexpect.html#pexpect.spawn.expect
PexpectList = Union[
Pattern[str],
Pattern[bytes],
str,
bytes,
type[pexpect.EOF],
type[pexpect.TIMEOUT],
list[
Union[
Pattern[str],
Pattern[bytes],
Union[str, bytes, Union[type[pexpect.EOF], type[pexpect.TIMEOUT]]],
]
],
]


class CLI:
"""
Expand Down Expand Up @@ -83,7 +103,9 @@ def get_volume(self) -> Union[Volume, MountedVolume]:
logger.error("Unrecoverable: could not parse lsblk.")
raise ExportException(sdstatus=Status.DEVICE_ERROR)

volumes = []
# mypy complains that this is a list[str], but it is a
# list[Union[Volume, MountedVolume]]
volumes = [] # type: ignore
for device in lsblk_json.get("blockdevices"):
if device.get("name") in targets and device.get("ro") is False:
logger.debug(
Expand All @@ -94,21 +116,21 @@ def get_volume(self) -> Union[Volume, MountedVolume]:
if "children" in device:
for partition in device.get("children"):
# /dev/sdX1, /dev/sdX2 etc
item = self._get_supported_volume(partition)
item = self._get_supported_volume(partition) # type: ignore
if item:
volumes.append(item)
volumes.append(item) # type: ignore
# /dev/sdX
else:
item = self._get_supported_volume(device)
item = self._get_supported_volume(device) # type: ignore
if item:
volumes.append(item)
volumes.append(item) # type: ignore

if len(volumes) != 1:
logger.error(f"Need one target, got {len(volumes)}")
raise ExportException(sdstatus=Status.INVALID_DEVICE_DETECTED)
else:
logger.debug(f"Export target is {volumes[0].device_name}")
return volumes[0]
logger.debug(f"Export target is {volumes[0].device_name}") # type: ignore
return volumes[0] # type: ignore

except json.JSONDecodeError as err:
logger.error(err)
Expand Down Expand Up @@ -232,16 +254,24 @@ def unlock_volume(self, volume: Volume, encryption_key: str) -> MountedVolume:
logger.debug("Unlocking volume {}".format(volume.device_name))

command = f"udisksctl unlock --block-device {volume.device_name}"
prompt = ["Passphrase: ", pexpect.EOF, pexpect.TIMEOUT]

# pexpect allows for a match list that contains pexpect.EOF and pexpect.TIMEOUT
# as well as string/regex matches:
# https://pexpect.readthedocs.io/en/stable/api/pexpect.html#pexpect.spawn.expect
prompt = [
"Passphrase: ",
pexpect.EOF,
pexpect.TIMEOUT,
] # type: PexpectList
expected = [
f"Unlocked {volume.device_name} as (.*)\.",
f"Unlocked {volume.device_name} as (.*)[^\r\n].",
"GDBus.Error:org.freedesktop.UDisks2.Error.Failed: Device " # string continues
f"{volume.device_name} is already unlocked as (.*)\.",
f"{volume.device_name} is already unlocked as (.*)[^\r\n].",
"GDBus.Error:org.freedesktop.UDisks2.Error.Failed: Error " # string continues
f"unlocking {volume.device_name}: Failed to activate device: Incorrect passphrase",
pexpect.EOF,
pexpect.TIMEOUT,
]
] # type: PexpectList
unlock_error = Status.ERROR_UNLOCK_GENERIC

child = pexpect.spawn(command)
Expand All @@ -254,8 +284,10 @@ def unlock_volume(self, volume: Volume, encryption_key: str) -> MountedVolume:
child.sendline(encryption_key)
index = child.expect(expected)
if index == 0 or index == 1:
# We know what format the string is in
dm_name = child.match.group(1).decode("utf-8").strip()
# We know what format the string is in.
# Pexpect includes a re.Match object at `child.match`, but this freaks mypy out:
# see https://pexpect.readthedocs.io/en/stable/api/pexpect.html#pexpect.spawn.expect
dm_name = child.match.group(1).decode("utf-8").strip() # type: ignore
logger.debug(f"Device is unlocked as {dm_name}")

child.close()
Expand Down Expand Up @@ -294,64 +326,68 @@ def _mount_volume(self, volume: Volume, full_unlocked_name: str) -> MountedVolum
info = f"udisksctl info --block-device {volume.device_name}"
# \x1b[37mPreferredDevice:\x1b[0m /dev/sdaX\r\n
expected_info = [
f"*PreferredDevice:[\t+]{volume.device_name}\r\n",
"*Error looking up object for device*",
f"PreferredDevice:[\t+]{volume.device_name}",
"Error looking up object for device",
pexpect.EOF,
pexpect.TIMEOUT,
]
] # type: PexpectList
max_retries = 3

unlock = f"udisksctl mount --block-device {full_unlocked_name}"
mount = f"udisksctl mount --block-device {full_unlocked_name}"

# We can't pass {full_unlocked_name} in the match statement since even if we
# pass in /dev/mapper/xxx, udisks2 may refer to the disk as /dev/dm-X.
expected_unlock = [
f"Mounted * at (.*)",
f"Error mounting *: GDBus.Error:org." # string continues
"freedesktop.UDisks2.Error.AlreadyMounted: " # string continues
"Device .* is already mounted at `(.*)'",
f"Error looking up object for device *.",
expected_mount = [
"Mounted .* at (.*)",
"Error mounting .*: GDBus.Error:org.freedesktop.UDisks2.Error.AlreadyMounted: "
"Device (.*) is already mounted at `(.*)'.",
"Error looking up object for device",
pexpect.EOF,
pexpect.TIMEOUT,
]
] # type: PexpectList
mountpoint = None

logger.debug(f"Check to make sure udisks identified {volume.device_name} "
"(unlocked as {full_unlocked_name})")
logger.debug(
f"Check to make sure udisks identified {volume.device_name} "
f"(unlocked as {full_unlocked_name})"
)
for _ in range(max_retries):
child = pexpect.spawn(info)
index = child.expect(expected_info)
logger.debug(f"Results from udisks info: {volume.device_name}, "
"before: {child.before}, after: {child.after}")
logger.debug(
f"Results from udisks info: {volume.device_name}, "
f"before: {child.before}, after: {child.after}"
)
child.close()

if index != 0:
logger.debug(f"index {index}")
logger.warning(
logger.debug(
f"udisks can't identify {volume.device_name}, retrying..."
)
time.sleep(0.5)
else:
print(f"udisks found {volume.device_name}")
logger.debug(f"udisks found {volume.device_name}")
break

logger.info(f"Mount {full_unlocked_name} using udisksctl")
child = pexpect.spawn(unlock)
index = child.expect(expected_unlock)
child = pexpect.spawn(mount)
index = child.expect(expected_mount)

logger.debug(
f"child: {str(child.match)}, before: {child.before}, after: {child.after}"
)

if index == 0:
# As above, we know the format
mountpoint = child.match.group(1).decode("utf-8").strip()
# As above, we know the format.
# Per https://pexpect.readthedocs.io/en/stable/api/pexpect.html#pexpect.spawn.expect,
# `child.match` is a re.Match object
mountpoint = child.match.group(1).decode("utf-8").strip() # type: ignore
logger.debug(f"Successfully mounted device at {mountpoint}")

elif index == 1:
# Mountpoint needs a bit of help. It arrives in the form `/path/to/mountpoint'.
# including the one backtick, single quote, and the period
mountpoint = child.match.group(1).decode("utf-8").strip()
# Use udisks unlocked name
full_unlocked_name = child.match.group(1).decode("utf-8").strip() # type: ignore
mountpoint = child.match.group(2).decode("utf-8").strip() # type: ignore
logger.debug(f"Device already mounted at {mountpoint}")

elif index == 2:
Expand Down
1 change: 0 additions & 1 deletion export/securedrop_export/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import platform
import logging
import sys
from typing import Optional

from securedrop_export.archive import Archive, Metadata
from securedrop_export.command import Command
Expand Down
17 changes: 7 additions & 10 deletions export/tests/disk/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import pytest
from pexpect import ExceptionPexpect
from unittest import mock

import subprocess
import pexpect
import re

from securedrop_export.disk.cli import CLI
Expand Down Expand Up @@ -410,7 +408,6 @@ def test_cleanup_error_reports_exporterror_if_flagged(self, mock_popen):
self.cli.cleanup(mock_volume, submission.tmpdir, is_error=True)
assert ex.value.sdstatus is Status.ERROR_EXPORT


@mock.patch("os.path.exists", return_value=False)
@mock.patch("subprocess.check_call", return_value=0)
def test_cleanup(self, mock_subprocess, mocked_path):
Expand Down Expand Up @@ -444,14 +441,14 @@ def test_cleanup(self, mock_subprocess, mocked_path):
def test_parse_correct_mountpoint_from_pexpect(self, mock_pexpect):
child = mock_pexpect()
child.expect.return_value = 1
child.match.return_value = re.match(
r"`(\w+)'\.\r\n".encode("utf-8"),
"Error mounting /dev/dm-1: GDBus.Error:org."
"freedesktop.UDisks2.Error.AlreadyMounted: "
"Device /dev/sda1 is already mounted at `/dev/dm-0'.\r\n".encode("utf-8"),
)
child.match = mock.MagicMock()
child.match.group.side_effect = [
"/dev/dm-0".encode("utf-8"),
"/media/usb".encode("utf-8"),
]

mv = self.cli._mount_volume(
Volume("/dev/sda1", EncryptionScheme.VERACRYPT), "/dev/dm-1"
Volume("/dev/sda1", EncryptionScheme.VERACRYPT), "/dev/mapper/vc"
)
assert mv.unlocked_name == "/dev/dm-0"
assert mv.mountpoint == "/media/usb"
4 changes: 1 addition & 3 deletions export/tests/disk/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ def _setup_submission(cls) -> Archive:
temp_folder = tempfile.mkdtemp()
metadata = os.path.join(temp_folder, Metadata.METADATA_FILE)
with open(metadata, "w") as f:
f.write(
'{"device": "disk", "encryption_key": "hunter1"}'
)
f.write('{"device": "disk", "encryption_key": "hunter1"}')

return submission.set_metadata(Metadata(temp_folder).validate())

Expand Down

0 comments on commit b4c00e5

Please sign in to comment.