Skip to content

Commit

Permalink
Moving some logic from Translator to utils (#66)
Browse files Browse the repository at this point in the history
Moving language-agnostic features away from the CWL Translator fosters
their reusability with different workflow languages (e.g., Jupyter
Workflow).
  • Loading branch information
GlassOfWhiskey authored Feb 6, 2023
1 parent ea59592 commit 95aaaf3
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 60 deletions.
62 changes: 7 additions & 55 deletions streamflow/cwl/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
ValueFromTransformer,
)
from streamflow.cwl.utils import LoadListing, SecondaryFile, resolve_dependencies
from streamflow.deployment.utils import get_binding_config
from streamflow.log_handler import logger
from streamflow.workflow.combinator import (
CartesianProductCombinator,
Expand Down Expand Up @@ -1256,57 +1257,6 @@ def _get_source_port(self, workflow: Workflow, source_name: str) -> Port:
else:
return self.input_ports[source_name]

def _get_binding_config(self, name: str, target_type: str) -> BindingConfig:
path = PurePosixPath(name)
config = self.workflow_config.propagate(path, target_type)
if config is not None:
targets = []
for target in config["targets"]:
workdir = target.get("workdir") if target is not None else None
if "deployment" in target:
target_deployment = self.workflow_config.deplyoments[
target["deployment"]
]
else:
target_deployment = self.workflow_config.deplyoments[
target["model"]
]
if logger.isEnabledFor(logging.WARN):
logger.warn(
"The `model` keyword is deprecated and will be removed in StreamFlow 0.3.0. "
"Use `deployment` instead."
)
locations = target.get("locations", None)
if locations is None:
locations = target.get("resources")
if locations is not None:
if logger.isEnabledFor(logging.WARN):
logger.warn(
"The `resources` keyword is deprecated and will be removed in StreamFlow 0.3.0. "
"Use `locations` instead."
)
else:
locations = 1
deployment = DeploymentConfig(
name=target_deployment["name"],
type=target_deployment["type"],
config=target_deployment["config"],
external=target_deployment.get("external", False),
lazy=target_deployment.get("lazy", True),
workdir=target_deployment.get("workdir"),
)
targets.append(
Target(
deployment=deployment,
locations=locations,
service=target.get("service"),
workdir=workdir,
)
)
return BindingConfig(targets=targets, filters=config.get("filters"))
else:
return BindingConfig(targets=[LocalTarget()])

def _handle_default_port(
self,
global_name: str,
Expand Down Expand Up @@ -1350,8 +1300,8 @@ def _inject_input(
output_directory: str,
value: Any,
) -> None:
# Retrieve a local DeployStep
binding_config = self._get_binding_config(global_name, "port")
# Retrieve the DeployStep for the port target
binding_config = get_binding_config(global_name, "port", self.workflow_config)
target = binding_config.targets[0]
deploy_step = self._get_deploy_step(target.deployment, workflow)
# Remap path if target's workdir is defined
Expand Down Expand Up @@ -1505,7 +1455,7 @@ def _translate_command_line_tool(
# Process InlineJavascriptRequirement
expression_lib, full_js = _process_javascript_requirement(requirements)
# Retrieve target
binding_config = self._get_binding_config(name_prefix, "step")
binding_config = get_binding_config(name_prefix, "step", self.workflow_config)
# Process DockerRequirement
if "DockerRequirement" in requirements:
network_access = (
Expand Down Expand Up @@ -1622,7 +1572,9 @@ def _translate_command_line_tool(
output_port = self.output_ports[global_name]
# If the port is bound to a remote target, add the connector dependency
if self.workflow_config.propagate(PurePosixPath(global_name), "port"):
binding_config = self._get_binding_config(global_name, "port")
binding_config = get_binding_config(
global_name, "port", self.workflow_config
)
port_target = binding_config.targets[0]
output_deploy_step = self._get_deploy_step(
port_target.deployment, workflow
Expand Down
63 changes: 62 additions & 1 deletion streamflow/deployment/utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,70 @@
from __future__ import annotations

import logging
import os
import posixpath
from pathlib import PurePosixPath
from types import ModuleType
from typing import TYPE_CHECKING

from streamflow.core.deployment import Connector
from streamflow.core.config import BindingConfig
from streamflow.core.deployment import DeploymentConfig, LocalTarget, Target
from streamflow.deployment.connector import LocalConnector
from streamflow.log_handler import logger

if TYPE_CHECKING:
from streamflow.config.config import WorkflowConfig
from streamflow.core.deployment import Connector


def get_binding_config(
name: str, target_type: str, workflow_config: WorkflowConfig
) -> BindingConfig:
path = PurePosixPath(name)
config = workflow_config.propagate(path, target_type)
if config is not None:
targets = []
for target in config["targets"]:
workdir = target.get("workdir") if target is not None else None
if "deployment" in target:
target_deployment = workflow_config.deplyoments[target["deployment"]]
else:
target_deployment = workflow_config.deplyoments[target["model"]]
if logger.isEnabledFor(logging.WARN):
logger.warn(
"The `model` keyword is deprecated and will be removed in StreamFlow 0.3.0. "
"Use `deployment` instead."
)
locations = target.get("locations", None)
if locations is None:
locations = target.get("resources")
if locations is not None:
if logger.isEnabledFor(logging.WARN):
logger.warn(
"The `resources` keyword is deprecated and will be removed in StreamFlow 0.3.0. "
"Use `locations` instead."
)
else:
locations = 1
deployment = DeploymentConfig(
name=target_deployment["name"],
type=target_deployment["type"],
config=target_deployment["config"],
external=target_deployment.get("external", False),
lazy=target_deployment.get("lazy", True),
workdir=target_deployment.get("workdir"),
)
targets.append(
Target(
deployment=deployment,
locations=locations,
service=target.get("service"),
workdir=workdir,
)
)
return BindingConfig(targets=targets, filters=config.get("filters"))
else:
return BindingConfig(targets=[LocalTarget()])


def get_path_processor(connector: Connector) -> ModuleType:
Expand Down
13 changes: 9 additions & 4 deletions streamflow/workflow/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from typing import Any, Iterable, MutableSequence, Type, Union
from __future__ import annotations

from typing import MutableSequence, TYPE_CHECKING

from streamflow.core.workflow import Token
from streamflow.workflow.token import (
Expand All @@ -8,16 +10,19 @@
TerminationToken,
)

if TYPE_CHECKING:
from typing import Any, Iterable


def check_iteration_termination(inputs: Union[Token, Iterable[Token]]) -> bool:
def check_iteration_termination(inputs: Token | Iterable[Token]) -> bool:
return check_token_class(inputs, IterationTerminationToken)


def check_termination(inputs: Union[Token, Iterable[Token]]) -> bool:
def check_termination(inputs: Token | Iterable[Token]) -> bool:
return check_token_class(inputs, TerminationToken)


def check_token_class(inputs: Union[Token, Iterable[Token]], cls: Type[Token]):
def check_token_class(inputs: Token | Iterable[Token], cls: type[Token]):
if isinstance(inputs, Token):
return isinstance(inputs, cls)
else:
Expand Down

0 comments on commit 95aaaf3

Please sign in to comment.