Skip to content

Commit

Permalink
Reset tasks.py and restrict transports.py changes to `last_close_…
Browse files Browse the repository at this point in the history
…time`.
  • Loading branch information
GeigerJ2 committed Nov 4, 2024
1 parent 9a9eb16 commit d0e4777
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 97 deletions.
36 changes: 5 additions & 31 deletions src/aiida/engine/processes/calcjobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
class PreSubmitException(Exception): # noqa: N818
"""Raise in the `do_upload` coroutine when an exception is raised in `CalcJob.presubmit`."""


async def task_upload_job(process: 'CalcJob', transport_queue: TransportQueue, cancellable: InterruptableFuture):
"""Transport task that will attempt to upload the files of a job calculation to the remote.
Expand Down Expand Up @@ -140,36 +141,11 @@ async def task_submit_job(node: CalcJobNode, transport_queue: TransportQueue, ca
max_attempts = get_config_option(MAX_ATTEMPTS_OPTION)

authinfo = node.get_authinfo()
# authinfo_pk = authinfo.pk

# transport_request = transport_queue._transport_requests.get(authinfo.pk, None)
# open_transport = transport_queue._open_transports.get(authinfo.pk, None)

# if open_transport is not None: # and not transport_queue._last_request_special:
# transport = open_transport
# transport_queue._last_request_special = True
# elif transport_request is None: # or transport_queue._last_request_special:
# # This is the previous behavior
# with transport_queue.request_transport(authinfo) as request:
# transport = await cancellable.with_interrupt(request)
# else:
# pass

async def do_submit():
transport_request = transport_queue._transport_requests.get(authinfo.pk, None)
open_transport = transport_queue._open_transports.get(authinfo.pk, None)

if open_transport is not None: # and not transport_queue._last_request_special:
transport = open_transport
# transport_queue._last_request_special = True
elif transport_request is None: # or transport_queue._last_request_special:
# This is the previous behavior
with transport_queue.request_transport(authinfo) as request:
transport = await cancellable.with_interrupt(request)
else:
pass

return execmanager.submit_calculation(node, transport)
async def do_submit():
with transport_queue.request_transport(authinfo) as request:
transport = await cancellable.with_interrupt(request)
return execmanager.submit_calculation(node, transport)

try:
logger.info(f'scheduled request to submit CalcJob<{node.pk}>')
Expand Down Expand Up @@ -520,11 +496,9 @@ async def execute(self) -> plumpy.process_states.State: # type: ignore[override
result: plumpy.process_states.State = self

process_status = f'Waiting for transport task: {self._command}'

node.set_process_status(process_status)

try:
# ? Possibly implement here to keep connection open
if self._command == UPLOAD_COMMAND:
skip_submit = await self._launch_task(task_upload_job, self.process, transport_queue)
if skip_submit:
Expand Down
75 changes: 9 additions & 66 deletions src/aiida/engine/transports.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,7 @@ def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None):
""":param loop: An asyncio event, will use `asyncio.get_event_loop()` if not supplied"""
self._loop = loop if loop is not None else asyncio.get_event_loop()
self._transport_requests: Dict[Hashable, TransportRequest] = {}
self._open_transports: Dict[Hashable, Transport] = {}
self._last_open_time = None
self._last_close_time = None
self._last_request_special: bool = False
self._close_callback_handle = None
# self._last_transport_request: Dict[Hashable, str] = {}

@property
def loop(self) -> asyncio.AbstractEventLoop:
Expand All @@ -81,29 +76,23 @@ async def transport_task(transport_queue, authinfo):
:return: A future that can be yielded to give the transport
"""
open_callback_handle = None
close_callback_handle = None
transport_request = self._transport_requests.get(authinfo.pk, None)
# safe_open_interval = transport.get_safe_open_interval()
safe_open_interval = 30

if transport_request is None:
# There is no existing request for this transport (i.e. on this authinfo)
transport_request = TransportRequest()
self._transport_requests[authinfo.pk] = transport_request

transport = authinfo.get_transport()
safe_open_interval = transport.get_safe_open_interval()

# Check here if last_open_time > safe_interval, one could immediately open the transport
# This should be the very first request, after a while
def do_open():
"""Actually open the transport"""
if transport_request.count > 0:
# The user still wants the transport so open it
_LOGGER.debug('Transport request opening transport for %s', authinfo)
try:
transport.open()
self._last_open_time = timezone.localtime(timezone.now())
self._open_transports[authinfo.pk] = transport
except Exception as exception:
_LOGGER.error('exception occurred while trying to open transport:\n %s', exception)
transport_request.future.set_exception(exception)
Expand All @@ -120,37 +109,20 @@ def do_open():
# See https://github.com/aiidateam/aiida-core/issues/4698

# First request, submit immediately
# ? Are these attributes persistet, or is a new TransportQueue instance created for every transport task?
if self._last_request_special:
open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context())
self._last_request_special = False

elif self._last_close_time is None:
if self._last_close_time is None:
open_callback_handle = self._loop.call_soon(do_open, context=contextvars.Context())
self._last_request_special = True

else:
close_timedelta = (timezone.localtime(timezone.now()) - self._last_close_time).total_seconds()
open_timedelta = (timezone.localtime(timezone.now()) - self._last_open_time).total_seconds()

if open_timedelta > safe_open_interval:
# ! This could also be `_loop.call_soon` which has an implicit delay of 0s

# open_timedelta = close_timedelta-safe_open_interval
if close_timedelta > safe_open_interval:
# If time since last close > `safe_open_interval`, open immediately
open_callback_handle = self._loop.call_soon(do_open, context=contextvars.Context())
self._last_request_special = True

else:
# If the last one was a special request, wait the difference between safe_open_interval and lost
open_callback_handle = self._loop.call_later(safe_open_interval-open_timedelta, do_open, context=contextvars.Context())

# open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context())
# Otherwise, wait only the difference required until the `safe_open_interval` is reached
open_callback_handle = self._loop.call_later(safe_open_interval-close_timedelta, do_open, context=contextvars.Context())

# ? This logic is implemented in `tasks.py` instead.
# else:
# transport = authinfo.get_transport()
# return transport
# If transport_request is open already
try:
transport_request.count += 1
yield transport_request.future
Expand All @@ -169,39 +141,10 @@ def do_open():
if transport_request.count == 0:
if transport_request.future.done():

# ? Why is all this logic in the `request_transport` method?
# ? Shouldn't the logic to close a transport be outside, such that the transport is being closed
# ? once it was actually used???
pass
# def do_close():
# """Close the transport if conditions are met."""
# transport_request.future.result().close()
# self._last_close_time = timezone.localtime(timezone.now())

# close_timedelta = (timezone.localtime(timezone.now()) - self._last_open_time).total_seconds()

# if close_timedelta < safe_open_interval:

# Also here logic when transport should be closed immediately, or when via call_later?
# self._last_close_time = timezone.localtime(timezone.now())
# self._transport_requests.pop(authinfo.pk, None)
# close_callback_handle = self._loop.call_later(safe_open_interval, do_close, context=contextvars.Context())
# if close_timedelta > safe_open_interval:
# close_callback_handle = self._loop.call_soon(do_close, context=contextvars.Context())
# self._last_close_time = timezone.localtime(timezone.now())
# self._transport_requests.pop(authinfo.pk, None)
# self._transport_requests.pop(authinfo.pk, None)

# transport_request.transport_closer = close_callback_handle

# This should be replaced with the call_later close_callback_handle invocation
# transport_request.future.result().close()
# ? When should the transport_request be popped?
# ? If it is always popped as soon as the task is done, there is no way to re-use it...
# self._transport_requests.pop(authinfo.pk, None)
_LOGGER.debug('Transport request closing transport for %s', authinfo)
transport_request.future.result().close()

elif open_callback_handle is not None:
open_callback_handle.cancel()

# ? Somewhere I still need to `pop` the transport_request... or do I?
# self._transport_requests.pop(authinfo.pk, None)
self._transport_requests.pop(authinfo.pk, None)

0 comments on commit d0e4777

Please sign in to comment.