# -*- coding: utf-8 -*-
"""Functions for intake scanning."""
__copyright__ = 'Copyright (c) 2019-2021, Utrecht University'
__license__ = 'GPLv3, see LICENSE'
import itertools
import time
import genquery
import intake
from intake_utils import dataset_parse_id, intake_scan_get_metadata_update
from util import *
def intake_scan_collection(ctx, root, scope, in_dataset, found_datasets):
"""Recursively scan a directory in a Youth Cohort intake.
:param ctx: Combined type of a callback and rei struct
:param root: the directory to scan
    :param scope: Scoped key-value buffer holding the WEPV values collected so far
:param in_dataset: whether this collection is within a dataset collection
    :param found_datasets: List of dataset scopes found so far; used to report the toplevel datasets encountered during the scan
:returns: Found datasets
"""
    # Pseudocode, experiment type and wave are collected while scanning; values
    # found at deeper levels overwrite those inherited from the enclosing scope.
    # Scan data objects under root.
iter = genquery.row_iterator(
"DATA_NAME, COLL_NAME",
"COLL_NAME = '" + root + "'",
genquery.AS_LIST, ctx
)
for row in iter:
path = row[1] + '/' + row[0]
        # Determine lock state of the data object (not the collection).
locked_state = object_is_locked(ctx, path, False)
if locked_state['locked'] or locked_state['frozen']:
continue
remove_dataset_metadata(ctx, path, False)
scan_mark_scanned(ctx, path, False)
parent_in_dataset = in_dataset
metadata_update = intake_scan_get_metadata_update(ctx, path, False, in_dataset, scope)
if metadata_update["in_dataset"]:
apply_dataset_metadata(ctx, path, metadata_update["new_metadata"], False)
if not parent_in_dataset:
# We found a top-level dataset data object.
found_datasets.append(metadata_update["new_metadata"])
else:
apply_partial_metadata(ctx, metadata_update["new_metadata"], path, False)
avu.set_on_data(ctx, path, "unrecognized", "Experiment type, wave or pseudocode missing from path")
# Scan collections under root
iter = genquery.row_iterator(
"COLL_NAME",
"COLL_PARENT_NAME = '" + root + "'",
genquery.AS_LIST, ctx
)
counter = 0
for row in iter:
path = row[0]
counter = counter + 1
dirname = pathutil.basename(path)
if dirname != '/':
            # Get locked/frozen status
locked_state = object_is_locked(ctx, path, True)
if locked_state['locked'] or locked_state['frozen']:
continue
remove_dataset_metadata(ctx, path, True)
scan_mark_scanned(ctx, path, True)
parent_in_dataset = in_dataset
metadata_update = intake_scan_get_metadata_update(ctx, path, True, in_dataset, scope)
if metadata_update["in_dataset"]:
apply_dataset_metadata(ctx, path, metadata_update["new_metadata"], True)
if not parent_in_dataset:
                    # We found a new top-level dataset collection.
found_datasets.append(metadata_update["new_metadata"])
else:
apply_partial_metadata(ctx, metadata_update["new_metadata"], path, True)
found_datasets = intake_scan_collection(ctx,
path,
metadata_update["new_metadata"],
parent_in_dataset or metadata_update["in_dataset"],
found_datasets)
return found_datasets
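# Illustrative top-level call (a sketch, not part of this module): a scan is
# typically started at the root of an intake group with an empty scope, outside
# any dataset, and with no datasets found yet. The group path below is a
# placeholder.
#
#     datasets = intake_scan_collection(ctx, '/tempZone/home/grp-intake-youth', {}, False, [])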
def object_is_locked(ctx, path, is_collection):
"""Returns whether given object in path (collection or dataobject) is locked or frozen
:param ctx: Combined type of a callback and rei struct
:param path: Path to object or collection
    :param is_collection: Whether the path refers to a collection or a data object
    :returns: Dict with 'locked' and 'frozen' booleans describing the object's lock state
"""
locked_state = {"locked": False,
"frozen": False}
if is_collection:
iter = genquery.row_iterator(
"META_COLL_ATTR_NAME",
"COLL_NAME = '" + path + "'",
genquery.AS_LIST, ctx
)
for row in iter:
if row[0] in ['to_vault_lock', 'to_vault_freeze']:
locked_state['locked'] = True
if row[0] == 'to_vault_freeze':
locked_state['frozen'] = True
else:
parent_coll = pathutil.dirname(path)
iter = genquery.row_iterator(
"META_DATA_ATTR_NAME",
"COLL_NAME = '" + parent_coll + "' AND DATA_NAME = '" + pathutil.basename(path) + "'",
genquery.AS_LIST, ctx
)
for row in iter:
if row[0] in ['to_vault_lock', 'to_vault_freeze']:
locked_state['locked'] = True
if row[0] == 'to_vault_freeze':
locked_state['frozen'] = True
return locked_state
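# Illustrative check (sketch; the path below is a placeholder): objects that are
# locked or frozen for transfer to the vault are skipped by the scanner.
#
#     state = object_is_locked(ctx, '/tempZone/home/grp-intake-youth/raw.dat', False)
#     if state['locked'] or state['frozen']:
#         pass  # leave the object untouched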
def remove_dataset_metadata(ctx, path, is_collection):
"""Remove all intake metadata from dataset.
:param ctx: Combined type of a callback and rei struct
:param path: Path to collection or data object
    :param is_collection: Whether the path refers to a collection or a data object
"""
intake_metadata = ["wave",
"experiment_type",
"pseudocode",
"version",
"dataset_id",
"dataset_toplevel",
"error",
"warning",
"dataset_error",
"dataset_warning",
"unrecognized",
"object_count",
"object_errors",
"object_warnings"]
intake_metadata_set = set(intake_metadata)
    # During testing, "comment" and "scanned" can be added to the list above in
    # order to also remove metadata that accumulates across test runs.
if is_collection:
iter = genquery.row_iterator(
"COLL_ID, META_COLL_ATTR_NAME, META_COLL_ATTR_VALUE",
"COLL_NAME = '" + path + "'",
genquery.AS_LIST, ctx
)
else:
iter = genquery.row_iterator(
"DATA_ID, META_DATA_ATTR_NAME, META_DATA_ATTR_VALUE",
"COLL_NAME = '" + pathutil.dirname(path) + "' AND DATA_NAME = '" + pathutil.basename(path) + "'",
genquery.AS_LIST, ctx
)
for _row in iter:
metadata_name = _row[1]
if metadata_name in intake_metadata_set:
if is_collection:
try:
avu.rmw_from_coll(ctx, path, metadata_name, '%')
except Exception as e:
log.write(ctx, "Warning: unable to remove metadata attr {} from {}".format(metadata_name, path))
log.write(ctx, "Removing metadata failed with exception {}".format(str(e)))
else:
try:
avu.rmw_from_data(ctx, path, metadata_name, '%')
except Exception as e:
log.write(ctx, "Warning: unable to remove metadata attr {} from {}".format(metadata_name, path))
log.write(ctx, "Removing metadata failed with exception {}".format(str(e)))
def scan_mark_scanned(ctx, path, is_collection):
"""Sets the username of the scanner and a timestamp as metadata on the scanned object.
:param ctx: Combined type of a callback and rei struct
    :param path: Path of the object to mark as scanned
:param is_collection: Is scanned object a collection?
"""
timestamp = int(time.time())
user_and_timestamp = user.name(ctx) + ':' + str(timestamp) # str(datetime.date.today())
if is_collection:
avu.set_on_coll(ctx, path, 'scanned', user_and_timestamp)
else:
avu.set_on_data(ctx, path, 'scanned', user_and_timestamp)
def apply_dataset_metadata(ctx, path, scope, is_collection):
"""Apply dataset metadata to an object in a dataset.
:param ctx: Combined type of a callback and rei struct
:param path: Path to the object
:param scope: A scanner scope containing WEPV values
:param is_collection: Whether the object is a collection
"""
for key in scope:
if scope[key]:
if is_collection:
avu.set_on_coll(ctx, path, key, scope[key])
else:
avu.set_on_data(ctx, path, key, scope[key])
def apply_partial_metadata(ctx, scope, path, is_collection):
"""Apply any available id component metadata to the given object.
    To be called only for objects outside datasets. When inside a dataset
    (or at a dataset toplevel), use apply_dataset_metadata() instead.
:param ctx: Combined type of a callback and rei struct
:param scope: A scanner scope containing some WEPV values
:param path: Path to the object
:param is_collection: Whether the object is a collection
"""
keys = ['wave', 'experiment_type', 'pseudocode', 'version']
for key in keys:
if key in scope:
if scope[key]:
if is_collection:
avu.set_on_coll(ctx, path, key, scope[key])
else:
avu.set_on_data(ctx, path, key, scope[key])
def dataset_add_error(ctx, top_levels, is_collection_toplevel, text, suppress_duplicate_avu_error=False):
"""Add a dataset error to all given dataset toplevels.
:param ctx: Combined type of a callback and rei struct
:param top_levels: A list of toplevel datasets
    :param is_collection_toplevel: Whether the toplevel objects are collections or data objects
:param text: Error text
    :param suppress_duplicate_avu_error: If the AVU already exists, suppress the resulting iRODS error and continue
:raises Exception: Raises exception when associating error to collection or data object fails
"""
for tl in top_levels:
if is_collection_toplevel:
try:
avu.associate_to_coll(ctx, tl, "dataset_error", text)
except msi.Error as e:
# iRODS errorcode 809000 (CATALOG_ALREADY_HAS_ITEM_BY_THAT_NAME)
if suppress_duplicate_avu_error and str(e).find("809000") > -1:
log.write(ctx, "Trying to associate dataset_error already present on collection: {}".format(tl))
log.write(ctx, "Suppress error handling for AVU: dataset_error - {}".format(text))
else:
raise Exception(e)
else:
try:
avu.associate_to_data(ctx, tl, "dataset_error", text)
except msi.Error as e:
# iRODS errorcode 809000 (CATALOG_ALREADY_HAS_ITEM_BY_THAT_NAME)
if suppress_duplicate_avu_error and str(e).find("809000") > -1:
log.write(ctx, "Trying to associate dataset_error already present on data object: {}".format(tl))
log.write(ctx, "Suppress error handling for AVU: dataset_error - {}".format(text))
else:
raise Exception(e)
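# Illustrative use (sketch): attach an error to every dataset toplevel while
# tolerating the AVU already being present from an earlier check, as done in
# intake_check_dataset below. Here tl_objects and is_collection would come from
# intake.get_dataset_toplevel_objects().
#
#     dataset_add_error(ctx, tl_objects, is_collection, "Wave, experiment type or pseudo-ID missing", True)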
def dataset_get_ids(ctx, coll):
"""Find dataset ids under collection.
:param ctx: Combined type of a callback and rei struct
:param coll: Collection name for which to find dataset-ids
    :returns: Set of dataset ids found under the collection
"""
data_ids = set()
# Get distinct data_ids
main_collection_iterator = genquery.row_iterator(
"META_DATA_ATTR_VALUE",
"COLL_NAME = '" + coll + "' AND META_DATA_ATTR_NAME = 'dataset_id' ",
genquery.AS_LIST, ctx
)
subcollection_iterator = genquery.row_iterator(
"META_DATA_ATTR_VALUE",
"COLL_NAME LIKE '" + coll + "/%' AND META_DATA_ATTR_NAME = 'dataset_id' ",
genquery.AS_LIST, ctx
)
for row in itertools.chain(main_collection_iterator, subcollection_iterator):
if row[0]:
data_ids.add(row[0])
return data_ids
def intake_check_datasets(ctx, root):
"""Run checks on all datasets under root.
:param ctx: Combined type of a callback and rei struct
:param root: The collection to get datasets for
"""
dataset_ids = dataset_get_ids(ctx, root)
for dataset_id in dataset_ids:
intake_check_dataset(ctx, root, dataset_id)
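# Illustrative driver call (sketch; the root path is a placeholder): after a
# scan, run the checks for every dataset id found under the intake collection.
#
#     intake_check_datasets(ctx, '/tempZone/home/grp-intake-youth')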
def intake_check_dataset(ctx, root, dataset_id):
"""Run checks on the dataset specified by the given dataset id.
This function adds object counts and error counts to top-level objects within the dataset.
For historical reasons, it also adds a warning count, which is always 0.
:param ctx: Combined type of a callback and rei struct
:param root: Collection name
:param dataset_id: Dataset identifier
"""
tl_info = intake.get_dataset_toplevel_objects(ctx, root, dataset_id)
is_collection = tl_info['is_collection']
tl_objects = tl_info['objects']
    # Check validity of wave
waves = ["20w", "30w", "0m", "5m", "10m", "3y", "6y", "9y", "12y", "15y"]
components = dataset_parse_id(dataset_id)
if components['wave'] not in waves:
dataset_add_error(ctx, tl_objects, is_collection, "The wave '" + components['wave'] + "' is not in the list of accepted waves")
# check presence of wave, pseudo-ID and experiment
if '' in [components['wave'], components['experiment_type'], components['pseudocode']]:
        # Suppress the duplicate-AVU error and continue normal processing if the WEPV-missing error is already present on the data object or collection
dataset_add_error(ctx, tl_objects, is_collection, "Wave, experiment type or pseudo-ID missing", True)
for tl in tl_objects:
        # Store the aggregated object, error and warning counts on the toplevel object
count = get_aggregated_object_count(ctx, dataset_id, tl)
if is_collection:
avu.set_on_coll(ctx, tl, "object_count", str(count))
else:
avu.set_on_data(ctx, tl, "object_count", str(count))
count = get_aggregated_object_error_count(ctx, tl)
if is_collection:
avu.set_on_coll(ctx, tl, "object_errors", str(count))
else:
avu.set_on_data(ctx, tl, "object_errors", str(count))
count = 0
if is_collection:
avu.set_on_coll(ctx, tl, "object_warnings", str(count))
else:
avu.set_on_data(ctx, tl, "object_warnings", str(count))
def get_rel_paths_objects(ctx, root, dataset_id):
"""Get a list of relative paths to all data objects in a dataset.
:param ctx: Combined type of a callback and rei struct
:param root: Root path of the dataset
:param dataset_id: Dataset identifier
    :returns: List of relative object paths (e.g. file1.dat, some-subdir/file2.dat, ...)
"""
tl_info = intake.get_dataset_toplevel_objects(ctx, root, dataset_id)
is_collection = tl_info['is_collection']
tl_objects = tl_info['objects']
rel_path_objects = []
# get the correct parent_collection
try:
if is_collection:
parent_coll = tl_objects[0]
else:
parent_coll = pathutil.dirname(tl_objects[0])
except Exception:
parent_coll = '/'
main_collection_iterator = genquery.row_iterator(
"DATA_NAME, COLL_NAME",
"COLL_NAME = '" + parent_coll + "' AND META_DATA_ATTR_NAME = 'dataset_id' AND META_DATA_ATTR_VALUE = '" + dataset_id + "' ",
genquery.AS_LIST, ctx
)
subcollection_iterator = genquery.row_iterator(
"DATA_NAME, COLL_NAME",
"COLL_NAME LIKE '" + parent_coll + "/%' AND META_DATA_ATTR_NAME = 'dataset_id' AND META_DATA_ATTR_VALUE = '" + dataset_id + "' ",
genquery.AS_LIST, ctx
)
for row in itertools.chain(main_collection_iterator, subcollection_iterator):
# Add objects including relative paths
rel_path_objects.append(row[1][len(parent_coll):] + '/' + row[0])
return rel_path_objects
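# Illustrative use (sketch; root and dataset_id are placeholders): list the data
# objects belonging to a dataset, relative to its toplevel collection.
#
#     for rel_path in get_rel_paths_objects(ctx, root, dataset_id):
#         log.write(ctx, "dataset object: {}".format(rel_path))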
def get_aggregated_object_count(ctx, dataset_id, tl_collection):
"""Return total amounts of objects.
:param ctx: Combined type of a callback and rei struct
:param dataset_id: Dataset id
:param tl_collection: Collection name of top level
:returns: Aggregated object count
"""
main_collection_iterator = genquery.row_iterator(
"DATA_ID",
"COLL_NAME = '" + tl_collection + "' AND META_DATA_ATTR_NAME = 'dataset_id' "
"AND META_DATA_ATTR_VALUE = '" + dataset_id + "' ",
genquery.AS_LIST, ctx
)
subcollection_iterator = genquery.row_iterator(
"DATA_ID",
"COLL_NAME like '" + tl_collection + "/%' AND META_DATA_ATTR_NAME = 'dataset_id' "
"AND META_DATA_ATTR_VALUE = '" + dataset_id + "' ",
genquery.AS_LIST, ctx
)
return len(list(main_collection_iterator) + list(subcollection_iterator))
def get_aggregated_object_error_count(ctx, tl_collection):
"""Return total amount of object errors.
:param ctx: Combined type of a callback and rei struct
:param tl_collection: Collection name of top level
    :returns: Total number of object errors
"""
main_collection_iterator = genquery.row_iterator(
"DATA_ID",
"COLL_NAME = '" + tl_collection + "' AND META_DATA_ATTR_NAME = 'error' ",
genquery.AS_LIST, ctx
)
subcollection_iterator = genquery.row_iterator(
"DATA_ID",
"COLL_NAME like '" + tl_collection + "/%' AND META_DATA_ATTR_NAME = 'error' ",
genquery.AS_LIST, ctx
)
return len(list(main_collection_iterator) + list(subcollection_iterator))