Skip to content

Commit

Permalink
Fix deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
LanderOtto committed Jul 18, 2024
1 parent 6f4f448 commit a387d0a
Showing 1 changed file with 8 additions and 14 deletions.
22 changes: 8 additions & 14 deletions streamflow/deployment/connector/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,12 @@ def full(self) -> bool:
and len(self._ssh_connection._channels) >= self._max_concurrent_sessions
)

async def sleep(self, condition: asyncio.Condition):
async def sleep(self, cond):
self._sleeping = True
await asyncio.sleep(self._retry_delay)
self._sleeping = False
async with condition:
condition.notify_all()
async with cond:
cond.notify_all()


class SSHContextManager:
Expand Down Expand Up @@ -179,18 +179,9 @@ async def __aenter__(self) -> asyncssh.SSHClientProcess:
len(free_contexts := [c for c in self._contexts if not c.full()])
== 0
):
# Wait for a connection to be closed or for a context to be available again after the `retry_delay`
_, unfinished = await asyncio.wait(
(
asyncio.create_task(
self._condition.wait(), name="__free_condition__"
),
*self._sleeping_contexts,
),
return_when=asyncio.FIRST_COMPLETED,
)
await self._condition.wait()
self._sleeping_contexts = [
t for t in unfinished if t.get_name() != "__free_condition__"
t for t in self._sleeping_contexts if not t.done()
]
else:
for context in free_contexts:
Expand Down Expand Up @@ -777,6 +768,9 @@ async def run(
environment=environment,
) as proc:
result = await proc.wait(timeout=timeout)
logger.info(f"CMD: {command} returncode: {result.returncode}")
if result.returncode is None:
result.returncode = 9999
return (result.stdout.strip(), result.returncode) if capture_output else None

async def undeploy(self, external: bool) -> None:
Expand Down

0 comments on commit a387d0a

Please sign in to comment.