Skip to content

reduce loading time for packed documents #589

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 73 additions & 57 deletions cwltool/load_tool.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Loads a CWL document."""
from __future__ import absolute_import
# pylint: disable=unused-import
"""Loads a CWL document."""

import logging
import os
Expand All @@ -9,34 +9,34 @@
import hashlib
import json
import copy
from typing import Any, Callable, Dict, List, Text, Tuple, Union, cast, Iterable
from typing import (Any, Callable, Dict, Iterable, List, Mapping, Optional,
Text, Tuple, Union, cast)

import requests.sessions
from six import itervalues, string_types
from six.moves import urllib

import schema_salad.schema as schema
from avro.schema import Names
from ruamel.yaml.comments import CommentedMap, CommentedSeq
from schema_salad.ref_resolver import Fetcher, Loader, file_uri
from schema_salad.ref_resolver import ContextType, Fetcher, Loader, file_uri
from schema_salad.sourceline import cmap
from schema_salad.validate import ValidationException
from six.moves import urllib

from . import process, update
from .errors import WorkflowException
from .process import Process, shortname, get_schema
from .update import ALLUPDATES

_logger = logging.getLogger("cwltool")

jobloaderctx = {
u"cwl": "https://w3id.org/cwl/cwl#",
u"cwltool": "http://commonwl.org/cwltool#",
u"path": {u"@type": u"@id"},
u"location": {u"@type": u"@id"},
u"format": {u"@type": u"@id"},
u"id": u"@id"
}
} # type: ContextType


overrides_ctx = {
Expand All @@ -51,26 +51,39 @@
"@id": "cwltool:override",
"mapSubject": "class"
}
} # type: Dict[Text, Union[Dict[Any, Any], Text, Iterable[Text]]]
} # type: ContextType


FetcherConstructorType = Callable[[Dict[Text, Union[Text, bool]],
requests.sessions.Session], Fetcher]

loaders = {} # type: Dict[FetcherConstructorType, Loader]

def default_loader(fetcher_constructor):
# type: (Optional[FetcherConstructorType]) -> Loader
if fetcher_constructor in loaders:
return loaders[fetcher_constructor]
else:
loader = Loader(jobloaderctx, fetcher_constructor=fetcher_constructor)
loaders[fetcher_constructor] = loader
return loader

def resolve_tool_uri(argsworkflow, # type: Text
resolver=None, # type: Callable[[Loader, Union[Text, Dict[Text, Any]]], Text]
fetcher_constructor=None,
# type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher]
fetcher_constructor=None, # type: FetcherConstructorType
document_loader=None # type: Loader
):
# type: (...) -> Tuple[Text, Text]
): # type: (...) -> Tuple[Text, Text]

uri = None # type: Text
split = urllib.parse.urlsplit(argsworkflow)
# In case of Windows path, urlsplit misjudge Drive letters as scheme, here we are skipping that
if split.scheme and split.scheme in [u'http',u'https',u'file']:
if split.scheme and split.scheme in [u'http', u'https', u'file']:
uri = argsworkflow
elif os.path.exists(os.path.abspath(argsworkflow)):
uri = file_uri(str(os.path.abspath(argsworkflow)))
elif resolver:
if document_loader is None:
document_loader = Loader(jobloaderctx, fetcher_constructor=fetcher_constructor) # type: ignore
document_loader = default_loader(fetcher_constructor) # type: ignore
uri = resolver(document_loader, argsworkflow)

if uri is None:
Expand All @@ -85,18 +98,17 @@ def resolve_tool_uri(argsworkflow, # type: Text

def fetch_document(argsworkflow, # type: Union[Text, Dict[Text, Any]]
resolver=None, # type: Callable[[Loader, Union[Text, Dict[Text, Any]]], Text]
fetcher_constructor=None
# type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher]
):
# type: (...) -> Tuple[Loader, CommentedMap, Text]
fetcher_constructor=None # type: FetcherConstructorType
): # type: (...) -> Tuple[Loader, CommentedMap, Text]
"""Retrieve a CWL document."""

document_loader = Loader(jobloaderctx, fetcher_constructor=fetcher_constructor) # type: ignore
document_loader = default_loader(fetcher_constructor) # type: ignore

uri = None # type: Text
workflowobj = None # type: CommentedMap
if isinstance(argsworkflow, string_types):
uri, fileuri = resolve_tool_uri(argsworkflow, resolver=resolver, document_loader=document_loader)
uri, fileuri = resolve_tool_uri(argsworkflow, resolver=resolver,
document_loader=document_loader)
workflowobj = document_loader.fetch(fileuri)
elif isinstance(argsworkflow, dict):
uri = "#" + Text(id(argsworkflow))
Expand Down Expand Up @@ -126,7 +138,7 @@ def _convert_stdstreams_to_files(workflowobj):
sort_keys=True).encode('utf-8')).hexdigest())
workflowobj[streamtype] = filename
out['type'] = 'File'
out['outputBinding'] = {'glob': filename}
out['outputBinding'] = cmap({'glob': filename})
for inp in workflowobj.get('inputs', []):
if inp.get('type') == 'stdin':
if 'inputBinding' in inp:
Expand Down Expand Up @@ -170,25 +182,25 @@ def validate_document(document_loader, # type: Loader
enable_dev=False, # type: bool
strict=True, # type: bool
preprocess_only=False, # type: bool
fetcher_constructor=None,
skip_schemas=None,
# type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher]
overrides=None # type: List[Dict]
fetcher_constructor=None, # type: FetcherConstructorType
skip_schemas=None, # type: bool
overrides=None, # type: List[Dict]
metadata=None, # type: Optional[Dict]
):
# type: (...) -> Tuple[Loader, Names, Union[Dict[Text, Any], List[Dict[Text, Any]]], Dict[Text, Any], Text]
"""Validate a CWL document."""

if isinstance(workflowobj, list):
workflowobj = {
workflowobj = cmap({
"$graph": workflowobj
}
}, fn=uri)

if not isinstance(workflowobj, dict):
raise ValueError("workflowjobj must be a dict, got '%s': %s" % (type(workflowobj), workflowobj))

jobobj = None
if "cwl:tool" in workflowobj:
job_loader = Loader(jobloaderctx, fetcher_constructor=fetcher_constructor) # type: ignore
job_loader = default_loader(fetcher_constructor) # type: ignore
jobobj, _ = job_loader.resolve_all(workflowobj, uri)
uri = urllib.parse.urljoin(uri, workflowobj["https://w3id.org/cwl/cwl#tool"])
del cast(dict, jobobj)["https://w3id.org/cwl/cwl#tool"]
Expand All @@ -200,22 +212,25 @@ def validate_document(document_loader, # type: Loader
workflowobj = fetch_document(uri, fetcher_constructor=fetcher_constructor)[1]

fileuri = urllib.parse.urldefrag(uri)[0]

if "cwlVersion" in workflowobj:
if not isinstance(workflowobj["cwlVersion"], (str, Text)):
raise Exception("'cwlVersion' must be a string, got %s" % type(workflowobj["cwlVersion"]))
# strip out version
workflowobj["cwlVersion"] = re.sub(
r"^(?:cwl:|https://w3id.org/cwl/cwl#)", "",
workflowobj["cwlVersion"])
if workflowobj["cwlVersion"] not in list(ALLUPDATES):
# print out all the Supported Versions of cwlVersion
versions = list(ALLUPDATES) # ALLUPDATES is a dict
versions.sort()
raise ValidationException("'cwlVersion' not valid. Supported CWL versions are: \n{}".format("\n".join(versions)))
else:
raise ValidationException("No cwlVersion found."
"Use the following syntax in your CWL workflow to declare version: cwlVersion: <version>")
if "cwlVersion" not in workflowobj:
if metadata and 'cwlVersion' in metadata:
workflowobj['cwlVersion'] = metadata['cwlVersion']
else:
raise ValidationException("No cwlVersion found."
"Use the following syntax in your CWL document to declare "
"the version: cwlVersion: <version>")

if not isinstance(workflowobj["cwlVersion"], (str, Text)):
raise Exception("'cwlVersion' must be a string, got %s" % type(workflowobj["cwlVersion"]))
# strip out version
workflowobj["cwlVersion"] = re.sub(
r"^(?:cwl:|https://w3id.org/cwl/cwl#)", "",
workflowobj["cwlVersion"])
if workflowobj["cwlVersion"] not in list(ALLUPDATES):
# print out all the Supported Versions of cwlVersion
versions = list(ALLUPDATES) # ALLUPDATES is a dict
versions.sort()
raise ValidationException("'cwlVersion' not valid. Supported CWL versions are: \n{}".format("\n".join(versions)))

if workflowobj["cwlVersion"] == "draft-2":
workflowobj = cast(CommentedMap, cmap(update._draft2toDraft3dev1(
Expand All @@ -238,36 +253,36 @@ def validate_document(document_loader, # type: Loader
_add_blank_ids(workflowobj)

workflowobj["id"] = fileuri
processobj, metadata = document_loader.resolve_all(workflowobj, fileuri)
processobj, new_metadata = document_loader.resolve_all(workflowobj, fileuri)
if not isinstance(processobj, (CommentedMap, CommentedSeq)):
raise ValidationException("Workflow must be a dict or list.")

if not metadata:
if not new_metadata:
if not isinstance(processobj, dict):
raise ValidationException("Draft-2 workflows must be a dict.")
metadata = cast(CommentedMap, cmap({"$namespaces": processobj.get("$namespaces", {}),
"$schemas": processobj.get("$schemas", []),
"cwlVersion": processobj["cwlVersion"]},
fn=fileuri))
new_metadata = cast(CommentedMap, cmap(
{"$namespaces": processobj.get("$namespaces", {}),
"$schemas": processobj.get("$schemas", []),
"cwlVersion": processobj["cwlVersion"]}, fn=fileuri))

_convert_stdstreams_to_files(workflowobj)

if preprocess_only:
return document_loader, avsc_names, processobj, metadata, uri
return document_loader, avsc_names, processobj, new_metadata, uri

schema.validate_doc(avsc_names, processobj, document_loader, strict)

if metadata.get("cwlVersion") != update.LATEST:
if new_metadata.get("cwlVersion") != update.LATEST:
processobj = cast(CommentedMap, cmap(update.update(
processobj, document_loader, fileuri, enable_dev, metadata)))
processobj, document_loader, fileuri, enable_dev, new_metadata)))

if jobobj:
metadata[u"cwl:defaults"] = jobobj
new_metadata[u"cwl:defaults"] = jobobj

if overrides:
metadata[u"cwltool:overrides"] = overrides
new_metadata[u"cwltool:overrides"] = overrides

return document_loader, avsc_names, processobj, metadata, uri
return document_loader, avsc_names, processobj, new_metadata, uri


def make_tool(document_loader, # type: Loader
Expand Down Expand Up @@ -322,7 +337,7 @@ def load_tool(argsworkflow, # type: Union[Text, Dict[Text, Any]]
enable_dev=False, # type: bool
strict=True, # type: bool
resolver=None, # type: Callable[[Loader, Union[Text, Dict[Text, Any]]], Text]
fetcher_constructor=None, # type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher]
fetcher_constructor=None, # type: FetcherConstructorType
overrides=None
):
# type: (...) -> Process
Expand All @@ -332,7 +347,8 @@ def load_tool(argsworkflow, # type: Union[Text, Dict[Text, Any]]
document_loader, avsc_names, processobj, metadata, uri = validate_document(
document_loader, workflowobj, uri, enable_dev=enable_dev,
strict=strict, fetcher_constructor=fetcher_constructor,
overrides=overrides)
overrides=overrides, metadata=kwargs.get('metadata', None)
if kwargs else None)
return make_tool(document_loader, avsc_names, metadata, uri,
makeTool, kwargs if kwargs else {})

Expand Down
12 changes: 7 additions & 5 deletions cwltool/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@
from .builder import Builder
from .cwlrdf import printdot, printrdf
from .errors import UnsupportedRequirement, WorkflowException
from .load_tool import (resolve_tool_uri, fetch_document, make_tool, validate_document,
jobloaderctx, resolve_overrides, load_overrides)
from .load_tool import (FetcherConstructorType, resolve_tool_uri,
fetch_document, make_tool, validate_document, jobloaderctx,
resolve_overrides, load_overrides)
from .mutation import MutationManager
from .pack import pack
from .pathmapper import (adjustDirObjs, adjustFileObjs, get_listing,
Expand Down Expand Up @@ -544,8 +545,8 @@ def load_job_order(args, # type: argparse.Namespace
job_order_object, _ = loader.resolve_ref(job_order_file, checklinks=False)

if job_order_object and "http://commonwl.org/cwltool#overrides" in job_order_object:
overrides.extend(resolve_overrides(job_order_object, file_uri(job_order_file), tool_file_uri))
del job_order_object["http://commonwl.org/cwltool#overrides"]
overrides.extend(resolve_overrides(job_order_object, file_uri(job_order_file), tool_file_uri))
del job_order_object["http://commonwl.org/cwltool#overrides"]

if not job_order_object:
input_basedir = args.basedir if args.basedir else os.getcwd()
Expand Down Expand Up @@ -641,6 +642,7 @@ def addSizes(p):
ns = {} # type: Dict[Text, Union[Dict[Any, Any], Text, Iterable[Text]]]
ns.update(t.metadata.get("$namespaces", {}))
ld = Loader(ns)

def expand_formats(p):
if "format" in p:
p["format"] = ld.expand_url(p["format"], "")
Expand Down Expand Up @@ -734,7 +736,7 @@ def main(argsl=None, # type: List[str]
versionfunc=versionstring, # type: Callable[[], Text]
job_order_object=None, # type: MutableMapping[Text, Any]
make_fs_access=StdFsAccess, # type: Callable[[Text], StdFsAccess]
fetcher_constructor=None, # type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher]
fetcher_constructor=None, # type: FetcherConstructorType
resolver=tool_resolver,
logger_handler=None,
custom_schema_callback=None # type: Callable[[], None]
Expand Down
12 changes: 8 additions & 4 deletions cwltool/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,15 @@ def match_types(sinktype, src, iid, inputobj, linkMerge, valueFrom):
elif isinstance(src.parameter["type"], list):
# Source is union type
# Check that at least one source type is compatible with the sink.
for st in src.parameter["type"]:
srccopy = copy.deepcopy(src)
srccopy.parameter["type"] = st
if match_types(sinktype, srccopy, iid, inputobj, linkMerge, valueFrom):
original_types = src.parameter["type"]
for source_type in original_types:
src.parameter["type"] = source_type
match = match_types(
sinktype, src, iid, inputobj, linkMerge, valueFrom)
if match:
src.parameter["type"] = original_types
return True
src.parameter["type"] = original_types
return False
elif linkMerge:
if iid not in inputobj:
Expand Down
6 changes: 5 additions & 1 deletion tests/test_pack.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

import cwltool.pack
import cwltool.workflow
from cwltool.resolver import tool_resolver
from cwltool import load_tool
from cwltool.load_tool import fetch_document, validate_document
from cwltool.main import makeRelative, main, print_pack
from cwltool.pathmapper import adjustDirObjs, adjustFileObjs
Expand All @@ -23,6 +25,7 @@ class TestPack(unittest.TestCase):
maxDiff = None

def test_pack(self):
load_tool.loaders = {}

document_loader, workflowobj, uri = fetch_document(
get_data("tests/wf/revsort.cwl"))
Expand Down Expand Up @@ -97,10 +100,11 @@ def _pack_idempotently(self, document):
reason="Instance of cwltool is used, on Windows it invokes a default docker container"
"which is not supported on AppVeyor")
def test_packed_workflow_execution(self):
load_tool.loaders = {}
test_wf = "tests/wf/count-lines1-wf.cwl"
test_wf_job = "tests/wf/wc-job.json"
document_loader, workflowobj, uri = fetch_document(
get_data(test_wf))
get_data(test_wf), resolver=tool_resolver)
document_loader, avsc_names, processobj, metadata, uri = validate_document(
document_loader, workflowobj, uri)
packed = json.loads(print_pack(document_loader, processobj, uri, metadata))
Expand Down