diff --git a/streamflow/deployment/connector/ssh.py b/streamflow/deployment/connector/ssh.py index 76df0562..5f89184f 100644 --- a/streamflow/deployment/connector/ssh.py +++ b/streamflow/deployment/connector/ssh.py @@ -136,12 +136,12 @@ def full(self) -> bool: and len(self._ssh_connection._channels) >= self._max_concurrent_sessions ) - async def sleep(self, condition: asyncio.Condition): + async def sleep(self, cond): self._sleeping = True await asyncio.sleep(self._retry_delay) self._sleeping = False - async with condition: - condition.notify_all() + async with cond: + cond.notify_all() class SSHContextManager: @@ -179,18 +179,9 @@ async def __aenter__(self) -> asyncssh.SSHClientProcess: len(free_contexts := [c for c in self._contexts if not c.full()]) == 0 ): - # Wait for a connection to be closed or for a context to be available again after the `retry_delay` - _, unfinished = await asyncio.wait( - ( - asyncio.create_task( - self._condition.wait(), name="__free_condition__" - ), - *self._sleeping_contexts, - ), - return_when=asyncio.FIRST_COMPLETED, - ) + await self._condition.wait() self._sleeping_contexts = [ - t for t in unfinished if t.get_name() != "__free_condition__" + t for t in self._sleeping_contexts if not t.done() ] else: for context in free_contexts: @@ -777,6 +768,9 @@ async def run( environment=environment, ) as proc: result = await proc.wait(timeout=timeout) + logger.info(f"CMD: {command} returncode: {result.returncode}") + if result.returncode is None: + result.returncode = 9999 return (result.stdout.strip(), result.returncode) if capture_output else None async def undeploy(self, external: bool) -> None: