Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SFTPFileTransfer #136

Merged
merged 21 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Make SFTP file transfer pass the tests
  • Loading branch information
jl-wynen committed Aug 25, 2023
commit ac895f341ec33d9198c6be0282662fc392adabbb
100 changes: 48 additions & 52 deletions src/scitacean/transfer/sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# Note that invoke and paramiko are dependencies of fabric.
from fabric import Connection
from invoke.exceptions import UnexpectedExit
from paramiko import SFTPClient
from paramiko import SFTPAttributes, SFTPClient
from paramiko.ssh_exception import AuthenticationException, PasswordRequiredException

from ..dataset import Dataset
Expand All @@ -21,9 +21,6 @@
from ..logging import get_logger
from .util import source_folder_for

# TODO pass pid in put/revert?
# downloading does not need a pid, so it should not be required in the constructor/


class SFTPDownloadConnection:
def __init__(self, *, connection: Connection) -> None:
Expand Down Expand Up @@ -62,9 +59,7 @@ def remote_path(self, filename: Union[str, RemotePath]) -> RemotePath:

def _make_source_folder(self) -> None:
try:
self._connection.run(
f"mkdir -p {self.source_folder.posix}", hide=True, in_stream=False
)
_mkdir_remote(self._sftp, self.source_folder)
except OSError as exc:
raise FileUploadError(
f"Failed to create source folder {self.source_folder}: {exc.args}"
Expand Down Expand Up @@ -127,32 +122,18 @@ def _validate_upload(self, file: File) -> Optional[Exception]:
return None

def _compute_checksum(self, file: File) -> Optional[str]:
if (hash_exe := _coreutils_checksum_for(file)) is None:
if file.checksum_algorithm is None:
return None
try:
res = self._connection.run(
f"{hash_exe} {self.remote_path(file.remote_path).posix}",
hide=True,
in_stream=False,
)
except UnexpectedExit as exc:
if exc.result.return_code == 127:
get_logger().warning(
"Cannot validate checksum of uploaded file %s because checksum "
"algorithm '%s' is not implemented on the server.",
file.remote_path,
file.checksum_algorithm,
)
return None
raise
return res.stdout.split(" ", 1)[0] # type: ignore[no-any-return]
return _compute_remote_checksum(
self._sftp, self.remote_path(file.remote_path), file.checksum_algorithm
)

def revert_upload(self, *files: File) -> None:
"""Remove uploaded files from the remote folder."""
for file in files:
self._revert_upload_single(remote=file.remote_path, local=file.local_path)

if _folder_is_empty(self._connection, self.source_folder):
if _remote_folder_is_empty(self._sftp, self.source_folder):
try:
get_logger().info(
"Removing empty remote directory %s on host %s",
Expand Down Expand Up @@ -424,31 +405,46 @@ def _connect(
raise type(exc)(exc.args) from None


def _folder_is_empty(con: Connection, path: RemotePath) -> bool:
try:
ls: str = con.run(f"ls {path.posix}", hide=True, in_stream=False).stdout
return ls == ""
except UnexpectedExit:
return False # no further processing is needed in this case


def _coreutils_checksum_for(file: File) -> Optional[str]:
# blake2s is not supported because `b2sum -l 256` produces a different digest
# and I don't know why.
supported = {
"md5": "md5sum -b",
"sha256": "sha256sum -b",
"sha384": "sha384sum -b",
"sha512": "sha512sum -b",
"blake2b": "b2sum -l 512 -b",
}
algorithm = file.checksum_algorithm
if algorithm == "blake2s" or algorithm not in supported:
get_logger().warning(
"Cannot validate checksum of uploaded file %s because checksum algorithm "
"'%s' is not supported by scitacean for remote files.",
file.remote_path,
file.checksum_algorithm,
def _remote_folder_is_empty(sftp: SFTPClient, path: RemotePath) -> bool:
return not sftp.listdir(path.posix)


def _mkdir_remote(sftp: SFTPClient, path: RemotePath) -> None:
if (parent := path.parent) not in (".", "/"):
_mkdir_remote(sftp, parent)

st_stat = _try_remote_stat(sftp, path)
if st_stat is None:
sftp.mkdir(path.posix)
elif not _is_remote_dir(st_stat):
raise FileExistsError(
f"Cannot make directory because path points to a file: {path}"
)


def _try_remote_stat(sftp: SFTPClient, path: RemotePath) -> Optional[SFTPAttributes]:
try:
return sftp.stat(path.posix)
except FileNotFoundError:
return None
return supported[algorithm]


def _is_remote_dir(st_stat: SFTPAttributes) -> bool:
try:
return st_stat.st_mode & 0o040000 == 0o040000
except FileNotFoundError:
return False


def _compute_remote_checksum(
sftp: SFTPClient, path: RemotePath, checksum_algorithm: str
) -> Optional[str]:
with sftp.open(path.posix, "r") as f:
try:
return f.check(checksum_algorithm).decode("utf-8")
except OSError as exc:
# Many servers don't support this.
# See https://docs.paramiko.org/en/latest/api/sftp.html
if "Operation unsupported" in exc.args:
return None
raise
Loading