Skip to content

Cache jobs #71

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Apr 30, 2016
Merged
128 changes: 92 additions & 36 deletions cwltool/draft2tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
import copy
from .flatten import flatten
import functools
from functools import partial
import os
from .pathmapper import PathMapper, DockerPathMapper
from .job import CommandLineJob
Expand All @@ -23,6 +23,8 @@
import shellescape
import errno
from typing import Callable, Any, Union, Generator, cast
import hashlib
import shutil

_logger = logging.getLogger("cwltool")

Expand Down Expand Up @@ -94,6 +96,20 @@ def revmap_file(builder, outdir, f):
else:
raise WorkflowException(u"Output file path %s must be within designated output directory (%s) or an input file pass through." % (f["path"], builder.outdir))

class CallbackJob(object):
    """Pseudo-job that replays a cached result instead of running a tool.

    On a cache hit, this object is yielded in place of a real
    CommandLineJob; calling run() collects the outputs already present in
    the cache directory and reports success through the callback.
    """

    def __init__(self, job, output_callback, cachebuilder, jobcache):
        # type: (CommandLineTool, Callable[[Any, Any], Any], Builder, str) -> None
        self.job = job                       # the tool whose outputs were cached
        self.cachebuilder = cachebuilder     # builder configured for the cache dir
        self.outdir = jobcache               # cache directory holding the outputs
        self.output_callback = output_callback

    def run(self, **kwargs):
        # type: (**Any) -> None
        # Gather the previously produced output ports from the cache
        # directory, then report them as a successful completion.
        collected = self.job.collect_output_ports(
            self.job.tool["outputs"], self.cachebuilder, self.outdir)
        self.output_callback(collected, "success")


class CommandLineTool(Process):
def __init__(self, toolpath_object, **kwargs):
Expand All @@ -116,34 +132,73 @@ def makePathMapper(self, reffiles, input_basedir, **kwargs):
raise WorkflowException(u"Missing input file %s" % e)

def job(self, joborder, input_basedir, output_callback, **kwargs):
# type: (Dict[str,str], str, Callable[[Any, Any], Any], **Any) -> Generator[CommandLineJob, None, None]
builder = self._init_job(joborder, input_basedir, **kwargs)

if self.tool["baseCommand"]:
for n, b in enumerate(aslist(self.tool["baseCommand"])):
builder.bindings.append({
"position": [-1000000, n],
"valueFrom": b
})

if self.tool.get("arguments"):
for i, a in enumerate(self.tool["arguments"]):
if isinstance(a, dict):
a = copy.copy(a)
if a.get("position"):
a["position"] = [a["position"], i]
else:
a["position"] = [0, i]
a["do_eval"] = a["valueFrom"]
a["valueFrom"] = None
builder.bindings.append(a)
# type: (Dict[str,str], str, Callable[..., Any], **Any) -> Generator[Union[CommandLineJob, CallbackJob], None, None]

jobname = uniquename(kwargs.get("name", shortname(self.tool.get("id", "job"))))

if kwargs.get("cachedir"):
cacheargs = kwargs.copy()
cacheargs["outdir"] = "/out"
cacheargs["tmpdir"] = "/tmp"
cachebuilder = self._init_job(joborder, input_basedir, **cacheargs)
cachebuilder.pathmapper = PathMapper(set((f["path"] for f in cachebuilder.files)),
input_basedir)

cmdline = flatten(map(cachebuilder.generate_arg, cachebuilder.bindings))
(docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
if docker_req and kwargs.get("use_container") is not False:
dockerimg = docker_req.get("dockerImageId") or docker_req.get("dockerPull")
cmdline = ["docker", "run", dockerimg] + cmdline
keydict = {"cmdline": cmdline}

for _,f in cachebuilder.pathmapper.items():
st = os.stat(f[0])
keydict[f[0]] = [st.st_size, int(st.st_mtime * 1000)]

interesting = {"DockerRequirement",
"EnvVarRequirement",
"CreateFileRequirement",
"ShellCommandRequirement"}
for rh in (self.requirements, self.hints):
for r in reversed(rh):
if r["class"] in interesting and r["class"] not in keydict:
keydict[r["class"]] = r

keydictstr = json.dumps(keydict, separators=(',',':'), sort_keys=True)
cachekey = hashlib.md5(keydictstr).hexdigest()

_logger.debug("[job %s] keydictstr is %s -> %s", jobname, keydictstr, cachekey)

jobcache = os.path.join(kwargs["cachedir"], cachekey)
jobcachepending = jobcache + ".pending"

if os.path.isdir(jobcache) and not os.path.isfile(jobcachepending):
if docker_req and kwargs.get("use_container") is not False:
cachebuilder.outdir = kwargs.get("docker_outdir") or "/var/spool/cwl"
else:
builder.bindings.append({
"position": [0, i],
"valueFrom": a
})
cachebuilder.outdir = jobcache

builder.bindings.sort(key=lambda a: a["position"])
_logger.info("[job %s] Using cached output in %s", jobname, jobcache)
yield CallbackJob(self, output_callback, cachebuilder, jobcache)
return
else:
_logger.info("[job %s] Output of job will be cached in %s", jobname, jobcache)
shutil.rmtree(jobcache, True)
os.makedirs(jobcache)
kwargs["outdir"] = jobcache
open(jobcachepending, "w").close()
def rm_pending_output_callback(output_callback, jobcachepending,
outputs, processStatus):
if processStatus == "success":
os.remove(jobcachepending)
output_callback(outputs, processStatus)
output_callback = cast(
Callable[..., Any], # known bug in mypy
# https://github.com/python/mypy/issues/797
partial(rm_pending_output_callback, output_callback,
jobcachepending))

builder = self._init_job(joborder, input_basedir, **kwargs)

reffiles = set((f["path"] for f in builder.files))

Expand All @@ -157,7 +212,7 @@ def job(self, joborder, input_basedir, output_callback, **kwargs):
j.permanentFailCodes = self.tool.get("permanentFailCodes")
j.requirements = self.requirements
j.hints = self.hints
j.name = uniquename(kwargs.get("name", str(id(j))))
j.name = jobname

_logger.debug(u"[job %s] initializing from %s%s",
j.name,
Expand Down Expand Up @@ -195,7 +250,7 @@ def _check_adjust(f): # type: (Dict[str,Any]) -> Dict[str,Any]

_logger.debug(u"[job %s] command line bindings is %s", j.name, json.dumps(builder.bindings, indent=4))

dockerReq, _ = self.get_requirement("DockerRequirement")
dockerReq = self.get_requirement("DockerRequirement")[0]
if dockerReq and kwargs.get("use_container"):
out_prefix = kwargs.get("tmp_outdir_prefix")
j.outdir = kwargs.get("outdir") or tempfile.mkdtemp(prefix=out_prefix)
Expand All @@ -205,19 +260,19 @@ def _check_adjust(f): # type: (Dict[str,Any]) -> Dict[str,Any]
j.outdir = builder.outdir
j.tmpdir = builder.tmpdir

createFiles, _ = self.get_requirement("CreateFileRequirement")
createFiles = self.get_requirement("CreateFileRequirement")[0]
j.generatefiles = {}
if createFiles:
for t in createFiles["fileDef"]:
j.generatefiles[builder.do_eval(t["filename"])] = copy.deepcopy(builder.do_eval(t["fileContent"]))

j.environment = {}
evr, _ = self.get_requirement("EnvVarRequirement")
evr = self.get_requirement("EnvVarRequirement")[0]
if evr:
for t in evr["envDef"]:
j.environment[t["envName"]] = builder.do_eval(t["envValue"])

shellcmd, _ = self.get_requirement("ShellCommandRequirement")
shellcmd = self.get_requirement("ShellCommandRequirement")[0]
if shellcmd:
cmd = [] # type: List[str]
for b in builder.bindings:
Expand All @@ -230,7 +285,8 @@ def _check_adjust(f): # type: (Dict[str,Any]) -> Dict[str,Any]
j.command_line = flatten(map(builder.generate_arg, builder.bindings))

j.pathmapper = builder.pathmapper
j.collect_outputs = functools.partial(self.collect_output_ports, self.tool["outputs"], builder)
j.collect_outputs = partial(
self.collect_output_ports, self.tool["outputs"], builder)
j.output_callback = output_callback

yield j
Expand All @@ -246,9 +302,9 @@ def collect_output_ports(self, ports, builder, outdir):
_logger.debug(u"Raw output from %s: %s", custom_output, json.dumps(ret, indent=4))
adjustFileObjs(ret, remove_hostfs)
adjustFileObjs(ret,
cast(Callable[[Any], Any], # known bug in mypy
cast(Callable[[Any], Any], # known bug in mypy
# https://github.com/python/mypy/issues/797
functools.partial(revmap_file, builder, outdir)))
partial(revmap_file, builder, outdir)))
adjustFileObjs(ret, remove_hostfs)
validate.validate_ex(self.names.get_name("outputs_record_schema", ""), ret)
return ret
Expand All @@ -273,7 +329,7 @@ def collect_output(self, schema, builder, outdir):
binding = schema["outputBinding"]
globpatterns = [] # type: List[str]

revmap = functools.partial(revmap_file, builder, outdir)
revmap = partial(revmap_file, builder, outdir)

if "glob" in binding:
for gb in aslist(binding["glob"]):
Expand Down
17 changes: 14 additions & 3 deletions cwltool/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from . import draft2tool
import argparse
from schema_salad.ref_resolver import Loader
import string
import json
import os
import sys
Expand All @@ -21,6 +22,7 @@
from . import update
from .process import shortname, Process
import rdflib
import hashlib
from .utils import aslist
from typing import Union, Any, cast, Callable, Dict, Tuple, IO

Expand Down Expand Up @@ -61,10 +63,14 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
help="Path prefix for temporary directories",
default="tmp")

parser.add_argument("--tmp-outdir-prefix", type=str,
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--tmp-outdir-prefix", type=str,
help="Path prefix for intermediate output directories",
default="tmp")

exgroup.add_argument("--cachedir", type=str, default="",
help="Directory to cache intermediate workflow outputs to avoid recomputing steps.")

exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--rm-tmpdir", action="store_true", default=True,
help="Delete intermediate temporary directories (default)",
Expand Down Expand Up @@ -597,13 +603,17 @@ def main(argsl=None,
if isinstance(job_order_object, int):
return job_order_object

if args.cachedir:
args.cachedir = os.path.abspath(args.cachedir)
args.move_outputs = False

try:
out = executor(t, job_order_object[0],
job_order_object[1], args,
conformance_test=args.conformance_test,
dry_run=args.dry_run,
outdir=args.outdir,
tmp_outdir_prefix=args.tmp_outdir_prefix,
tmp_outdir_prefix=args.cachedir if args.cachedir else args.tmp_outdir_prefix,
use_container=args.use_container,
preserve_environment=args.preserve_environment,
pull_image=args.enable_pull,
Expand All @@ -614,7 +624,8 @@ def main(argsl=None,
makeTool=makeTool,
move_outputs=args.move_outputs,
select_resources=selectResources,
eval_timeout=args.eval_timeout
eval_timeout=args.eval_timeout,
cachedir=args.cachedir
)
# This is the workflow output, it needs to be written
if out is not None:
Expand Down
3 changes: 3 additions & 0 deletions cwltool/pathmapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ def mapper(self, src): # type: (str) -> Tuple[str,str]
def files(self): # type: () -> List[str]
    # Source paths currently tracked by this mapper (the keys of _pathmap).
    return self._pathmap.keys()

def items(self): # type: () -> List[Tuple[str,Tuple[str,str]]]
    # Expose the raw (src, mapped-pair) entries of _pathmap so callers can
    # iterate the whole mapping (e.g. for cache-key computation in job()).
    return self._pathmap.items()

def reversemap(self, target): # type: (str) -> Tuple[str, str]
for k, v in self._pathmap.items():
if v[1] == target:
Expand Down
27 changes: 26 additions & 1 deletion cwltool/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
_logger = logging.getLogger("cwltool")

supportedProcessRequirements = ["DockerRequirement",
"ExpressionEngineRequirement",
"SchemaDefRequirement",
"EnvVarRequirement",
"CreateFileRequirement",
Expand Down Expand Up @@ -310,6 +309,32 @@ def _init_job(self, joborder, input_basedir, **kwargs):

builder.bindings.extend(builder.bind_input(self.inputs_record_schema, builder.job))

if self.tool.get("baseCommand"):
for n, b in enumerate(aslist(self.tool["baseCommand"])):
builder.bindings.append({
"position": [-1000000, n],
"valueFrom": b
})

if self.tool.get("arguments"):
for i, a in enumerate(self.tool["arguments"]):
if isinstance(a, dict):
a = copy.copy(a)
if a.get("position"):
a["position"] = [a["position"], i]
else:
a["position"] = [0, i]
a["do_eval"] = a["valueFrom"]
a["valueFrom"] = None
builder.bindings.append(a)
else:
builder.bindings.append({
"position": [0, i],
"valueFrom": a
})

builder.bindings.sort(key=lambda a: a["position"])

builder.resources = self.evalResources(builder, kwargs)

return builder
Expand Down