Skip to content

Commit

Permalink
Fixed empty DEBUG log
Browse files Browse the repository at this point in the history
  • Loading branch information
LanderOtto committed Sep 20, 2024
1 parent de37b87 commit cf1268f
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 21 deletions.
67 changes: 47 additions & 20 deletions streamflow/deployment/connector/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
import os
from abc import ABC
from typing import Any, MutableMapping, MutableSequence
from typing import Any, MutableMapping, MutableSequence, cast

import asyncssh
from asyncssh import ChannelOpenError, ConnectionLost
Expand All @@ -15,10 +15,15 @@
from streamflow.core.deployment import Connector, ExecutionLocation
from streamflow.core.exception import WorkflowExecutionException
from streamflow.core.scheduling import AvailableLocation, Hardware, Storage
from streamflow.core.utils import random_name
from streamflow.deployment.connector.base import BaseConnector
from streamflow.deployment.stream import StreamReaderWrapper, StreamWriterWrapper
from streamflow.deployment.template import CommandTemplateMap
from streamflow.log_handler import logger
from streamflow.log_handler import logger, defaultStreamHandler

asyncssh.logging.logger.setLevel(logging.DEBUG)
asyncssh.logging.logger.set_debug_level(2)
asyncssh.logging.logger.logger.addHandler(defaultStreamHandler)


def _parse_hostname(hostname):
Expand Down Expand Up @@ -56,8 +61,7 @@ async def get_connection(self) -> asyncssh.SSHClientConnection:
logger.warning(
f"Connection to {self._config.hostname} failed: {e}."
)
self._connect_event.set()
self.close()
await self.close()
raise
self._connect_event.set()
else:
Expand Down Expand Up @@ -111,13 +115,14 @@ def _get_param_from_file(self, file_path: str):
with open(file_path) as f:
return f.read().strip()

def close(self):
self._connecting = False
async def close(self):
if self._ssh_connection is not None:
self._ssh_connection.close()
await self._ssh_connection.wait_closed()
self._ssh_connection = None
if self._connect_event.is_set():
self._connect_event.clear()
self._connect_event.set() # it is necessary to free any blocked tasks and avoid deadlocks
self._connect_event.clear()
self._connecting = False

def full(self) -> bool:
return (
Expand Down Expand Up @@ -187,6 +192,7 @@ async def __aenter__(self) -> asyncssh.SSHClientProcess:
except (
ConnectionError,
ConnectionLost,
asyncssh.Error,
ChannelOpenError,
) as coe:
if isinstance(coe, ChannelOpenError):
Expand All @@ -196,7 +202,7 @@ async def __aenter__(self) -> asyncssh.SSHClientProcess:
)
self._selected_context = None
context.ssh_attempts += 1
context.close()
await context.close()
logger.warning(f"Connection attempt {context.ssh_attempts}")
await asyncio.sleep(self._retry_delay)

Expand Down Expand Up @@ -230,9 +236,8 @@ def __init__(
self._retries = retries
self._retry_delay = retry_delay

def close(self):
for c in self._contexts:
c.close()
async def close(self):
await asyncio.gather(*(asyncio.create_task(c.close()) for c in self._contexts))

def get(
self,
Expand Down Expand Up @@ -482,9 +487,9 @@ def _get_command(
)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
f"EXECUTING command {command} on {location}" f" for job {job_name}"
if job_name
else ""
"EXECUTING command {} on {}{}".format(
command, location, f" for job {job_name}" if job_name else ""
)
)
return utils.encode_command(command)

Expand Down Expand Up @@ -658,19 +663,41 @@ async def run(
workdir=workdir,
)
command = utils.encode_command(command)
uid = random_name()
async with self._get_ssh_client_process(
location=location.name,
command=command,
stderr=asyncio.subprocess.STDOUT,
environment=environment,
) as proc:
result = await proc.wait(check=True, timeout=timeout)
a = cast(asyncssh.SSHClientConnection, proc.get_extra_info("connection"))
logger.info(
f"A. {uid} Channel is closing: {proc.is_closing()} {a.is_closed()} .A"
)
result = await proc.wait(timeout=timeout)
logger.info(
f"B. {uid} Channel is closing: {proc.is_closing()} {a.is_closed()} .B"
)
if result.returncode is None:
a = cast(asyncssh.SSHClientConnection, proc.get_extra_info("connection"))
logger.info(
f"C. {uid} Channel is closing: {proc.is_closing()} {a.is_closed()} .C"
)
raise Exception("Return code cannot be None")
return (result.stdout.strip(), result.returncode) if capture_output else None

async def undeploy(self, external: bool) -> None:
for ssh_context in self.ssh_context_factories.values():
ssh_context.close()
await asyncio.gather(
*(
asyncio.create_task(ssh_context.close())
for ssh_context in self.ssh_context_factories.values()
)
)
self.ssh_context_factories = {}
for ssh_context in self.data_transfer_context_factories.values():
ssh_context.close()
await asyncio.gather(
*(
asyncio.create_task(ssh_context.close())
for ssh_context in self.data_transfer_context_factories.values()
)
)
self.data_transfer_context_factories = {}
2 changes: 1 addition & 1 deletion tests/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,5 +148,5 @@ async def test_ssh_connector_multiple_request_fail(context: StreamFlowContext) -
):
assert isinstance(result, (ConnectionError, asyncssh.Error)) or (
isinstance(result, WorkflowExecutionException)
and result.args[0] == "Impossible to connect to .*"
and result.args[0] == "No more contexts available: terminating."
)

0 comments on commit cf1268f

Please sign in to comment.