From 95aaaf31095b010ea224b23bca75fa43c290b9f3 Mon Sep 17 00:00:00 2001 From: Iacopo Colonnelli Date: Mon, 6 Feb 2023 15:19:39 +0100 Subject: [PATCH] Moving some logic from Translator to utils (#66) Moving language-agnostic features away from the CWL Translator fosters their reusability with different workflow languages (e.g., Jupyter Workflow). --- streamflow/cwl/translator.py | 62 ++++----------------------------- streamflow/deployment/utils.py | 63 +++++++++++++++++++++++++++++++++- streamflow/workflow/utils.py | 13 ++++--- 3 files changed, 78 insertions(+), 60 deletions(-) diff --git a/streamflow/cwl/translator.py b/streamflow/cwl/translator.py index 8b58e13a..436b2261 100644 --- a/streamflow/cwl/translator.py +++ b/streamflow/cwl/translator.py @@ -87,6 +87,7 @@ ValueFromTransformer, ) from streamflow.cwl.utils import LoadListing, SecondaryFile, resolve_dependencies +from streamflow.deployment.utils import get_binding_config from streamflow.log_handler import logger from streamflow.workflow.combinator import ( CartesianProductCombinator, @@ -1256,57 +1257,6 @@ def _get_source_port(self, workflow: Workflow, source_name: str) -> Port: else: return self.input_ports[source_name] - def _get_binding_config(self, name: str, target_type: str) -> BindingConfig: - path = PurePosixPath(name) - config = self.workflow_config.propagate(path, target_type) - if config is not None: - targets = [] - for target in config["targets"]: - workdir = target.get("workdir") if target is not None else None - if "deployment" in target: - target_deployment = self.workflow_config.deplyoments[ - target["deployment"] - ] - else: - target_deployment = self.workflow_config.deplyoments[ - target["model"] - ] - if logger.isEnabledFor(logging.WARN): - logger.warn( - "The `model` keyword is deprecated and will be removed in StreamFlow 0.3.0. " - "Use `deployment` instead." - ) - locations = target.get("locations", None) - if locations is None: - locations = target.get("resources") - if locations is not None: - if logger.isEnabledFor(logging.WARN): - logger.warn( - "The `resources` keyword is deprecated and will be removed in StreamFlow 0.3.0. " - "Use `locations` instead." - ) - else: - locations = 1 - deployment = DeploymentConfig( - name=target_deployment["name"], - type=target_deployment["type"], - config=target_deployment["config"], - external=target_deployment.get("external", False), - lazy=target_deployment.get("lazy", True), - workdir=target_deployment.get("workdir"), - ) - targets.append( - Target( - deployment=deployment, - locations=locations, - service=target.get("service"), - workdir=workdir, - ) - ) - return BindingConfig(targets=targets, filters=config.get("filters")) - else: - return BindingConfig(targets=[LocalTarget()]) - def _handle_default_port( self, global_name: str, @@ -1350,8 +1300,8 @@ def _inject_input( output_directory: str, value: Any, ) -> None: - # Retrieve a local DeployStep - binding_config = self._get_binding_config(global_name, "port") + # Retrieve the DeployStep for the port target + binding_config = get_binding_config(global_name, "port", self.workflow_config) target = binding_config.targets[0] deploy_step = self._get_deploy_step(target.deployment, workflow) # Remap path if target's workdir is defined @@ -1505,7 +1455,7 @@ def _translate_command_line_tool( # Process InlineJavascriptRequirement expression_lib, full_js = _process_javascript_requirement(requirements) # Retrieve target - binding_config = self._get_binding_config(name_prefix, "step") + binding_config = get_binding_config(name_prefix, "step", self.workflow_config) # Process DockerRequirement if "DockerRequirement" in requirements: network_access = ( @@ -1622,7 +1572,9 @@ def _translate_command_line_tool( output_port = self.output_ports[global_name] # If the port is bound to a remote target, add the connector dependency if self.workflow_config.propagate(PurePosixPath(global_name), "port"): - binding_config = self._get_binding_config(global_name, "port") + binding_config = get_binding_config( + global_name, "port", self.workflow_config + ) port_target = binding_config.targets[0] output_deploy_step = self._get_deploy_step( port_target.deployment, workflow diff --git a/streamflow/deployment/utils.py b/streamflow/deployment/utils.py index e1f8bc05..1a0b8c4d 100644 --- a/streamflow/deployment/utils.py +++ b/streamflow/deployment/utils.py @@ -1,9 +1,70 @@ +from __future__ import annotations + +import logging import os import posixpath +from pathlib import PurePosixPath from types import ModuleType +from typing import TYPE_CHECKING -from streamflow.core.deployment import Connector +from streamflow.core.config import BindingConfig +from streamflow.core.deployment import DeploymentConfig, LocalTarget, Target from streamflow.deployment.connector import LocalConnector +from streamflow.log_handler import logger + +if TYPE_CHECKING: + from streamflow.config.config import WorkflowConfig + from streamflow.core.deployment import Connector + + +def get_binding_config( + name: str, target_type: str, workflow_config: WorkflowConfig +) -> BindingConfig: + path = PurePosixPath(name) + config = workflow_config.propagate(path, target_type) + if config is not None: + targets = [] + for target in config["targets"]: + workdir = target.get("workdir") if target is not None else None + if "deployment" in target: + target_deployment = workflow_config.deplyoments[target["deployment"]] + else: + target_deployment = workflow_config.deplyoments[target["model"]] + if logger.isEnabledFor(logging.WARN): + logger.warn( + "The `model` keyword is deprecated and will be removed in StreamFlow 0.3.0. " + "Use `deployment` instead." + ) + locations = target.get("locations", None) + if locations is None: + locations = target.get("resources") + if locations is not None: + if logger.isEnabledFor(logging.WARN): + logger.warn( + "The `resources` keyword is deprecated and will be removed in StreamFlow 0.3.0. " + "Use `locations` instead." + ) + else: + locations = 1 + deployment = DeploymentConfig( + name=target_deployment["name"], + type=target_deployment["type"], + config=target_deployment["config"], + external=target_deployment.get("external", False), + lazy=target_deployment.get("lazy", True), + workdir=target_deployment.get("workdir"), + ) + targets.append( + Target( + deployment=deployment, + locations=locations, + service=target.get("service"), + workdir=workdir, + ) + ) + return BindingConfig(targets=targets, filters=config.get("filters")) + else: + return BindingConfig(targets=[LocalTarget()]) def get_path_processor(connector: Connector) -> ModuleType: diff --git a/streamflow/workflow/utils.py b/streamflow/workflow/utils.py index cfbe90de..cb940042 100644 --- a/streamflow/workflow/utils.py +++ b/streamflow/workflow/utils.py @@ -1,4 +1,6 @@ -from typing import Any, Iterable, MutableSequence, Type, Union +from __future__ import annotations + +from typing import MutableSequence, TYPE_CHECKING from streamflow.core.workflow import Token from streamflow.workflow.token import ( @@ -8,16 +10,19 @@ TerminationToken, ) +if TYPE_CHECKING: + from typing import Any, Iterable + -def check_iteration_termination(inputs: Union[Token, Iterable[Token]]) -> bool: +def check_iteration_termination(inputs: Token | Iterable[Token]) -> bool: return check_token_class(inputs, IterationTerminationToken) -def check_termination(inputs: Union[Token, Iterable[Token]]) -> bool: +def check_termination(inputs: Token | Iterable[Token]) -> bool: return check_token_class(inputs, TerminationToken) -def check_token_class(inputs: Union[Token, Iterable[Token]], cls: Type[Token]): +def check_token_class(inputs: Token | Iterable[Token], cls: type[Token]): if isinstance(inputs, Token): return isinstance(inputs, cls) else: