Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Factory: also parse command-line options #2060

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 74 additions & 13 deletions cwltool/factory.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
"""Wrap a CWL document as a callable Python object."""

import argparse
import functools
import os
import sys
from typing import Any, Optional, Union

from . import load_tool
from .context import LoadingContext, RuntimeContext
from .argparser import arg_parser
from .context import LoadingContext, RuntimeContext, getdefault
from .errors import WorkflowException
from .executors import JobExecutor, SingleJobExecutor
from .main import find_default_container
from .process import Process
from .utils import CWLObjectType
from .resolver import tool_resolver
from .secrets import SecretStore
from .utils import DEFAULT_TMP_PREFIX, CWLObjectType


class WorkflowStatus(Exception):
Expand All @@ -25,11 +34,15 @@
self.t = t
self.factory = factory

def __call__(self, **kwargs):
# type: (**Any) -> Union[str, Optional[CWLObjectType]]
runtime_context = self.factory.runtime_context.copy()
runtime_context.basedir = os.getcwd()
out, status = self.factory.executor(self.t, kwargs, runtime_context)
def __call__(self, **kwargs: Any) -> Union[str, Optional[CWLObjectType]]:
"""
Execute the process.

:raise WorkflowStatus: If the result is not a success.
"""
if not self.factory.runtime_context.basedir:
self.factory.runtime_context.basedir = os.getcwd()
out, status = self.factory.executor(self.t, kwargs, self.factory.runtime_context)
if status != "success":
raise WorkflowStatus(out, status)
else:
Expand All @@ -47,18 +60,24 @@
executor: Optional[JobExecutor] = None,
loading_context: Optional[LoadingContext] = None,
runtime_context: Optional[RuntimeContext] = None,
argsl: Optional[list[str]] = None,
args: Optional[argparse.Namespace] = None,
) -> None:
"""Create a CWL Process factory from a CWL document."""
if argsl is not None:
args = arg_parser().parse_args(argsl)

Check warning on line 68 in cwltool/factory.py

View check run for this annotation

Codecov / codecov/patch

cwltool/factory.py#L68

Added line #L68 was not covered by tests
if executor is None:
executor = SingleJobExecutor()
self.executor = executor
self.executor: JobExecutor = SingleJobExecutor()
else:
self.executor = executor
if runtime_context is None:
self.runtime_context = RuntimeContext()
self.runtime_context = RuntimeContext(vars(args) if args else {})
self._fix_runtime_context()
else:
self.runtime_context = runtime_context
if loading_context is None:
self.loading_context = LoadingContext()
self.loading_context.singularity = self.runtime_context.singularity
self.loading_context.podman = self.runtime_context.podman
self.loading_context = LoadingContext(vars(args) if args else {})
self._fix_loading_context(self.runtime_context)
else:
self.loading_context = loading_context

Expand All @@ -68,3 +87,45 @@
if isinstance(load, int):
raise WorkflowException("Error loading tool")
return Callable(load, self)

def _fix_loading_context(self, runtime_context: RuntimeContext) -> None:
self.loading_context.resolver = getdefault(self.loading_context.resolver, tool_resolver)
self.loading_context.singularity = runtime_context.singularity
self.loading_context.podman = runtime_context.podman

def _fix_runtime_context(self) -> None:
self.runtime_context.basedir = os.getcwd()
self.runtime_context.find_default_container = functools.partial(
find_default_container, default_container=None, use_biocontainers=None
)

if sys.platform == "darwin":
default_mac_path = "/private/tmp/docker_tmp"
if self.runtime_context.tmp_outdir_prefix == DEFAULT_TMP_PREFIX:
self.runtime_context.tmp_outdir_prefix = default_mac_path

for dirprefix in ("tmpdir_prefix", "tmp_outdir_prefix", "cachedir"):
if (
getattr(self.runtime_context, dirprefix)
and getattr(self.runtime_context, dirprefix) != DEFAULT_TMP_PREFIX
):
sl = (
"/"
if getattr(self.runtime_context, dirprefix).endswith("/")
or dirprefix == "cachedir"
else ""
)
setattr(
self.runtime_context,
dirprefix,
os.path.abspath(getattr(self.runtime_context, dirprefix)) + sl,
)
if not os.path.exists(os.path.dirname(getattr(self.runtime_context, dirprefix))):
try:
os.makedirs(os.path.dirname(getattr(self.runtime_context, dirprefix)))
except Exception as e:
print("Failed to create directory: %s", e)

Check warning on line 127 in cwltool/factory.py

View check run for this annotation

Codecov / codecov/patch

cwltool/factory.py#L124-L127

Added lines #L124 - L127 were not covered by tests

self.runtime_context.secret_store = getdefault(
self.runtime_context.secret_store, SecretStore()
)