Skip to content

[RFC] Tool Dependencies for CWL #93

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cwltool/draft2tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ def rm_pending_output_callback(output_callback, jobcachepending,
reffiles = copy.deepcopy(builder.files)

j = self.makeJobRunner()
j.tool_dependency_manager = self.tool_dependency_manager
j.builder = builder
j.joborder = builder.job
j.stdin = None
Expand Down
233 changes: 204 additions & 29 deletions cwltool/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import json
import logging
import sys
import string
import requests
from . import docker
from .process import get_feature, empty_subtree, stageFiles
Expand All @@ -19,11 +20,18 @@
from typing import Union, Iterable, Callable, Any, Mapping, IO, cast, Tuple
from .pathmapper import PathMapper
import functools
try:
from galaxy.tools.deps.requirements import ToolRequirement
except ImportError:
ToolRequirement = None

# Logger registered under the name "cwltool".
_logger = logging.getLogger("cwltool")

# Pattern that matches either an empty string (``^$``) or any single
# whitespace character or one of the characters | & ; ( ) < > ' " $ @.
needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""")

# True only when the CWLTOOL_FORCE_SHELL_POPEN environment variable is exactly
# "1" (it defaults to "0"). shelled_popen() checks this flag and, when it is
# set, runs the job through its generated bash script even without a prefix.
FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "0") == "1"


def deref_links(outputs): # type: (Any) -> None
if isinstance(outputs, dict):
if outputs.get("class") == "File":
Expand Down Expand Up @@ -60,6 +68,7 @@ def __init__(self): # type: () -> None
self.environment = None # type: Dict[str,str]
self.generatefiles = None # type: Dict[unicode, Union[List[Dict[str, str]], Dict[str,str], str]]
self.stagedir = None # type: unicode
self.dependency_manager = None # type: DependencyManager

def run(self, dry_run=False, pull_image=True, rm_container=True,
rm_tmpdir=True, move_outputs="move", **kwargs):
Expand Down Expand Up @@ -188,50 +197,47 @@ def linkoutdir(src, tgt):
stageFiles(generatemapper, linkoutdir)

if self.stdin:
stdin = open(self.pathmapper.reversemap(self.stdin)[1], "rb")
stdin_path = self.pathmapper.reversemap(self.stdin)[1]
else:
stdin = subprocess.PIPE
stdin_path = None

if self.stderr:
abserr = os.path.join(self.outdir, self.stderr)
dnerr = os.path.dirname(abserr)
if dnerr and not os.path.exists(dnerr):
os.makedirs(dnerr)
stderr = open(abserr, "wb")
stderr_path = abserr
else:
stderr = sys.stderr
stderr_path = None

if self.stdout:
absout = os.path.join(self.outdir, self.stdout)
dn = os.path.dirname(absout)
if dn and not os.path.exists(dn):
os.makedirs(dn)
stdout = open(absout, "wb")
stdout_path = absout
else:
stdout = sys.stderr

sp = subprocess.Popen([unicode(x).encode('utf-8') for x in runtime + self.command_line],
shell=False,
close_fds=True,
stdin=stdin,
stderr=stderr,
stdout=stdout,
env=env,
cwd=self.outdir)

if sp.stdin:
sp.stdin.close()

rcode = sp.wait()

if isinstance(stdin, file):
stdin.close()

if stderr is not sys.stderr:
stderr.close()

if stdout is not sys.stderr:
stdout.close()
stdout_path = None

prefix = None
job_dir = None
if self.tool_dependency_manager is not None:
dependencies = self._find_tool_dependencies()
job_dir = tempfile.mkdtemp(prefix="cwltooljob")
shell_commands = self.tool_dependency_manager.dependency_shell_commands(
dependencies,
job_directory=job_dir,
)
prefix = "\n".join(shell_commands)

rcode = shelled_popen(
[unicode(x).encode('utf-8') for x in runtime + self.command_line],
stdin_path=stdin_path,
stdout_path=stdout_path,
stderr_path=stderr_path,
env=env,
cwd=self.outdir,
)

if self.successCodes and rcode in self.successCodes:
processStatus = "success"
Expand Down Expand Up @@ -290,3 +296,172 @@ def linkoutdir(src, tgt):
if move_outputs == "move" and empty_subtree(self.outdir):
_logger.debug(u"[job %s] Removing empty output directory %s", self.name, self.outdir)
shutil.rmtree(self.outdir, True)

def _find_tool_dependencies(self):
dependencies = []
for hint in self.hints:
hint_class = hint.get("class", "")
if not hint_class:
continue
base_name = hint["class"].rsplit("/", 1)[-1]
if base_name == "Dependency":
requirement_desc = {}
requirement_desc["type"] = "package"
name = hint["name"].rsplit("#", 1)[-1]
version = hint.get("version", "").rsplit("#", 1)[-1]
requirement_desc["name"] = name
requirement_desc["version"] = version or None
dependencies.append(ToolRequirement.from_dict(requirement_desc))
return dependencies


# Bash wrapper written to the job directory by shelled_popen(): ``$prefix`` is
# replaced with the caller-supplied shell text, after which the helper script
# below is run on the serialized job description.  The relative file names
# mean the wrapper has to be started with the job directory as its cwd.
SHELL_COMMAND_TEMPLATE = string.Template("""#!/bin/bash
$prefix
python "run_job.py" "job.json"
""")
# Source of the helper script ("run_job.py").  It loads the JSON description
# named by its first argument (keys: commands, cwd, env, stdin_path,
# stdout_path, stderr_path), runs the command with the requested redirections
# and exits with the command's return code.  Unredirected stdout and stderr
# both go to the script's own stderr.
PYTHON_RUN_SCRIPT = """
import json
import sys
import subprocess

with open(sys.argv[1], "r") as f:
    popen_description = json.load(f)
    commands = popen_description["commands"]
    cwd = popen_description["cwd"]
    env = popen_description["env"]
    stdin_path = popen_description["stdin_path"]
    stdout_path = popen_description["stdout_path"]
    stderr_path = popen_description["stderr_path"]

if stdin_path is not None:
    stdin = open(stdin_path, "rb")
else:
    stdin = subprocess.PIPE

if stdout_path is not None:
    stdout = open(stdout_path, "wb")
else:
    stdout = sys.stderr

if stderr_path is not None:
    stderr = open(stderr_path, "wb")
else:
    stderr = sys.stderr

sp = subprocess.Popen(commands,
                      shell=False,
                      close_fds=True,
                      stdin=stdin,
                      stdout=stdout,
                      stderr=stderr,
                      env=env,
                      cwd=cwd)

if sp.stdin:
    sp.stdin.close()

rcode = sp.wait()

if stdin is not subprocess.PIPE:
    stdin.close()

if stdout is not sys.stderr:
    stdout.close()

if stderr is not sys.stderr:
    stderr.close()

sys.exit(rcode)
"""


def shelled_popen(commands,
                  stdin_path,
                  stdout_path,
                  stderr_path,
                  env,
                  cwd,
                  job_dir=None,
                  prefix=None):
    """Run ``commands`` and return its exit code, optionally via a bash wrapper.

    With no ``prefix`` and ``FORCE_SHELLED_POPEN`` unset, the command is
    started directly with ``subprocess.Popen``.  Otherwise a bash script
    (``SHELL_COMMAND_TEMPLATE`` with ``prefix`` substituted), a helper
    ``run_job.py`` (``PYTHON_RUN_SCRIPT``) and a ``job.json`` description are
    written to ``job_dir`` and the bash script is executed there.

    Args:
        commands: argument list for the process to run.
        stdin_path: file to feed to stdin, or None for an immediately closed
            pipe.
        stdout_path: file receiving stdout, or None to send it to this
            process's stderr.
        stderr_path: file receiving stderr, or None to send it to this
            process's stderr.
        env: environment mapping for the command, or None to inherit.
        cwd: working directory for the command.
        job_dir: directory for the wrapper files; a temporary directory is
            created when None.  It is removed before returning, whether or
            not the caller supplied it.
        prefix: shell text placed in the wrapper script ahead of the command.

    Returns:
        int: the exit code of the command (direct mode) or of the bash
        wrapper (shell mode).
    """
    if prefix is None and not FORCE_SHELLED_POPEN:
        stdin = subprocess.PIPE
        stdout = sys.stderr
        stderr = sys.stderr
        try:
            if stdin_path is not None:
                # "rb", not the invalid "rd" mode string.
                stdin = open(stdin_path, "rb")
            if stdout_path is not None:
                stdout = open(stdout_path, "wb")
            if stderr_path is not None:
                stderr = open(stderr_path, "wb")

            sp = subprocess.Popen(commands,
                                  shell=False,
                                  close_fds=True,
                                  stdin=stdin,
                                  stdout=stdout,
                                  stderr=stderr,
                                  env=env,
                                  cwd=cwd)

            if sp.stdin:
                sp.stdin.close()

            return sp.wait()
        finally:
            # Close only what was opened here, even if Popen itself failed.
            if stdin is not subprocess.PIPE:
                stdin.close()
            if stdout is not sys.stderr:
                stdout.close()
            if stderr is not sys.stderr:
                stderr.close()

    if job_dir is None:
        job_dir = tempfile.mkdtemp(prefix="cwltooljob")

    # Everything that touches job_dir sits inside the try block so the
    # directory is removed even when writing one of the files fails.
    try:
        job_script_contents = SHELL_COMMAND_TEMPLATE.substitute(
            prefix=prefix or '',
        )
        job_description = dict(
            commands=commands,
            cwd=cwd,
            env=dict(env) if env is not None else None,
            stdout_path=stdout_path,
            stderr_path=stderr_path,
            stdin_path=stdin_path,
        )
        with open(os.path.join(job_dir, "job.json"), "w") as f:
            json.dump(job_description, f)
        job_script = os.path.join(job_dir, "run_job.bash")
        with open(job_script, "w") as f:
            f.write(job_script_contents)
        job_run = os.path.join(job_dir, "run_job.py")
        with open(job_run, "w") as f:
            f.write(PYTHON_RUN_SCRIPT)
        # Wrapper output goes to our stderr, as in direct mode.  Pipes that
        # nobody reads would block the child once the pipe buffer fills,
        # making wait() hang.
        sp = subprocess.Popen(
            ["bash", job_script],
            shell=False,
            cwd=job_dir,
            stdout=sys.stderr,
            stderr=sys.stderr,
            stdin=subprocess.PIPE,
        )
        if sp.stdin:
            sp.stdin.close()

        return sp.wait()
    finally:
        shutil.rmtree(job_dir)
38 changes: 37 additions & 1 deletion cwltool/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

import rdflib
from typing import Union, Any, cast, Callable, Dict, Tuple, IO
try:
from galaxy.tools import deps
except ImportError:
deps = None

from schema_salad.ref_resolver import Loader
import schema_salad.validate as validate
Expand All @@ -37,6 +41,9 @@
_logger.addHandler(defaultStreamHandler)
_logger.setLevel(logging.INFO)

if deps is not None:
deps.log = _logger


def arg_parser(): # type: () -> argparse.ArgumentParser
parser = argparse.ArgumentParser(description='Reference executor for Common Workflow Language')
Expand Down Expand Up @@ -138,6 +145,11 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

# help="Dependency resolver configuration file describing how to adapt 'Dependency' hints to current system."
parser.add_argument("--beta-dependency-resolvers-configuration", default=None, help=argparse.SUPPRESS)
# help="Default root directory used by dependency resolvers configuration."
parser.add_argument("--beta-dependencies-directory", default=None, help=argparse.SUPPRESS)

parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

parser.add_argument("--relative-deps", choices=['primary', 'cwd'], default="primary",
Expand Down Expand Up @@ -601,6 +613,12 @@ def main(argsl=None,
if not hasattr(args, k):
setattr(args, k, v)

if deps is not None:
tool_dependencies_configuartion = DependenciesConfigruation(args)
tool_dependency_manager = deps.build_dependency_manager(tool_dependencies_configuartion)
else:
tool_dependency_manager = None

if args.quiet:
_logger.setLevel(logging.WARN)
if args.debug:
Expand Down Expand Up @@ -645,8 +663,11 @@ def main(argsl=None,
printdot(uri, processobj, document_loader.ctx, stdout)
return 0

make_tool_kwargs = {
'tool_dependency_manager': tool_dependency_manager,
}
tool = make_tool(document_loader, avsc_names, metadata, uri,
makeTool, {})
makeTool, make_tool_kwargs)
except (validate.ValidationException) as exc:
_logger.error(u"Tool definition failed validation:\n%s", exc,
exc_info=(exc if args.debug else False))
Expand Down Expand Up @@ -749,5 +770,20 @@ def locToPath(p):
_logger.removeHandler(stderr_handler)
_logger.addHandler(defaultStreamHandler)


class DependenciesConfigruation(object):
    """Dependency-resolution settings derived from parsed command-line args.

    The (misspelled) class name is kept because existing callers use it.

    Attributes:
        use_tool_dependencies: True when a resolvers configuration file was
            given and exists on disk, otherwise False.
        dependency_resolvers_config_file: the configuration file path, or
            None when dependency resolution is disabled.
        tool_dependency_dir: the dependencies directory from the arguments,
            falling back to the absolute directory of the configuration
            file; None when dependency resolution is disabled.
    """

    def __init__(self, args):
        """Read the ``beta_*`` dependency options from ``args``.

        Args:
            args: object (such as an ``argparse.Namespace``) that may carry
                ``beta_dependency_resolvers_configuration`` and
                ``beta_dependencies_directory``; missing attributes are
                treated as None.
        """
        conf_file = getattr(args, "beta_dependency_resolvers_configuration", None)
        tool_dependency_dir = getattr(args, "beta_dependencies_directory", None)

        # Always define every attribute so that reading one in the disabled
        # state yields None rather than raising AttributeError.
        self.use_tool_dependencies = False
        self.tool_dependency_dir = None
        self.dependency_resolvers_config_file = None

        if conf_file is None or not os.path.exists(conf_file):
            return

        self.use_tool_dependencies = True
        if not tool_dependency_dir:
            tool_dependency_dir = os.path.abspath(os.path.dirname(conf_file))
        self.tool_dependency_dir = tool_dependency_dir
        self.dependency_resolvers_config_file = conf_file

if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))
2 changes: 1 addition & 1 deletion cwltool/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ def __init__(self, toolpath_object, **kwargs):
avro.schema.make_avsc_object(self.outputs_record_schema, self.names)
except avro.schema.SchemaParseException as e:
raise validate.ValidationException(u"Got error `%s` while prcoessing outputs of %s:\n%s" % (str(e), self.tool["id"], json.dumps(self.outputs_record_schema, indent=4)))

self.tool_dependency_manager = kwargs.get("tool_dependency_manager", None)

def _init_job(self, joborder, **kwargs):
# type: (Dict[unicode, unicode], **Any) -> Builder
Expand Down