Skip to content

Eliminate dirent #112

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Jul 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions cwltool/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Any, Union, AnyStr, Callable
from .errors import WorkflowException
from .stdfsaccess import StdFsAccess
from .pathmapper import PathMapper, adjustFileObjs, adjustDirObjs
from .pathmapper import PathMapper, adjustFileObjs, adjustDirObjs, normalizeFilesDirs

CONTENT_LIMIT = 64 * 1024

Expand All @@ -31,7 +31,8 @@ def __init__(self): # type: () -> None
self.resources = None # type: Dict[str, Union[int, str]]
self.bindings = [] # type: List[Dict[str, Any]]
self.timeout = None # type: int
self.pathmapper = None # type: PathMapper
self.pathmapper = None # type: PathMapper
self.stagedir = None # type: unicode

def bind_input(self, schema, datum, lead_pos=[], tail_pos=[]):
# type: (Dict[unicode, Any], Any, List[int], List[int]) -> List[Dict[str, Any]]
Expand Down Expand Up @@ -112,7 +113,8 @@ def bind_input(self, schema, datum, lead_pos=[], tail_pos=[]):
if isinstance(sf, dict) or "$(" in sf or "${" in sf:
secondary_eval = self.do_eval(sf, context=datum)
if isinstance(secondary_eval, basestring):
sfpath = {"location": secondary_eval, "class": "File"}
sfpath = {"location": secondary_eval,
"class": "File"}
else:
sfpath = secondary_eval
else:
Expand All @@ -121,6 +123,7 @@ def bind_input(self, schema, datum, lead_pos=[], tail_pos=[]):
datum["secondaryFiles"].extend(sfpath)
else:
datum["secondaryFiles"].append(sfpath)
normalizeFilesDirs(datum["secondaryFiles"])

def _capture_files(f):
self.files.append(f)
Expand Down Expand Up @@ -187,8 +190,14 @@ def generate_arg(self, binding): # type: (Dict[str,Any]) -> List[str]

return [a for a in args if a is not None]

def do_eval(self, ex, context=None, pull_image=True):
# type: (Dict[str,str], Any, bool) -> Any
def do_eval(self, ex, context=None, pull_image=True, recursive=False):
# type: (Union[Dict[str, str], unicode], Any, bool, bool) -> Any
if recursive:
if isinstance(ex, dict):
return {k: self.do_eval(v, context, pull_image, recursive) for k,v in ex.iteritems()}
if isinstance(ex, list):
return [self.do_eval(v, context, pull_image, recursive) for v in ex]

return expression.do_eval(ex, self.job, self.requirements,
self.outdir, self.tmpdir,
self.resources,
Expand Down
4 changes: 2 additions & 2 deletions cwltool/cwltest.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ def compare(a, b): # type: (Any, Any) -> bool
raise CompareFail(u"%s not in %s" % (json.dumps(i, indent=4, sort_keys=True), json.dumps(b, indent=4, sort_keys=True)))

a = {k: v for k, v in a.iteritems()
if k not in ("path", "location", "listing")}
if k not in ("path", "location", "listing", "basename")}
b = {k: v for k, v in b.iteritems()
if k not in ("path", "location", "listing")}
if k not in ("path", "location", "listing", "basename")}

if len(a) != len(b):
raise CompareFail(u"expected %s\ngot %s" % (json.dumps(a, indent=4, sort_keys=True), json.dumps(b, indent=4, sort_keys=True)))
Expand Down
86 changes: 61 additions & 25 deletions cwltool/draft2tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import glob
import logging
import hashlib
import random
import re
import urlparse
import tempfile
Expand All @@ -17,7 +16,7 @@
import shellescape
from typing import Callable, Any, Union, Generator, cast

from .process import Process, shortname, uniquename, getListing
from .process import Process, shortname, uniquename, getListing, normalizeFilesDirs
from .errors import WorkflowException
from .utils import aslist
from . import expression
Expand Down Expand Up @@ -50,7 +49,9 @@ def __init__(self): # type: () -> None

def run(self, **kwargs):  # type: (**Any) -> None
    """Evaluate this job's expression and report the result.

    On success the evaluated value (with its File/Directory objects
    normalized via normalizeFilesDirs) is passed to the output callback
    with status "success"; any evaluation failure is logged and reported
    as an empty result with status "permanentFail".
    """
    try:
        ev = self.builder.do_eval(self.script)
        # Ensure File/Directory objects produced by the expression carry
        # consistent location/basename fields before handing them back.
        normalizeFilesDirs(ev)
        self.output_callback(ev, "success")
    except Exception as e:
        # Best-effort: a bad expression fails this job, not the runner;
        # full traceback only when debug was requested.
        _logger.warn(u"Failed to evaluate expression:\n%s", e, exc_info=(e if kwargs.get('debug') else False))
        self.output_callback({}, "permanentFail")
Expand Down Expand Up @@ -110,6 +111,17 @@ def run(self, **kwargs):
self.cachebuilder, self.outdir),
"success")

# map files to assigned path inside a container. We need to also explicitly
# walk over input as implicit reassignment doesn't reach everything in builder.bindings
def check_adjust(builder, f):
    # type: (Builder, Dict[str, Any]) -> Dict[str, Any]
    """Point *f*'s ``path`` at its container-mapped target and refresh the
    derived name fields (``dirname``/``basename``, plus ``nameroot``/``nameext``
    for File objects).

    Mutates *f* in place and returns it.  Raises WorkflowException when the
    mapped basename is not accepted by ACCEPTLIST_RE.
    """
    mapped = builder.pathmapper.mapper(f["location"])[1]
    f["path"] = mapped
    parent, base = os.path.split(mapped)
    f["dirname"] = parent
    f["basename"] = base
    if f["class"] == "File":
        root, ext = os.path.splitext(base)
        f["nameroot"] = root
        f["nameext"] = ext
    if not ACCEPTLIST_RE.match(base):
        raise WorkflowException("Invalid filename: '%s' contains illegal characters" % (base))
    return f

class CommandLineTool(Process):
def __init__(self, toolpath_object, **kwargs):
Expand All @@ -120,7 +132,7 @@ def makeJobRunner(self): # type: () -> CommandLineJob
return CommandLineJob()

def makePathMapper(self, reffiles, stagedir, **kwargs):
# type: (Set[Any], unicode, **Any) -> PathMapper
# type: (List[Any], unicode, **Any) -> PathMapper
dockerReq, _ = self.get_requirement("DockerRequirement")
try:
return PathMapper(reffiles, kwargs["basedir"], stagedir)
Expand All @@ -141,8 +153,13 @@ def job(self, joborder, output_callback, **kwargs):
cachebuilder = self._init_job(joborder, **cacheargs)
cachebuilder.pathmapper = PathMapper(cachebuilder.files,
kwargs["basedir"],
cachebuilder.stagedir)

cachebuilder.stagedir,
separateDirs=False)
_check_adjust = partial(check_adjust, cachebuilder)
adjustFileObjs(cachebuilder.files, _check_adjust)
adjustFileObjs(cachebuilder.bindings, _check_adjust)
adjustDirObjs(cachebuilder.files, _check_adjust)
adjustDirObjs(cachebuilder.bindings, _check_adjust)
cmdline = flatten(map(cachebuilder.generate_arg, cachebuilder.bindings))
(docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
if docker_req and kwargs.get("use_container") is not False:
Expand All @@ -151,8 +168,9 @@ def job(self, joborder, output_callback, **kwargs):
keydict = {u"cmdline": cmdline}

for _,f in cachebuilder.pathmapper.items():
st = os.stat(f[0])
keydict[f[0]] = [st.st_size, int(st.st_mtime * 1000)]
if f.type == "File":
st = os.stat(f.resolved)
keydict[f.resolved] = [st.st_size, int(st.st_mtime * 1000)]

interesting = {"DockerRequirement",
"EnvVarRequirement",
Expand Down Expand Up @@ -236,19 +254,10 @@ def rm_pending_output_callback(output_callback, jobcachepending,
builder.pathmapper = self.makePathMapper(reffiles, builder.stagedir, **kwargs)
builder.requirements = j.requirements

# map files to assigned path inside a container. We need to also explicitly
# walk over input as implicit reassignment doesn't reach everything in builder.bindings
def _check_adjust(f): # type: (Dict[str,Any]) -> Dict[str,Any]
f["path"] = builder.pathmapper.mapper(f["location"])[1]
f["dirname"], f["basename"] = os.path.split(f["path"])
if f["class"] == "File":
f["nameroot"], f["nameext"] = os.path.splitext(f["basename"])
if not ACCEPTLIST_RE.match(f["basename"]):
raise WorkflowException("Invalid filename: '%s' contains illegal characters" % (f["basename"]))
return f

_logger.debug(u"[job %s] path mappings is %s", j.name, json.dumps({p: builder.pathmapper.mapper(p) for p in builder.pathmapper.files()}, indent=4))

_check_adjust = partial(check_adjust, builder)

adjustFileObjs(builder.files, _check_adjust)
adjustFileObjs(builder.bindings, _check_adjust)
adjustDirObjs(builder.files, _check_adjust)
Expand All @@ -260,27 +269,51 @@ def _check_adjust(f): # type: (Dict[str,Any]) -> Dict[str,Any]

_logger.debug(u"[job %s] command line bindings is %s", j.name, json.dumps(builder.bindings, indent=4))

dockerReq, _ = self.get_requirement("DockerRequirement")
dockerReq = self.get_requirement("DockerRequirement")[0]
if dockerReq and kwargs.get("use_container"):
out_prefix = kwargs.get("tmp_outdir_prefix")
j.outdir = kwargs.get("outdir") or tempfile.mkdtemp(prefix=out_prefix)
tmpdir_prefix = kwargs.get('tmpdir_prefix')
j.tmpdir = kwargs.get("tmpdir") or tempfile.mkdtemp(prefix=tmpdir_prefix)
j.stagedir = None
j.stagedir = tempfile.mkdtemp(prefix=tmpdir_prefix)
else:
j.outdir = builder.outdir
j.tmpdir = builder.tmpdir
j.stagedir = builder.stagedir

initialWorkdir = self.get_requirement("InitialWorkDirRequirement")[0]
j.generatefiles = {"class": "Directory", "listing": []}
j.generatefiles = {"class": "Directory", "listing": [], "basename": ""}
if initialWorkdir:
ls = [] # type: List[Dict[str, Any]]
if isinstance(initialWorkdir["listing"], (str, unicode)):
j.generatefiles["listing"] = builder.do_eval(initialWorkdir["listing"])
ls = builder.do_eval(initialWorkdir["listing"])
else:
for t in initialWorkdir["listing"]:
j.generatefiles["listing"].append({"entryname": builder.do_eval(t["entryname"]),
"entry": copy.deepcopy(builder.do_eval(t["entry"]))})
if "entry" in t:
et = {"entry": builder.do_eval(t["entry"])}
if "entryname" in t:
et["entryname"] = builder.do_eval(t["entryname"])
else:
et["entryname"] = None
ls.append(et)
else:
ls.append(t)
for i,t in enumerate(ls):
if "entry" in t:
if isinstance(t["entry"], (str, unicode)):
ls[i] = {
"class": "File",
"basename": t["entryname"],
"contents": t["entry"]
}
else:
if t["entryname"]:
t = copy.deepcopy(t)
t["entry"]["basename"] = t["entryname"]
ls[i] = t["entry"]
j.generatefiles[u"listing"] = ls

normalizeFilesDirs(j.generatefiles)

j.environment = {}
evr = self.get_requirement("EnvVarRequirement")[0]
Expand Down Expand Up @@ -321,6 +354,8 @@ def collect_output_ports(self, ports, builder, outdir):
# https://github.com/python/mypy/issues/797
partial(revmap_file, builder, outdir)))
adjustFileObjs(ret, remove_path)
adjustDirObjs(ret, remove_path)
normalizeFilesDirs(ret)
validate.validate_ex(self.names.get_name("outputs_record_schema", ""), ret)
return ret

Expand All @@ -334,6 +369,7 @@ def collect_output_ports(self, ports, builder, outdir):
if ret:
adjustFileObjs(ret, remove_path)
adjustDirObjs(ret, remove_path)
normalizeFilesDirs(ret)
validate.validate_ex(self.names.get_name("outputs_record_schema", ""), ret)
return ret if ret is not None else {}
except validate.ValidationException as e:
Expand Down
18 changes: 13 additions & 5 deletions cwltool/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ def __init__(self): # type: () -> None
self.outdir = None # type: str
self.tmpdir = None # type: str
self.environment = None # type: Dict[str,str]
self.generatefiles = None # type: Dict[str,Union[Dict[str,str],str]]
self.generatefiles = None # type: Dict[unicode, Union[List[Dict[str, str]], Dict[str,str], str]]
self.stagedir = None # type: unicode

def run(self, dry_run=False, pull_image=True, rm_container=True,
rm_tmpdir=True, move_outputs="move", **kwargs):
# type: (bool, bool, bool, bool, bool, **Any) -> Union[Tuple[str,Dict[None,None]],None]
# type: (bool, bool, bool, bool, bool, unicode, **Any) -> Union[Tuple[str,Dict[None,None]],None]
if not os.path.exists(self.outdir):
os.makedirs(self.outdir)

Expand All @@ -78,10 +79,12 @@ def run(self, dry_run=False, pull_image=True, rm_container=True,

(docker_req, docker_is_req) = get_feature(self, "DockerRequirement")

for f in self.pathmapper.files():
p = self.pathmapper.mapper(f)
for knownfile in self.pathmapper.files():
p = self.pathmapper.mapper(knownfile)
if p.type == "File" and not os.path.isfile(p[0]):
raise WorkflowException(u"Input file %s (at %s) not found or is not a regular file." % (f, self.pathmapper.mapper(f)[0]))
raise WorkflowException(
u"Input file %s (at %s) not found or is not a regular file."
% (knownfile, self.pathmapper.mapper(knownfile)[0]))

img_id = None
if docker_req and kwargs.get("use_container") is not False:
Expand All @@ -97,6 +100,11 @@ def run(self, dry_run=False, pull_image=True, rm_container=True,
vol = self.pathmapper.mapper(src)
if vol.type == "File":
runtime.append(u"--volume=%s:%s:ro" % (vol.resolved, vol.target))
if vol.type == "CreateFile":
createtmp = os.path.join(self.stagedir, os.path.basename(vol.target))
with open(createtmp, "w") as f:
f.write(vol.resolved.encode("utf-8"))
runtime.append(u"--volume=%s:%s:ro" % (createtmp, vol.target))
runtime.append(u"--volume=%s:%s:rw" % (os.path.abspath(self.outdir), "/var/spool/cwl"))
runtime.append(u"--volume=%s:%s:rw" % (os.path.abspath(self.tmpdir), "/tmp"))
runtime.append(u"--workdir=%s" % ("/var/spool/cwl"))
Expand Down
13 changes: 6 additions & 7 deletions cwltool/load_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import urlparse
from schema_salad.ref_resolver import Loader
import schema_salad.validate as validate
from schema_salad.validate import ValidationException
import schema_salad.schema as schema
from avro.schema import Names
from . import update
Expand Down Expand Up @@ -37,13 +38,12 @@ def fetch_document(argsworkflow):
workflowobj = argsworkflow
uri = "#" + str(id(argsworkflow))
else:
raise validate.ValidationException(
"Must be URI or object: '%s'" % argsworkflow)
raise ValidationException("Must be URI or object: '%s'" % argsworkflow)

return document_loader, workflowobj, uri

def _convert_stdstreams_to_files(workflowobj):
# type: (Union[Dict[unicode, Any], List[Dict[unicode, Any]]) -> None
# type: (Union[Dict[unicode, Any], List[Dict[unicode, Any]]]) -> None

if isinstance(workflowobj, dict):
if ('class' in workflowobj
Expand All @@ -53,7 +53,7 @@ def _convert_stdstreams_to_files(workflowobj):
for streamtype in ['stdout', 'stderr']:
if out['type'] == streamtype:
if 'outputBinding' in out:
raise validate.ValidateException(
raise ValidationException(
"Not allowed to specify outputBinding when"
" using %s shortcut." % streamtype)
if streamtype in workflowobj:
Expand Down Expand Up @@ -109,12 +109,11 @@ def validate_document(document_loader, workflowobj, uri,
workflowobj["id"] = fileuri
processobj, metadata = document_loader.resolve_all(workflowobj, fileuri)
if not isinstance(processobj, (dict, list)):
raise validate.ValidationException("Workflow must be a dict or list.")
raise ValidationException("Workflow must be a dict or list.")

if not metadata:
if not isinstance(processobj, dict):
raise validate.ValidationException(
"Draft-2 workflows must be a dict.")
raise ValidationException("Draft-2 workflows must be a dict.")
metadata = {"$namespaces": processobj.get("$namespaces", {}),
"$schemas": processobj.get("$schemas", []),
"cwlVersion": processobj["cwlVersion"]}
Expand Down
Loading