Skip to content

Commit

Permalink
Fix deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
LanderOtto committed Jul 18, 2024
1 parent 6f4f448 commit d19da72
Showing 1 changed file with 5 additions and 14 deletions.
19 changes: 5 additions & 14 deletions streamflow/deployment/connector/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,12 @@ def full(self) -> bool:
and len(self._ssh_connection._channels) >= self._max_concurrent_sessions
)

async def sleep(self, condition: asyncio.Condition):
async def sleep(self, cond):
self._sleeping = True
await asyncio.sleep(self._retry_delay)
self._sleeping = False
async with condition:
condition.notify_all()
async with cond:
cond.notify_all()


class SSHContextManager:
Expand Down Expand Up @@ -179,18 +179,9 @@ async def __aenter__(self) -> asyncssh.SSHClientProcess:
len(free_contexts := [c for c in self._contexts if not c.full()])
== 0
):
# Wait for a connection to be closed or for a context to be available again after the `retry_delay`
_, unfinished = await asyncio.wait(
(
asyncio.create_task(
self._condition.wait(), name="__free_condition__"
),
*self._sleeping_contexts,
),
return_when=asyncio.FIRST_COMPLETED,
)
await self._condition.wait()
self._sleeping_contexts = [
t for t in unfinished if t.get_name() != "__free_condition__"
t for t in self._sleeping_contexts if not t.done()
]
else:
for context in free_contexts:
Expand Down

0 comments on commit d19da72

Please sign in to comment.