2
2
import copy
3
3
import os
4
4
import logging
5
- from typing import Any , Callable , Dict , List , Text , Type , Union
5
+ import json
6
+ from typing import Any , Callable , Dict , List , Text , Type , Union , Set
6
7
7
8
import six
8
9
from six import iteritems , string_types
11
12
import schema_salad .validate as validate
12
13
from schema_salad .sourceline import SourceLine
13
14
15
+ from rdflib import Graph , URIRef
16
+ from rdflib .namespace import OWL , RDFS
17
+
14
18
from . import expression
15
19
from .errors import WorkflowException
16
20
from .mutation import MutationManager
@@ -32,6 +36,52 @@ def substitute(value, replace): # type: (Text, Text) -> Text
32
36
else :
33
37
return value + replace
34
38
39
+ def formatSubclassOf (fmt , cls , ontology , visited ):
40
+ # type: (Text, Text, Graph, Set[Text]) -> bool
41
+ """Determine if `fmt` is a subclass of `cls`."""
42
+
43
+ if URIRef (fmt ) == URIRef (cls ):
44
+ return True
45
+
46
+ if ontology is None :
47
+ return False
48
+
49
+ if fmt in visited :
50
+ return False
51
+
52
+ visited .add (fmt )
53
+
54
+ uriRefFmt = URIRef (fmt )
55
+
56
+ for s , p , o in ontology .triples ((uriRefFmt , RDFS .subClassOf , None )):
57
+ # Find parent classes of `fmt` and search upward
58
+ if formatSubclassOf (o , cls , ontology , visited ):
59
+ return True
60
+
61
+ for s , p , o in ontology .triples ((uriRefFmt , OWL .equivalentClass , None )):
62
+ # Find equivalent classes of `fmt` and search horizontally
63
+ if formatSubclassOf (o , cls , ontology , visited ):
64
+ return True
65
+
66
+ for s , p , o in ontology .triples ((None , OWL .equivalentClass , uriRefFmt )):
67
+ # Find equivalent classes of `fmt` and search horizontally
68
+ if formatSubclassOf (s , cls , ontology , visited ):
69
+ return True
70
+
71
+ return False
72
+
73
+ def checkFormat (actualFile , inputFormats , ontology ):
74
+ # type: (Union[Dict[Text, Any], List, Text], Union[List[Text], Text], Graph) -> None
75
+ for af in aslist (actualFile ):
76
+ if not af :
77
+ continue
78
+ if "format" not in af :
79
+ raise validate .ValidationException (u"Missing required 'format' for File %s" % af )
80
+ for inpf in aslist (inputFormats ):
81
+ if af ["format" ] == inpf or formatSubclassOf (af ["format" ], inpf , ontology , set ()):
82
+ return
83
+ raise validate .ValidationException (
84
+ u"Incompatible file format, expected format(s) %s but file object is: %s" % (inputFormats , json .dumps (af , indent = 4 )))
35
85
36
86
class Builder (object ):
37
87
def __init__ (self ): # type: () -> None
@@ -54,6 +104,7 @@ def __init__(self): # type: () -> None
54
104
self .js_console = False # type: bool
55
105
self .mutation_manager = None # type: MutationManager
56
106
self .force_docker_pull = False # type: bool
107
+ self .formatgraph = None # type: Graph
57
108
58
109
# One of "no_listing", "shallow_listing", "deep_listing"
59
110
# Will be default "no_listing" for CWL v1.1
@@ -70,8 +121,8 @@ def build_job_script(self, commands):
70
121
else :
71
122
return None
72
123
73
- def bind_input (self , schema , datum , lead_pos = None , tail_pos = None ):
74
- # type: (Dict[Text, Any], Any, Union[int, List[int]], List[int]) -> List[Dict[Text, Any]]
124
+ def bind_input (self , schema , datum , lead_pos = None , tail_pos = None , discover_secondaryFiles = False ):
125
+ # type: (Dict[Text, Any], Any, Union[int, List[int]], List[int], bool ) -> List[Dict[Text, Any]]
75
126
if tail_pos is None :
76
127
tail_pos = []
77
128
if lead_pos is None :
@@ -105,9 +156,9 @@ def bind_input(self, schema, datum, lead_pos=None, tail_pos=None):
105
156
schema = copy .deepcopy (schema )
106
157
schema ["type" ] = t
107
158
if not value_from_expression :
108
- return self .bind_input (schema , datum , lead_pos = lead_pos , tail_pos = tail_pos )
159
+ return self .bind_input (schema , datum , lead_pos = lead_pos , tail_pos = tail_pos , discover_secondaryFiles = discover_secondaryFiles )
109
160
else :
110
- self .bind_input (schema , datum , lead_pos = lead_pos , tail_pos = tail_pos )
161
+ self .bind_input (schema , datum , lead_pos = lead_pos , tail_pos = tail_pos , discover_secondaryFiles = discover_secondaryFiles )
111
162
bound_input = True
112
163
if not bound_input :
113
164
raise validate .ValidationException (u"'%s' is not a valid union %s" % (datum , schema ["type" ]))
@@ -119,17 +170,17 @@ def bind_input(self, schema, datum, lead_pos=None, tail_pos=None):
119
170
if k in schema :
120
171
st [k ] = schema [k ]
121
172
if value_from_expression :
122
- self .bind_input (st , datum , lead_pos = lead_pos , tail_pos = tail_pos )
173
+ self .bind_input (st , datum , lead_pos = lead_pos , tail_pos = tail_pos , discover_secondaryFiles = discover_secondaryFiles )
123
174
else :
124
- bindings .extend (self .bind_input (st , datum , lead_pos = lead_pos , tail_pos = tail_pos ))
175
+ bindings .extend (self .bind_input (st , datum , lead_pos = lead_pos , tail_pos = tail_pos , discover_secondaryFiles = discover_secondaryFiles ))
125
176
else :
126
177
if schema ["type" ] in self .schemaDefs :
127
178
schema = self .schemaDefs [schema ["type" ]]
128
179
129
180
if schema ["type" ] == "record" :
130
181
for f in schema ["fields" ]:
131
182
if f ["name" ] in datum :
132
- bindings .extend (self .bind_input (f , datum [f ["name" ]], lead_pos = lead_pos , tail_pos = f ["name" ]))
183
+ bindings .extend (self .bind_input (f , datum [f ["name" ]], lead_pos = lead_pos , tail_pos = f ["name" ], discover_secondaryFiles = discover_secondaryFiles ))
133
184
else :
134
185
datum [f ["name" ]] = f .get ("default" )
135
186
@@ -147,15 +198,14 @@ def bind_input(self, schema, datum, lead_pos=None, tail_pos=None):
147
198
if k in schema :
148
199
itemschema [k ] = schema [k ]
149
200
bindings .extend (
150
- self .bind_input (itemschema , item , lead_pos = n , tail_pos = tail_pos ))
201
+ self .bind_input (itemschema , item , lead_pos = n , tail_pos = tail_pos , discover_secondaryFiles = discover_secondaryFiles ))
151
202
binding = None
152
203
153
204
if schema ["type" ] == "File" :
154
205
self .files .append (datum )
155
- if binding :
156
- if binding .get ("loadContents" ):
157
- with self .fs_access .open (datum ["location" ], "rb" ) as f :
158
- datum ["contents" ] = f .read (CONTENT_LIMIT )
206
+ if (binding and binding .get ("loadContents" )) or schema .get ("loadContents" ):
207
+ with self .fs_access .open (datum ["location" ], "rb" ) as f :
208
+ datum ["contents" ] = f .read (CONTENT_LIMIT )
159
209
160
210
if "secondaryFiles" in schema :
161
211
if "secondaryFiles" not in datum :
@@ -175,14 +225,20 @@ def bind_input(self, schema, datum, lead_pos=None, tail_pos=None):
175
225
if not found :
176
226
if isinstance (sfname , dict ):
177
227
datum ["secondaryFiles" ].append (sfname )
178
- else :
228
+ elif discover_secondaryFiles :
179
229
datum ["secondaryFiles" ].append ({
180
230
"location" : datum ["location" ][0 :datum ["location" ].rindex ("/" )+ 1 ]+ sfname ,
181
231
"basename" : sfname ,
182
232
"class" : "File" })
233
+ else :
234
+ raise WorkflowException ("Missing required secondary file '%s' from file object: %s" % (
235
+ sfname , json .dumps (datum , indent = 4 )))
183
236
184
237
normalizeFilesDirs (datum ["secondaryFiles" ])
185
238
239
+ if "format" in schema :
240
+ checkFormat (datum , self .do_eval (schema ["format" ]), self .formatgraph )
241
+
186
242
def _capture_files (f ):
187
243
self .files .append (f )
188
244
return f
0 commit comments