Skip to content

Added a flag to cache intermediate output #59

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion cwltool/draft2tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from .flatten import flatten
import functools
import os
import shutil
from .pathmapper import PathMapper, DockerPathMapper
from .job import CommandLineJob
import yaml
Expand All @@ -20,6 +21,7 @@
import urlparse
import tempfile
from .builder import CONTENT_LIMIT, substitute, Builder
from distutils.dir_util import copy_tree
import shellescape
import errno
from typing import Callable, Any, Union, Generator, cast
Expand Down Expand Up @@ -235,7 +237,7 @@ def _check_adjust(f): # type: (Dict[str,Any]) -> Dict[str,Any]

yield j

def collect_output_ports(self, ports, builder, outdir):
def collect_output_ports(self, ports, builder, outdir, **kwargs):
# type: (Set[Dict[str,Any]], Builder, str) -> Dict[str,Union[str,List[Any],Dict[str,Any]]]
try:
ret = {} # type: Dict[str,Union[str,List[Any],Dict[str,Any]]]
Expand All @@ -262,6 +264,11 @@ def collect_output_ports(self, ports, builder, outdir):
if ret:
adjustFileObjs(ret, remove_hostfs)
validate.validate_ex(self.names.get_name("outputs_record_schema", ""), ret)

if builder.cacheIntermediateOutput:
cachedir = kwargs.get("cachedir")
copy_tree(outdir, cachedir)

return ret if ret is not None else {}
except validate.ValidationException as e:
raise WorkflowException("Error validating output record, " + str(e) + "\n in " + json.dumps(ret, indent=4))
Expand Down
134 changes: 72 additions & 62 deletions cwltool/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import io
import os
import tempfile
import string
import glob
import json
import yaml
Expand All @@ -15,6 +16,8 @@
import stat
import re
import shellescape
import hashlib
from distutils.dir_util import copy_tree
from .docker_uid import docker_vm_uid
from .builder import Builder
from typing import Union, Iterable, Callable, Any, Mapping, IO, cast, Tuple
Expand Down Expand Up @@ -150,73 +153,80 @@ def run(self, dry_run=False, pull_image=True, rm_container=True,
outputs = {} # type: Dict[str,str]

try:
for t in self.generatefiles:
entry = self.generatefiles[t]
if isinstance(entry, dict):
src = entry["path"]
dst = os.path.join(self.outdir, t)
if os.path.dirname(self.pathmapper.reversemap(src)[1]) != self.outdir:
_logger.debug(u"symlinking %s to %s", dst, src)
os.symlink(src, dst)
elif isinstance(entry, str):
with open(os.path.join(self.outdir, t), "w") as fout:
fout.write(entry)
else:
raise Exception("Unhandled type")

if self.stdin:
stdin = open(self.pathmapper.mapper(self.stdin)[0], "rb")
else:
stdin = subprocess.PIPE

if self.stdout:
absout = os.path.join(self.outdir, self.stdout)
dn = os.path.dirname(absout)
if dn and not os.path.exists(dn):
os.makedirs(dn)
stdout = open(absout, "wb")
else:
stdout = sys.stderr

sp = subprocess.Popen([str(x) for x in runtime + self.command_line],
shell=False,
close_fds=True,
stdin=stdin,
stdout=stdout,
env=env,
cwd=self.outdir)

if sp.stdin:
sp.stdin.close()
digest = kwargs.get("generate_identity")(self, **kwargs)
cachedir = os.path.join(kwargs.get("outdir"), "cache", digest)

rcode = sp.wait()

if isinstance(stdin, file):
stdin.close()

if stdout is not sys.stderr:
stdout.close()

if self.successCodes and rcode in self.successCodes:
processStatus = "success"
elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
processStatus = "temporaryFail"
elif self.permanentFailCodes and rcode in self.permanentFailCodes:
processStatus = "permanentFail"
elif rcode == 0:
if kwargs.get("cache_intermediate_output") and os.path.exists(cachedir):
copy_tree(cachedir, self.outdir)
processStatus = "success"
else:
processStatus = "permanentFail"
for t in self.generatefiles:
entry = self.generatefiles[t]
if isinstance(entry, dict):
src = entry["path"]
dst = os.path.join(self.outdir, t)
if os.path.dirname(self.pathmapper.reversemap(src)[1]) != self.outdir:
_logger.debug(u"symlinking %s to %s", dst, src)
os.symlink(src, dst)
elif isinstance(entry, str):
with open(os.path.join(self.outdir, t), "w") as fout:
fout.write(entry)
else:
raise Exception("Unhandled type")

if self.stdin:
stdin = open(self.pathmapper.mapper(self.stdin)[0], "rb")
else:
stdin = subprocess.PIPE

if self.stdout:
absout = os.path.join(self.outdir, self.stdout)
dn = os.path.dirname(absout)
if dn and not os.path.exists(dn):
os.makedirs(dn)
stdout = open(absout, "wb")
else:
stdout = sys.stderr

sp = subprocess.Popen([str(x) for x in runtime + self.command_line],
shell=False,
close_fds=True,
stdin=stdin,
stdout=stdout,
env=env,
cwd=self.outdir)

if sp.stdin:
sp.stdin.close()

rcode = sp.wait()

if isinstance(stdin, file):
stdin.close()

if stdout is not sys.stderr:
stdout.close()

if self.successCodes and rcode in self.successCodes:
processStatus = "success"
elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
processStatus = "temporaryFail"
elif self.permanentFailCodes and rcode in self.permanentFailCodes:
processStatus = "permanentFail"
elif rcode == 0:
processStatus = "success"
else:
processStatus = "permanentFail"

for t in self.generatefiles:
if isinstance(self.generatefiles[t], dict):
src = cast(dict, self.generatefiles[t])["path"]
dst = os.path.join(self.outdir, t)
if os.path.dirname(self.pathmapper.reversemap(src)[1]) != self.outdir:
os.remove(dst)
os.symlink(self.pathmapper.reversemap(src)[1], dst)
for t in self.generatefiles:
if isinstance(self.generatefiles[t], dict):
src = cast(dict, self.generatefiles[t])["path"]
dst = os.path.join(self.outdir, t)
if os.path.dirname(self.pathmapper.reversemap(src)[1]) != self.outdir:
os.remove(dst)
os.symlink(self.pathmapper.reversemap(src)[1], dst)

outputs = self.collect_outputs(self.outdir)
outputs = self.collect_outputs(self.outdir, cachedir=cachedir)

except OSError as e:
if e.errno == 2:
Expand Down
32 changes: 31 additions & 1 deletion cwltool/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from . import draft2tool
import argparse
from schema_salad.ref_resolver import Loader
import string
import json
import os
import sys
Expand All @@ -21,6 +22,7 @@
from . import update
from .process import shortname, Process
import rdflib
import hashlib
from .utils import aslist
from typing import Union, Any, cast, Callable, Tuple, IO

Expand Down Expand Up @@ -48,6 +50,8 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
default=("PATH",),
dest="preserve_environment")

parser.add_argument("--cache-intermediate-output", action="store_true")

exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--rm-container", action="store_true", default=True,
help="Delete Docker container used by jobs after they exit (default)",
Expand Down Expand Up @@ -164,6 +168,31 @@ def output_callback(out, processStatus):
output_callback,
**kwargs)

if not kwargs.get("generate_identity"):
def generate_identity(job, **kwargs):
    """Derive a cache key for *job* from its normalized command line.

    Each command-line argument is mapped back to a location-independent
    name (via the job's path mappers) so the same logical command hashes
    identically across runs that stage files into different temporary
    directories.

    :param job: job object exposing ``command_line``, ``pathmapper`` and
        (optionally) ``builder.pathmapper.dirs``.
    :return: hex MD5 digest of the space-joined normalized command line.
    """
    def strip_tmp(arg, tmp):
        # Drop a temporary-directory prefix from arg; returns None when
        # arg does not live under tmp (such entries are filtered below).
        if arg.startswith(tmp):
            return arg[len(tmp):]
        return None

    def reversemap(arg):
        # Map a staged path back to a stable, relocatable name.
        back = job.pathmapper.reversemap(arg)

        try:
            dirs = job.builder.pathmapper.dirs
        except AttributeError:
            # Builder has no pathmapper (or no dirs) — nothing to strip.
            dirs = {}
        # List comprehension (not filter) so len()/indexing below work
        # on Python 3 as well as Python 2.
        untmp = [s for s in (strip_tmp(arg, tmp) for tmp in dirs.values()) if s]

        if back:
            return os.path.basename(back[1])
        elif len(untmp) > 0:
            return untmp[0]
        else:
            return arg

    line = [reversemap(arg) for arg in job.command_line]
    # Encode explicitly so the digest is stable on Python 2 and 3.
    return hashlib.md5(" ".join(line).encode("utf-8")).hexdigest()
kwargs["generate_identity"] = generate_identity

if kwargs.get("conformance_test"):
job = jobiter.next()
a = {"args": job.command_line}
Expand Down Expand Up @@ -609,7 +638,8 @@ def main(argsl=None,
makeTool=makeTool,
move_outputs=args.move_outputs,
select_resources=selectResources,
eval_timeout=args.eval_timeout
eval_timeout=args.eval_timeout,
cache_intermediate_output=args.cache_intermediate_output
)
# This is the workflow output, it needs to be written
if out is not None:
Expand Down
1 change: 1 addition & 0 deletions cwltool/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ def _init_job(self, joborder, input_basedir, **kwargs):
builder.requirements = self.requirements
builder.resources = {}
builder.timeout = kwargs.get("eval_timeout")
builder.cacheIntermediateOutput = kwargs.get("cache_intermediate_output")

dockerReq, _ = self.get_requirement("DockerRequirement")
if dockerReq and kwargs.get("use_container"):
Expand Down
3 changes: 3 additions & 0 deletions tests/cache-test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/usr/bin/env bash
# Run the revsort workflow with --cache-intermediate-output enabled.
# Temp dirs are anchored under this tests/ directory, derived from the
# script's own location instead of a hard-coded developer-specific path
# (the original used /Users/spanglry/..., which breaks for everyone else).
set -euo pipefail

TESTS_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"

python "$TESTS_DIR/../cwltool/main.py" --debug --cache-intermediate-output \
    --tmpdir-prefix="$TESTS_DIR/" --tmp-outdir-prefix="$TESTS_DIR/" \
    "$TESTS_DIR/revsort-workflow.cwl" --input "$TESTS_DIR/revsort-workflow.cwl"
37 changes: 37 additions & 0 deletions tests/rev.cwl
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#
# Simplest example command line program wrapper for the Unix tool "rev".
#
class: CommandLineTool
cwlVersion: cwl:draft-3
description: "Reverse each line using the `rev` command"

# The "inputs" array defines the structure of the input object that describes
# the inputs to the underlying program. Here, there is one input field
# defined that will be called "input" and will contain a "File" object.
#
# The input binding indicates that the input value should be turned into a
# command line argument. In this example inputBinding is an empty object,
# which indicates that the file name should be added to the command line at
# a default location.
inputs:
- id: input
type: File
inputBinding: {}

# The "outputs" array defines the structure of the output object that
# describes the outputs of the underlying program. Here, there is one
# output field defined that will be called "output", must be a "File" type,
# and after the program executes, the output value will be the file
# output.txt in the designated output directory.
outputs:
- id: output
type: File
outputBinding:
glob: output.txt

# The actual program to execute.
baseCommand: rev

# Specify that the standard output stream must be redirected to a file called
# output.txt in the designated output directory.
stdout: output.txt
66 changes: 66 additions & 0 deletions tests/revsort-workflow.cwl
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#
# This is a two-step workflow which uses the "rev" and "sort" tools defined
# in rev.cwl and sort.cwl.
#
class: Workflow
description: "Reverse the lines in a document, then sort those lines."
cwlVersion: cwl:draft-3

# Requirements & hints specify prerequisites and extensions to the workflow.
# In this example, DockerRequirement specifies a default Docker container
# in which the command line tools will execute.
hints:
- class: DockerRequirement
dockerPull: debian:8

# The inputs array defines the structure of the input object that describes
# the inputs to the workflow.
#
# The "reverse_sort" input parameter demonstrates the "default" field. If the
# field "reverse_sort" is not provided in the input object, the default value will
# be used.
inputs:
- id: input
type: File
description: "The input file to be processed."
- id: reverse_sort
type: boolean
default: true
    description: "If true, reverse (descending) sort"

# The "outputs" array defines the structure of the output object that describes
# the outputs of the workflow.
#
# Each output field must be connected to the output of one of the workflow
# steps using the "connect" field. Here, the parameter "#output" of the
# workflow comes from the "#sorted" output of the "sort" step.
outputs:
- id: output
type: File
source: "#sorted/output"
description: "The output with the lines reversed and sorted."

# The "steps" array lists the executable steps that make up the workflow.
# The tool to execute each step is listed in the "run" field.
#
# In the first step, the "inputs" field of the step connects the upstream
# parameter "#input" of the workflow to the input parameter of the tool
# "revtool.cwl#input"
#
# In the second step, the "inputs" field of the step connects the output
# parameter "#reversed" from the first step to the input parameter of the
# tool "sorttool.cwl#input".
steps:
- id: rev
inputs:
- { id: input, source: "#input" }
outputs:
- { id: output }
run: rev.cwl

- id: sorted
inputs:
- { id: input, source: "#rev/output" }
- { id: reverse, source: "#reverse_sort" }
outputs:
- { id: output }
run: sort.cwl
Loading