asynchronous counterparts are added to transport.py
khsrali committed Nov 18, 2024
1 parent 6811098 commit 5fdde51
Showing 4 changed files with 235 additions and 107 deletions.
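The pattern throughout the commit is mechanical: each blocking transport call made by the engine (whoami, path_exists, makedirs, mkdir, copy, copytree, rmtree, chmod, symlink, glob, isdir) is replaced by an awaited *_async counterpart. The following minimal sketch shows the shape of that change; ExampleTransport and upload_sketch are illustrative stand-ins, not the real aiida Transport API.

import asyncio


class ExampleTransport:
    """Illustrative transport exposing a blocking call and its async counterpart."""

    async def whoami_async(self) -> str:
        await asyncio.sleep(0)  # stand-in for real remote I/O
        return 'aiida'

    def whoami(self) -> str:
        # Blocking counterpart kept for synchronous callers; it must not be
        # called from inside a running event loop.
        return asyncio.run(self.whoami_async())


async def upload_sketch(transport: ExampleTransport) -> str:
    # After this commit, engine code awaits the async counterpart directly
    # instead of calling the blocking method.
    return await transport.whoami_async()


print(asyncio.run(upload_sketch(ExampleTransport())))  # prints 'aiida'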
58 changes: 31 additions & 27 deletions src/aiida/engine/daemon/execmanager.py
@@ -105,7 +105,7 @@ async def upload_calculation(
if dry_run:
workdir = Path(folder.abspath)
else:
remote_user = transport.whoami()
remote_user = await transport.whoami_async()
remote_working_directory = computer.get_workdir().format(username=remote_user)
if not remote_working_directory.strip():
raise exceptions.ConfigurationError(
@@ -114,13 +114,13 @@
)

# If it already exists, no exception is raised
if not transport.path_exists(remote_working_directory):
if not await transport.path_exists_async(remote_working_directory):
logger.debug(
f'[submission of calculation {node.pk}] Path '
f'{remote_working_directory} does not exist, trying to create it'
)
try:
transport.makedirs(remote_working_directory)
await transport.makedirs_async(remote_working_directory)
except EnvironmentError as exc:
raise exceptions.ConfigurationError(
f'[submission of calculation {node.pk}] '
@@ -133,14 +133,14 @@
# and I do not have to know the logic, but I just need to
# read the absolute path from the calculation properties.
workdir = Path(remote_working_directory).joinpath(calc_info.uuid[:2], calc_info.uuid[2:4])
transport.makedirs(str(workdir), ignore_existing=True)
await transport.makedirs_async(str(workdir), ignore_existing=True)

try:
# The final directory may already exist, most likely because this function was already executed once, but
# failed and as a result was rescheduled by the engine. In this case it would be fine to delete the folder
# and create it from scratch, except that we cannot be sure that this the actual case. Therefore, to err on
# the safe side, we move the folder to the lost+found directory before recreating the folder from scratch
transport.mkdir(str(workdir.joinpath(calc_info.uuid[4:])))
await transport.mkdir_async(str(workdir.joinpath(calc_info.uuid[4:])))
except OSError:
# Move the existing directory to lost+found, log a warning and create a clean directory anyway
path_existing = os.path.join(str(workdir), calc_info.uuid[4:])
@@ -151,12 +151,12 @@
)

# Make sure the lost+found directory exists, then copy the existing folder there and delete the original
transport.mkdir(path_lost_found, ignore_existing=True)
transport.copytree(path_existing, path_target)
transport.rmtree(path_existing)
await transport.mkdir_async(path_lost_found, ignore_existing=True)
await transport.copytree_async(path_existing, path_target)
await transport.rmtree_async(path_existing)

# Now we can create a clean folder for this calculation
transport.mkdir(str(workdir.joinpath(calc_info.uuid[4:])))
await transport.mkdir_async(str(workdir.joinpath(calc_info.uuid[4:])))
finally:
workdir = workdir.joinpath(calc_info.uuid[4:])

@@ -171,11 +171,11 @@ async def upload_calculation(
# Note: this will possibly overwrite files
for root, dirnames, filenames in code.base.repository.walk():
# mkdir of root
transport.makedirs(str(workdir.joinpath(root)), ignore_existing=True)
await transport.makedirs_async(str(workdir.joinpath(root)), ignore_existing=True)

# remotely mkdir first
for dirname in dirnames:
transport.makedirs(str(workdir.joinpath(root, dirname)), ignore_existing=True)
await transport.makedirs_async(str(workdir.joinpath(root, dirname)), ignore_existing=True)

# Note, once #2579 is implemented, use the `node.open` method instead of the named temporary file in
# combination with the new `Transport.put_object_from_filelike`
@@ -187,9 +187,9 @@ async def upload_calculation(
handle.flush()
await transport.put_async(handle.name, str(workdir.joinpath(root, filename)))
if code.filepath_executable.is_absolute():
transport.chmod(str(code.filepath_executable), 0o755) # rwxr-xr-x
await transport.chmod_async(str(code.filepath_executable), 0o755) # rwxr-xr-x
else:
transport.chmod(str(workdir.joinpath(code.filepath_executable)), 0o755) # rwxr-xr-x
await transport.chmod_async(str(workdir.joinpath(code.filepath_executable)), 0o755) # rwxr-xr-x

# local_copy_list is a list of tuples, each with (uuid, dest_path, rel_path)
# NOTE: validation of these lists are done inside calculation.presubmit()
@@ -209,7 +209,7 @@ async def upload_calculation(
await _copy_local_files(logger, node, transport, inputs, local_copy_list, workdir=workdir)
elif file_copy_operation is FileCopyOperation.REMOTE:
if not dry_run:
_copy_remote_files(
await _copy_remote_files(
logger, node, computer, transport, remote_copy_list, remote_symlink_list, workdir=workdir
)
elif file_copy_operation is FileCopyOperation.SANDBOX:
@@ -279,7 +279,7 @@ async def upload_calculation(
return None


def _copy_remote_files(logger, node, computer, transport, remote_copy_list, remote_symlink_list, workdir: Path):
async def _copy_remote_files(logger, node, computer, transport, remote_copy_list, remote_symlink_list, workdir: Path):
"""Perform the copy instructions of the ``remote_copy_list`` and ``remote_symlink_list``."""
for remote_computer_uuid, remote_abs_path, dest_rel_path in remote_copy_list:
if remote_computer_uuid == computer.uuid:
@@ -288,7 +288,7 @@ def _copy_remote_files(logger, node, computer, transport, remote_copy_list, remo
f'remotely, directly on the machine {computer.label}'
)
try:
transport.copy(remote_abs_path, str(workdir.joinpath(dest_rel_path)))
await transport.copy_async(remote_abs_path, str(workdir.joinpath(dest_rel_path)))
except FileNotFoundError:
logger.warning(
f'[submission of calculation {node.pk}] Unable to copy remote '
@@ -314,8 +314,8 @@ def _copy_remote_files(logger, node, computer, transport, remote_copy_list, remo
)
remote_dirname = Path(dest_rel_path).parent
try:
transport.makedirs(str(workdir.joinpath(remote_dirname)), ignore_existing=True)
transport.symlink(remote_abs_path, str(workdir.joinpath(dest_rel_path)))
await transport.makedirs_async(str(workdir.joinpath(remote_dirname)), ignore_existing=True)
await transport.symlink_async(remote_abs_path, str(workdir.joinpath(dest_rel_path)))
except OSError:
logger.warning(
f'[submission of calculation {node.pk}] Unable to create remote symlink '
@@ -356,14 +356,18 @@ async def _copy_local_files(logger, node, transport, inputs, local_copy_list, wo
# The logic below takes care of an edge case where the source is a file but the target is a directory. In
# this case, the v2.5.1 implementation would raise an `IsADirectoryError` exception, because it would try
# to open the directory in the sandbox folder as a file when writing the contents.
if file_type_source == FileType.FILE and target and transport.isdir(str(workdir.joinpath(target))):
if (
file_type_source == FileType.FILE
and target
and await transport.isdir_async(str(workdir.joinpath(target)))
):
raise IsADirectoryError

# In case the source filename is specified and it is a directory that already exists in the remote, we
# want to avoid nested directories in the target path to replicate the behavior of v2.5.1. This is done by
# setting the target filename to '.', which means the contents of the node will be copied in the top level
# of the temporary directory, whose contents are then copied into the target directory.
if filename and transport.isdir(str(workdir.joinpath(filename))):
if filename and await transport.isdir_async(str(workdir.joinpath(filename))):
filename_target = '.'

filepath_target = (dirpath / filename_target).resolve().absolute()
@@ -382,7 +386,7 @@ async def _copy_local_files(logger, node, transport, inputs, local_copy_list, wo
with filepath_target.open('wb') as handle:
with data_node.base.repository.open(filename_source, 'rb') as source:
shutil.copyfileobj(source, handle)
transport.makedirs(str(workdir.joinpath(Path(target).parent)), ignore_existing=True)
await transport.makedirs_async(str(workdir.joinpath(Path(target).parent)), ignore_existing=True)
await transport.put_async(str(filepath_target), str(workdir.joinpath(target)))


@@ -423,7 +427,7 @@ def submit_calculation(calculation: CalcJobNode, transport: Transport) -> str |
return result


def stash_calculation(calculation: CalcJobNode, transport: Transport) -> None:
async def stash_calculation(calculation: CalcJobNode, transport: Transport) -> None:
"""Stash files from the working directory of a completed calculation to a permanent remote folder.
After a calculation has been completed, optionally stash files from the work directory to a storage location on the
@@ -461,7 +465,7 @@ def stash_calculation(calculation: CalcJobNode, transport: Transport) -> None:
for source_filename in source_list:
if transport.has_magic(source_filename):
copy_instructions = []
for globbed_filename in transport.glob(str(source_basepath / source_filename)):
for globbed_filename in await transport.glob_async(str(source_basepath / source_filename)):
target_filepath = target_basepath / Path(globbed_filename).relative_to(source_basepath)
copy_instructions.append((globbed_filename, target_filepath))
else:
@@ -470,10 +474,10 @@ def stash_calculation(calculation: CalcJobNode, transport: Transport) -> None:
for source_filepath, target_filepath in copy_instructions:
# If the source file is in a (nested) directory, create those directories first in the target directory
target_dirname = target_filepath.parent
transport.makedirs(str(target_dirname), ignore_existing=True)
await transport.makedirs_async(str(target_dirname), ignore_existing=True)

try:
transport.copy(str(source_filepath), str(target_filepath))
await transport.copy_async(str(source_filepath), str(target_filepath))
except (OSError, ValueError) as exception:
EXEC_LOGGER.warning(f'failed to stash {source_filepath} to {target_filepath}: {exception}')
else:
@@ -621,7 +625,7 @@ async def retrieve_files_from_list(
tmp_rname, tmp_lname, depth = item
# if there are more than one file I do something differently
if transport.has_magic(tmp_rname):
remote_names = transport.glob(str(workdir.joinpath(tmp_rname)))
remote_names = await transport.glob_async(str(workdir.joinpath(tmp_rname)))
local_names = []
for rem in remote_names:
# get the relative path so to make local_names relative
@@ -644,7 +648,7 @@ async def retrieve_files_from_list(
abs_item = item if item.startswith('/') else str(workdir.joinpath(item))

if transport.has_magic(abs_item):
remote_names = transport.glob(abs_item)
remote_names = await transport.glob_async(abs_item)
local_names = [os.path.split(rem)[1] for rem in remote_names]
else:
remote_names = [abs_item]
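The stashing hunk above globs each source pattern on the remote machine, builds a list of (source, target) copy instructions, and creates the parent directory of every target before copying. A standalone sketch of that flow, assuming a transport object that provides the has_magic, glob_async, makedirs_async and copy_async methods used in the diff:

import logging
from pathlib import Path

logger = logging.getLogger(__name__)


async def stash_sketch(transport, source_basepath: Path, target_basepath: Path, source_list: list) -> None:
    """Sketch of the copy-instruction loop used when stashing files."""
    for source_filename in source_list:
        if transport.has_magic(source_filename):
            # Expand the glob remotely and keep each target relative to the source base.
            copy_instructions = [
                (name, target_basepath / Path(name).relative_to(source_basepath))
                for name in await transport.glob_async(str(source_basepath / source_filename))
            ]
        else:
            copy_instructions = [(source_basepath / source_filename, target_basepath / source_filename)]

        for source_filepath, target_filepath in copy_instructions:
            # Create the (possibly nested) target directory before copying.
            await transport.makedirs_async(str(Path(target_filepath).parent), ignore_existing=True)
            try:
                await transport.copy_async(str(source_filepath), str(target_filepath))
            except (OSError, ValueError) as exception:
                logger.warning(f'failed to stash {source_filepath} to {target_filepath}: {exception}')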
2 changes: 1 addition & 1 deletion src/aiida/engine/processes/calcjobs/tasks.py
@@ -376,7 +376,7 @@ async def do_stash():
transport = await cancellable.with_interrupt(request)

logger.info(f'stashing calculation<{node.pk}>')
return execmanager.stash_calculation(node, transport)
return await execmanager.stash_calculation(node, transport)

try:
await exponential_backoff_retry(
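Because stash_calculation is now a coroutine, the do_stash task must await it; returning it without the await would hand back an unexecuted coroutine object. A sketch of the calling pattern, with an illustrative backoff helper standing in for aiida's exponential_backoff_retry and a stub for the stash coroutine:

import asyncio
import logging

logger = logging.getLogger(__name__)


async def retry_with_backoff(coro_factory, max_attempts: int = 5, initial_interval: float = 1.0):
    """Illustrative exponential-backoff helper; the real exponential_backoff_retry differs in detail."""
    interval = initial_interval
    for attempt in range(1, max_attempts + 1):
        try:
            return await coro_factory()
        except Exception as exception:
            if attempt == max_attempts:
                raise
            logger.warning('attempt %d failed (%s), retrying in %.1f s', attempt, exception, interval)
            await asyncio.sleep(interval)
            interval *= 2


async def stash_calculation_stub(node, transport) -> None:
    """Stand-in for the now-asynchronous execmanager.stash_calculation."""
    await asyncio.sleep(0)


async def do_stash_sketch(node, transport) -> None:
    # The stash call must be awaited now that it is a coroutine.
    await retry_with_backoff(lambda: stash_calculation_stub(node, transport))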
16 changes: 7 additions & 9 deletions src/aiida/transports/plugins/ssh_async.py
@@ -561,7 +561,7 @@ async def isfile_async(self, path):

return await self._sftp.isfile(path)

async def listdir_async(self, path, pattern=None):
async def listdir_async(self, path, pattern=None): # type: ignore[override]
"""
:param path: the absolute path to list
@@ -579,7 +579,7 @@ async def listdir_async(self, path, pattern=None):

return list_

async def listdir_withattributes_async(self, path: _TransportPath, pattern: Optional[str] = None):
async def listdir_withattributes_async(self, path: _TransportPath, pattern: Optional[str] = None): # type: ignore[override]
"""Return a list of the names of the entries in the given path.
The list is in arbitrary order. It does not include the special
entries '.' and '..' even if they are present in the directory.
@@ -791,13 +791,11 @@ async def chmod_async(self, path, mode, follow_symlinks=True):
except asyncssh.sftp.SFTPNoSuchFile as exc:
raise OSError(f'Error {exc}, directory does not exists')

# ## Blocking methods. We need these for backwards compatibility
# def run_command_blocking(self, func, *args, **kwargs):
# """Call an async method blocking.
# This is useful, only because in some part of engine and
# many external plugins are synchronous function calls make more sense.
# However, be aware these synchronous calls probably won't be performant."""
# return asyncio.run(func(*args, **kwargs))
## Blocking methods. We need these for backwards compatibility
# This is useful, only because some part of engine and
# many external plugins are synchronous, in those cases blocking calls make more sense.
# However, be aware you cannot use these methods in an async functions,
# because they will block the event loop.

def run_command_blocking(self, func, *args, **kwargs):
loop = asyncio.get_event_loop()
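Only the signature and first line of the new run_command_blocking are visible in this hunk. The sketch below assumes it simply drives the given coroutine to completion on the loop it fetches; the committed body may differ, and the usage example at the end is hypothetical. As the new comment warns, such a wrapper must never be called from inside an async function, because run_until_complete would block the running event loop.

import asyncio


def run_command_blocking(func, *args, **kwargs):
    """Sketch: run an async transport method to completion from synchronous code."""
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(func(*args, **kwargs))


# Hypothetical usage from a synchronous caller:
# transport = AsyncSshTransport(machine='localhost')
# username = run_command_blocking(transport.whoami_async)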