Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transport & Engine: AsyncTransport plugin #6626

Open
wants to merge 38 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
b59d999
♻️ Allow for file uploads/downloads to be async
chrisjsewell Jul 9, 2023
0c42841
Update test_execmanager.py
chrisjsewell Jul 9, 2023
6cb4d9e
Merge remote-tracking branch 'upstream/main' into async-run
chrisjsewell Jul 9, 2023
14cbd29
update run methods
chrisjsewell Jul 9, 2023
e9361bb
Merge branch 'main' into async-transport
khsrali Nov 18, 2024
6811098
async transport, the first implementation
khsrali Nov 18, 2024
5fdde51
asynchrounous counterparts are added to transport.py
khsrali Nov 18, 2024
ccc545e
Giovanni's review applied
khsrali Nov 19, 2024
f187fdc
adopted tests
khsrali Nov 19, 2024
565724d
docstring updated
khsrali Nov 20, 2024
6e350e7
added computer test for ssh_async
khsrali Nov 21, 2024
03ccc30
review applied
khsrali Nov 25, 2024
178bf7b
chnage from machine to machine_
khsrali Nov 27, 2024
cc0bc5c
review applied
khsrali Nov 29, 2024
3210c27
copy-remote adopted with behaviour of asyncssh
khsrali Dec 4, 2024
76aaf53
Merge branch 'main' into async-transport
khsrali Dec 4, 2024
65f0663
remove str() use from test_all_plugins
khsrali Dec 4, 2024
a809b98
copy() are now aligned with fresh development on asyncssh
khsrali Dec 5, 2024
38cfc24
fixed some stupid issues
khsrali Dec 5, 2024
799e0f8
plumpy hook pointing to async-run branch, now
khsrali Dec 5, 2024
665a163
Merge branch 'main' into async-transport
khsrali Dec 5, 2024
0837193
updated uv lock
khsrali Dec 5, 2024
1b96110
Fixing uv.lock file for the depedencies from a github repo
agoscinski Dec 5, 2024
3aa0031
Merge branch 'main' into async-transport
khsrali Dec 6, 2024
a68240c
fix conflicts
khsrali Dec 6, 2024
520e58e
fixed afew self blocking calls in copy_async()
khsrali Dec 10, 2024
22eb929
Merge branch 'main' into async-transport
khsrali Dec 11, 2024
90718f4
fix rtd
khsrali Dec 11, 2024
343cf9c
fix uv
khsrali Dec 11, 2024
482eeca
escape for bash on command
khsrali Dec 11, 2024
5e29e5b
fixed many warnings of rtd
khsrali Dec 11, 2024
a5ff84d
Merge branch 'main' into async-transport
khsrali Dec 11, 2024
1761d94
implement max_io_allowed
khsrali Dec 13, 2024
cf01ac0
Merge branch 'main' into async-transport
khsrali Dec 13, 2024
6627b21
update asyncssh dependency
khsrali Dec 13, 2024
2ebf945
plumpy dependency pin the exact commit
khsrali Dec 16, 2024
fb384ac
Merge branch 'main' into async-transport
khsrali Dec 20, 2024
d6176fd
plumpy -> 0.23.0
khsrali Dec 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .readthedocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ build:
- asdf install uv 0.2.9
- asdf global uv 0.2.9
post_install:
- VIRTUAL_ENV=$READTHEDOCS_VIRTUALENV_PATH uv pip install .[docs,tests,rest,atomic_tools]
- VIRTUAL_ENV=$READTHEDOCS_VIRTUALENV_PATH uv pip install .[docs,tests,rest,atomic_tools] --preview

# Let the build fail if there are any warnings
sphinx:
Expand Down
6 changes: 3 additions & 3 deletions docs/source/topics/transport.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ The generic transport class contains a set of minimal methods that an implementa
If not, a ``NotImplementedError`` will be raised, interrupting the managing of the calculation or whatever is using the transport plugin.

As for the general functioning of the plugin, the :py:meth:`~aiida.transports.transport.Transport.__init__` method is used only to initialize the class instance, without actually opening the transport channel.
The connection must be opened only by the :py:meth:`~aiida.transports.transport.Transport.__enter__` method, (and closed by :py:meth:`~aiida.transports.transport.Transport.__exit__`).
The :py:meth:`~aiida.transports.transport.Transport.__enter__` method lets you use the transport class using the ``with`` statement (see `python docs <https://docs.python.org/3/reference/compound_stmts.html#with>`_), in a way similar to the following:
The connection must be opened only by the :py:meth:`~aiida.transports.transport._BaseTransport.__enter__` method, (and closed by :py:meth:`~aiida.transports.transport._BaseTransport.__exit__`).
The :py:meth:`~aiida.transports.transport._BaseTransport.__enter__` method lets you use the transport class using the ``with`` statement (see `python docs <https://docs.python.org/3/reference/compound_stmts.html#with>`_), in a way similar to the following:

.. code-block:: python

with TransportPlugin() as transport:
transport.some_method()

To ensure this, for example, the local plugin uses a hidden boolean variable ``_is_open`` that is set when the :py:meth:`~aiida.transports.transport.Transport.__enter__` and :py:meth:`~aiida.transports.transport.Transport.__exit__` methods are called.
To ensure this, for example, the local plugin uses a hidden boolean variable ``_is_open`` that is set when the :py:meth:`~aiida.transports.transport._BaseTransport.__enter__` and :py:meth:`~aiida.transports.transport._BaseTransport.__exit__` methods are called.
The ``ssh`` logic is instead given by the property sftp.

The other functions that require some care are the copying functions, called using the following terminology:
Expand Down
3 changes: 2 additions & 1 deletion environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ dependencies:
- python~=3.9
- alembic~=1.2
- archive-path~=0.4.2
- asyncssh~=2.19.0
- circus~=0.18.0
- click-spinner~=0.1.8
- click~=8.1
Expand All @@ -22,7 +23,7 @@ dependencies:
- importlib-metadata~=6.0
- numpy~=1.21
- paramiko~=3.0
- plumpy~=0.22.3
- plumpy~=0.23.0
- pgsu~=0.3.0
- psutil~=5.6
- psycopg[binary]~=3.0
Expand Down
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ classifiers = [
dependencies = [
'alembic~=1.2',
'archive-path~=0.4.2',
"asyncssh~=2.19.0",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the version that contains the change you mentioned from asyncssh? I remember you mentioned asyncssh did some change to solve the 4 spicks in network bandwidth usase issue?

'circus~=0.18.0',
'click-spinner~=0.1.8',
'click~=8.1',
Expand All @@ -34,7 +35,7 @@ dependencies = [
'importlib-metadata~=6.0',
'numpy~=1.21',
'paramiko~=3.0',
'plumpy~=0.22.3',
'plumpy~=0.23.0',
'pgsu~=0.3.0',
'psutil~=5.6',
'psycopg[binary]~=3.0',
Expand Down Expand Up @@ -175,6 +176,7 @@ requires-python = '>=3.9'
[project.entry-points.'aiida.transports']
'core.local' = 'aiida.transports.plugins.local:LocalTransport'
'core.ssh' = 'aiida.transports.plugins.ssh:SshTransport'
'core.ssh_async' = 'aiida.transports.plugins.ssh_async:AsyncSshTransport'
'core.ssh_auto' = 'aiida.transports.plugins.ssh_auto:SshAutoTransport'

[project.entry-points.'aiida.workflows']
Expand Down Expand Up @@ -308,6 +310,7 @@ module = 'tests.*'
ignore_missing_imports = true
module = [
'ase.*',
'asyncssh.*',
'bpython.*',
'bs4.*',
'CifFile.*',
Expand Down
5 changes: 3 additions & 2 deletions src/aiida/calculations/monitors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@

import tempfile
from pathlib import Path
from typing import Union

from aiida.orm import CalcJobNode
from aiida.transports import Transport
from aiida.transports import AsyncTransport, Transport


def always_kill(node: CalcJobNode, transport: Transport) -> str | None:
def always_kill(node: CalcJobNode, transport: Union['Transport', 'AsyncTransport']) -> str | None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def always_kill(node: CalcJobNode, transport: Union['Transport', 'AsyncTransport']) -> str | None:
def always_kill(node: CalcJobNode, transport: Transport | AsyncTransport) -> str | None:

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for other places, but it is fine to leave it as this, I think we will find a time to change them all. So please ignore my comment above.

"""Retrieve and inspect files in working directory of job to determine whether the job should be killed.

This particular implementation is just for demonstration purposes and will kill the job as long as there is a
Expand Down
100 changes: 50 additions & 50 deletions src/aiida/engine/daemon/execmanager.py

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions src/aiida/engine/processes/calcjobs/calcjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ def on_terminated(self) -> None:
super().on_terminated()

@override
def run(self) -> Union[plumpy.process_states.Stop, int, plumpy.process_states.Wait]:
async def run(self) -> Union[plumpy.process_states.Stop, int, plumpy.process_states.Wait]:
"""Run the calculation job.

This means invoking the `presubmit` and storing the temporary folder in the node's repository. Then we move the
Expand All @@ -535,11 +535,11 @@ def run(self) -> Union[plumpy.process_states.Stop, int, plumpy.process_states.Wa

"""
if self.inputs.metadata.dry_run:
self._perform_dry_run()
await self._perform_dry_run()
return plumpy.process_states.Stop(None, True)

if 'remote_folder' in self.inputs:
exit_code = self._perform_import()
exit_code = await self._perform_import()
return exit_code

# The following conditional is required for the caching to properly work. Even if the source node has a process
Expand Down Expand Up @@ -627,7 +627,7 @@ def _setup_inputs(self) -> None:
if not self.node.computer:
self.node.computer = self.inputs.code.computer

def _perform_dry_run(self):
async def _perform_dry_run(self):
"""Perform a dry run.

Instead of performing the normal sequence of steps, just the `presubmit` is called, which will call the method
Expand All @@ -643,13 +643,13 @@ def _perform_dry_run(self):
with LocalTransport() as transport:
with SubmitTestFolder() as folder:
calc_info = self.presubmit(folder)
upload_calculation(self.node, transport, calc_info, folder, inputs=self.inputs, dry_run=True)
await upload_calculation(self.node, transport, calc_info, folder, inputs=self.inputs, dry_run=True)
self.node.dry_run_info = { # type: ignore[attr-defined]
'folder': folder.abspath,
'script_filename': self.node.get_option('submit_script_filename'),
}

def _perform_import(self):
async def _perform_import(self):
"""Perform the import of an already completed calculation.

The inputs contained a `RemoteData` under the key `remote_folder` signalling that this is not supposed to be run
Expand All @@ -669,7 +669,7 @@ def _perform_import(self):
with SandboxFolder(filepath_sandbox) as retrieved_temporary_folder:
self.presubmit(folder)
self.node.set_remote_workdir(self.inputs.remote_folder.get_remote_path())
retrieved = retrieve_calculation(self.node, transport, retrieved_temporary_folder.abspath)
retrieved = await retrieve_calculation(self.node, transport, retrieved_temporary_folder.abspath)
if retrieved is not None:
self.out(self.node.link_label_retrieved, retrieved)
self.update_outputs()
Expand Down
9 changes: 6 additions & 3 deletions src/aiida/engine/processes/calcjobs/monitors.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@
import inspect
import typing as t
from datetime import datetime, timedelta
from typing import Union

from aiida.common.lang import type_check
from aiida.common.log import AIIDA_LOGGER
from aiida.orm import CalcJobNode, Dict
from aiida.plugins import BaseFactory

if t.TYPE_CHECKING:
from aiida.transports import Transport
from aiida.transports import AsyncTransport, Transport

Check warning on line 19 in src/aiida/engine/processes/calcjobs/monitors.py

View check run for this annotation

Codecov / codecov/patch

src/aiida/engine/processes/calcjobs/monitors.py#L19

Added line #L19 was not covered by tests

LOGGER = AIIDA_LOGGER.getChild(__name__)

Expand Down Expand Up @@ -122,7 +123,9 @@
parameters = list(signature.parameters.keys())

if any(required_parameter not in parameters for required_parameter in ('node', 'transport')):
correct_signature = '(node: CalcJobNode, transport: Transport, **kwargs) str | None:'
correct_signature = (
"(node: CalcJobNode, transport: Union['Transport', 'AsyncTransport'], **kwargs) str | None:"
)
raise ValueError(
f'The monitor `{self.entry_point}` has an invalid function signature, it should be: {correct_signature}'
)
Expand Down Expand Up @@ -176,7 +179,7 @@
def process(
self,
node: CalcJobNode,
transport: Transport,
transport: Union['Transport', 'AsyncTransport'],
) -> CalcJobMonitorResult | None:
"""Call all monitors in order and return the result as one returns anything other than ``None``.

Expand Down
8 changes: 4 additions & 4 deletions src/aiida/engine/processes/calcjobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
except Exception as exception:
raise PreSubmitException('exception occurred in presubmit call') from exception
else:
remote_folder = execmanager.upload_calculation(node, transport, calc_info, folder)
remote_folder = await execmanager.upload_calculation(node, transport, calc_info, folder)
if remote_folder is not None:
process.out('remote_folder', remote_folder)
skip_submit = calc_info.skip_submit or False
Expand Down Expand Up @@ -314,7 +314,7 @@

if node.get_job_id() is None:
logger.warning(f'there is no job id for CalcJobNoe<{node.pk}>: skipping `get_detailed_job_info`')
retrieved = execmanager.retrieve_calculation(node, transport, retrieved_temporary_folder)
retrieved = await execmanager.retrieve_calculation(node, transport, retrieved_temporary_folder)
else:
try:
detailed_job_info = scheduler.get_detailed_job_info(node.get_job_id())
Expand All @@ -324,7 +324,7 @@
else:
node.set_detailed_job_info(detailed_job_info)

retrieved = execmanager.retrieve_calculation(node, transport, retrieved_temporary_folder)
retrieved = await execmanager.retrieve_calculation(node, transport, retrieved_temporary_folder)

if retrieved is not None:
process.out(node.link_label_retrieved, retrieved)
Expand Down Expand Up @@ -376,7 +376,7 @@
transport = await cancellable.with_interrupt(request)

logger.info(f'stashing calculation<{node.pk}>')
return execmanager.stash_calculation(node, transport)
return await execmanager.stash_calculation(node, transport)

Check warning on line 379 in src/aiida/engine/processes/calcjobs/tasks.py

View check run for this annotation

Codecov / codecov/patch

src/aiida/engine/processes/calcjobs/tasks.py#L379

Added line #L379 was not covered by tests

try:
await exponential_backoff_retry(
Expand Down
2 changes: 1 addition & 1 deletion src/aiida/engine/processes/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ def _setup_db_record(self) -> None:
self.node.store_source_info(self._func)

@override
def run(self) -> 'ExitCode' | None:
async def run(self) -> 'ExitCode' | None:
"""Run the process."""
from .exit_code import ExitCode

Expand Down
2 changes: 1 addition & 1 deletion src/aiida/engine/processes/workchains/workchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ def _update_process_status(self) -> None:

@override
@Protect.final
def run(self) -> t.Any:
async def run(self) -> t.Any:
self._stepper = self.spec().get_outline().create_stepper(self) # type: ignore[arg-type]
return self._do_step()

Expand Down
6 changes: 3 additions & 3 deletions src/aiida/engine/transports.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
import contextvars
import logging
import traceback
from typing import TYPE_CHECKING, Awaitable, Dict, Hashable, Iterator, Optional
from typing import TYPE_CHECKING, Awaitable, Dict, Hashable, Iterator, Optional, Union

from aiida.orm import AuthInfo

if TYPE_CHECKING:
from aiida.transports import Transport
from aiida.transports import AsyncTransport, Transport

Check warning on line 21 in src/aiida/engine/transports.py

View check run for this annotation

Codecov / codecov/patch

src/aiida/engine/transports.py#L21

Added line #L21 was not covered by tests

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -54,7 +54,7 @@
return self._loop

@contextlib.contextmanager
def request_transport(self, authinfo: AuthInfo) -> Iterator[Awaitable['Transport']]:
def request_transport(self, authinfo: AuthInfo) -> Iterator[Awaitable[Union['Transport', 'AsyncTransport']]]:
"""Request a transport from an authinfo. Because the client is not allowed to
request a transport immediately they will instead be given back a future
that can be awaited to get the transport::
Expand Down
6 changes: 3 additions & 3 deletions src/aiida/orm/authinfos.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
###########################################################################
"""Module for the `AuthInfo` ORM class."""

from typing import TYPE_CHECKING, Any, Dict, Optional, Type
from typing import TYPE_CHECKING, Any, Dict, Optional, Type, Union

from aiida.common import exceptions
from aiida.manage import get_manager
Expand All @@ -21,7 +21,7 @@
from aiida.orm import Computer, User
from aiida.orm.implementation import StorageBackend
from aiida.orm.implementation.authinfos import BackendAuthInfo # noqa: F401
from aiida.transports import Transport
from aiida.transports import AsyncTransport, Transport

Check warning on line 24 in src/aiida/orm/authinfos.py

View check run for this annotation

Codecov / codecov/patch

src/aiida/orm/authinfos.py#L24

Added line #L24 was not covered by tests

__all__ = ('AuthInfo',)

Expand Down Expand Up @@ -166,7 +166,7 @@
except KeyError:
return self.computer.get_workdir()

def get_transport(self) -> 'Transport':
def get_transport(self) -> Union['Transport', 'AsyncTransport']:
"""Return a fully configured transport that can be used to connect to the computer set for this instance."""
computer = self.computer
transport_type = computer.transport_type
Expand Down
10 changes: 5 additions & 5 deletions src/aiida/orm/computers.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from aiida.orm import AuthInfo, User
from aiida.orm.implementation import StorageBackend
from aiida.schedulers import Scheduler
from aiida.transports import Transport
from aiida.transports import AsyncTransport, Transport

Check warning on line 26 in src/aiida/orm/computers.py

View check run for this annotation

Codecov / codecov/patch

src/aiida/orm/computers.py#L26

Added line #L26 was not covered by tests

__all__ = ('Computer',)

Expand Down Expand Up @@ -622,16 +622,16 @@
# Return False if the user is not configured (in a sense, it is disabled for that user)
return False

def get_transport(self, user: Optional['User'] = None) -> 'Transport':
def get_transport(self, user: Optional['User'] = None) -> Union['Transport', 'AsyncTransport']:
"""Return a Transport class, configured with all correct parameters.
The Transport is closed (meaning that if you want to run any operation with
it, you have to open it first (i.e., e.g. for a SSH transport, you have
to open a connection). To do this you can call ``transports.open()``, or simply
to open a connection). To do this you can call ``transport.open()``, or simply
run within a ``with`` statement::

transport = Computer.get_transport()
with transport:
print(transports.whoami())
print(transport.whoami())

:param user: if None, try to obtain a transport for the default user.
Otherwise, pass a valid User.
Expand All @@ -646,7 +646,7 @@
authinfo = authinfos.AuthInfo.get_collection(self.backend).get(dbcomputer=self, aiidauser=user)
return authinfo.get_transport()

def get_transport_class(self) -> Type['Transport']:
def get_transport_class(self) -> Union[Type['Transport'], Type['AsyncTransport']]:
"""Get the transport class for this computer. Can be used to instantiate a transport instance."""
try:
return TransportFactory(self.transport_type)
Expand Down
3 changes: 2 additions & 1 deletion src/aiida/orm/nodes/data/remote/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ def listdir_withattributes(self, path='.'):
"""Connects to the remote folder and lists the directory content.

:param relpath: If 'relpath' is specified, lists the content of the given subfolder.
:return: a list of dictionaries, where the documentation is in :py:class:Transport.listdir_withattributes.
:return: a list of dictionaries, where the documentation
is in :py:class:Transport.listdir_withattributes.
"""
authinfo = self.get_authinfo()

Expand Down
7 changes: 4 additions & 3 deletions src/aiida/orm/nodes/process/calculation/calcjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from aiida.parsers import Parser
from aiida.schedulers.datastructures import JobInfo, JobState
from aiida.tools.calculations import CalculationTools
from aiida.transports import Transport
from aiida.transports import AsyncTransport, Transport

Check warning on line 29 in src/aiida/orm/nodes/process/calculation/calcjob.py

View check run for this annotation

Codecov / codecov/patch

src/aiida/orm/nodes/process/calculation/calcjob.py#L29

Added line #L29 was not covered by tests

__all__ = ('CalcJobNode',)

Expand Down Expand Up @@ -450,10 +450,11 @@

return computer.get_authinfo(self.user)

def get_transport(self) -> 'Transport':
def get_transport(self) -> Union['Transport', 'AsyncTransport']:
"""Return the transport for this calculation.

:return: `Transport` configured with the `AuthInfo` associated to the computer of this node
:return: Union['Transport', 'AsyncTransport'] configured
with the `AuthInfo` associated to the computer of this node
"""
return self.get_authinfo().get_transport()

Expand Down
7 changes: 4 additions & 3 deletions src/aiida/orm/utils/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import os
import typing as t
from typing import Union

from aiida.orm.nodes.data.remote.base import RemoteData

Expand All @@ -20,14 +21,14 @@

from aiida import orm
from aiida.orm.implementation import StorageBackend
from aiida.transports import Transport
from aiida.transports import AsyncTransport, Transport

Check warning on line 24 in src/aiida/orm/utils/remote.py

View check run for this annotation

Codecov / codecov/patch

src/aiida/orm/utils/remote.py#L24

Added line #L24 was not covered by tests


def clean_remote(transport: Transport, path: str) -> None:
def clean_remote(transport: Union['Transport', 'AsyncTransport'], path: str) -> None:
"""Recursively remove a remote folder, with the given absolute path, and all its contents. The path should be
made accessible through the transport channel, which should already be open

:param transport: an open Transport channel
:param transport: an open Union['Transport', 'AsyncTransport'] channel
:param path: an absolute path on the remote made available through the transport
"""
if not isinstance(path, str):
Expand Down
Loading
Loading